Skip to content

Commit

Permalink
fix(recovery): wait_epoch should be called in post_collect of recovery command (close #8467)
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Zhao committed Mar 10, 2023
1 parent 2f626d9 commit 0449d39
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
20 changes: 11 additions & 9 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub struct CommandContext<S: MetaStore> {
pub command: Command,

pub checkpoint: bool,
is_recovery: bool,

source_manager: SourceManagerRef<S>,
}
Expand All @@ -230,6 +231,7 @@ impl<S: MetaStore> CommandContext<S> {
curr_epoch: Epoch,
command: Command,
checkpoint: bool,
is_recovery: bool,
source_manager: SourceManagerRef<S>,
) -> Self {
Self {
Expand All @@ -241,6 +243,7 @@ impl<S: MetaStore> CommandContext<S> {
curr_epoch,
command,
checkpoint,
is_recovery,
source_manager,
}
}
Expand Down Expand Up @@ -498,12 +501,13 @@ where
pub async fn post_collect(&self) -> MetaResult<()> {
match &self.command {
#[allow(clippy::single_match)]
Command::Plain(mutation) => match mutation {
// After the `Pause` barrier is collected and committed, we must ensure that the
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators may
// immediately initialize their states on that barrier.
Some(Mutation::Pause(..)) => {
Command::Plain(mutation) => {
// After the `Pause` barrier or the recovery barrier is collected and committed, we
// must ensure that the storage version with this epoch is synced to
// all compute nodes before the execution of the next command of
// `Update`, as some newly created operators may immediately
// initialize their states on that barrier.
if self.is_recovery || matches!(mutation, Some(Mutation::Pause(..))) {
let futures = self.info.node_map.values().map(|worker_node| async {
let client = self.client_pool.get(worker_node).await?;
let request = WaitEpochCommitRequest {
Expand All @@ -514,9 +518,7 @@ where

try_join_all(futures).await?;
}

_ => {}
},
}

Command::SourceSplitAssignment(split_assignment) => {
self.fragment_manager
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ where
new_epoch,
command,
checkpoint,
false,
self.source_manager.clone(),
));
let mut notifiers = notifiers;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ where
new_epoch,
command,
true,
true,
self.source_manager.clone(),
));

Expand Down

0 comments on commit 0449d39

Please sign in to comment.