From 0449d3955f3a7bab4e354857c6d1bb014cae51b1 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Fri, 10 Mar 2023 11:09:38 +0800 Subject: [PATCH] fix(recovery): wait_epoch should be called in post_collect of recovery command (close #8467) --- src/meta/src/barrier/command.rs | 20 +++++++++++--------- src/meta/src/barrier/mod.rs | 1 + src/meta/src/barrier/recovery.rs | 1 + 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 0353aa074f688..f0727fc8fb7d0 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -215,6 +215,7 @@ pub struct CommandContext { pub command: Command, pub checkpoint: bool, + is_recovery: bool, source_manager: SourceManagerRef, } @@ -230,6 +231,7 @@ impl CommandContext { curr_epoch: Epoch, command: Command, checkpoint: bool, + is_recovery: bool, source_manager: SourceManagerRef, ) -> Self { Self { @@ -241,6 +243,7 @@ impl CommandContext { curr_epoch, command, checkpoint, + is_recovery, source_manager, } } @@ -498,12 +501,13 @@ where pub async fn post_collect(&self) -> MetaResult<()> { match &self.command { #[allow(clippy::single_match)] - Command::Plain(mutation) => match mutation { - // After the `Pause` barrier is collected and committed, we must ensure that the - // storage version with this epoch is synced to all compute nodes before the - // execution of the next command of `Update`, as some newly created operators may - // immediately initialize their states on that barrier. - Some(Mutation::Pause(..)) => { + Command::Plain(mutation) => { + // After the `Pause` barrier or the recovery barrier is collected and committed, we + // must ensure that the storage version with this epoch is synced to + // all compute nodes before the execution of the next command of + // `Update`, as some newly created operators may immediately + // initialize their states on that barrier. + if self.is_recovery || matches!(mutation, Some(Mutation::Pause(..))) { let futures = self.info.node_map.values().map(|worker_node| async { let client = self.client_pool.get(worker_node).await?; let request = WaitEpochCommitRequest { @@ -514,9 +518,7 @@ where try_join_all(futures).await?; } - - _ => {} - }, + } Command::SourceSplitAssignment(split_assignment) => { self.fragment_manager diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c3cb91ff78e55..f2ad72c779897 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -648,6 +648,7 @@ where new_epoch, command, checkpoint, + false, self.source_manager.clone(), )); let mut notifiers = notifiers; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 589b75c8b645f..d0cb095a854ea 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -163,6 +163,7 @@ where new_epoch, command, true, + true, self.source_manager.clone(), ));