Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(recovery): wait_epoch should be called in recovery (close #8467) #8468

Merged
merged 1 commit into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::add_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation;
Expand Down Expand Up @@ -493,6 +494,18 @@ where
Ok(())
}

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.info.node_map.values().map(|worker_node| async {
let client = self.client_pool.get(worker_node).await?;
let request = WaitEpochCommitRequest { epoch };
client.wait_epoch_commit(request).await
});

try_join_all(futures).await?;

Ok(())
}

/// Do some stuffs after barriers are collected and the new storage version is committed, for
/// the given command.
pub async fn post_collect(&self) -> MetaResult<()> {
Expand All @@ -504,15 +517,7 @@ where
// execution of the next command of `Update`, as some newly created operators may
// immediately initialize their states on that barrier.
Some(Mutation::Pause(..)) => {
let futures = self.info.node_map.values().map(|worker_node| async {
let client = self.client_pool.get(worker_node).await?;
let request = WaitEpochCommitRequest {
epoch: self.prev_epoch.0,
};
client.wait_epoch_commit(request).await
});

try_join_all(futures).await?;
self.wait_epoch_commit(self.prev_epoch.0).await?;
}

_ => {}
Expand Down
15 changes: 15 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,21 @@ where
self.source_manager.clone(),
));

#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
}
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), barrier_complete_tx)
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ mod tests {
}

impl MockServices {
async fn start(host: &str, port: u16) -> MetaResult<Self> {
async fn start(host: &str, port: u16, enable_recovery: bool) -> MetaResult<Self> {
let addr = SocketAddr::new(host.parse().unwrap(), port);
let state = Arc::new(FakeFragmentState {
actor_streams: Mutex::new(HashMap::new()),
Expand All @@ -692,7 +692,7 @@ mod tests {

sleep(Duration::from_secs(1)).await;

let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(true))).await;
let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(enable_recovery))).await;
let system_params = env.system_params_manager().get_params().await;
let meta_metrics = Arc::new(MetaMetrics::new());
let cluster_manager =
Expand Down Expand Up @@ -868,7 +868,7 @@ mod tests {

#[tokio::test]
async fn test_drop_materialized_view() -> MetaResult<()> {
let services = MockServices::start("127.0.0.1", 12334).await?;
let services = MockServices::start("127.0.0.1", 12334, false).await?;

let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);
Expand Down Expand Up @@ -926,7 +926,7 @@ mod tests {
async fn test_failpoints_drop_mv_recovery() {
let inject_barrier_err = "inject_barrier_err";
let inject_barrier_err_success = "inject_barrier_err_success";
let services = MockServices::start("127.0.0.1", 12335).await.unwrap();
let services = MockServices::start("127.0.0.1", 12335, true).await.unwrap();

let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);
Expand Down