From 7216d55f2f4954262fd72cbf0365cdf41fd1c7e7 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 27 Dec 2022 20:27:33 +0800 Subject: [PATCH] fix: clean states in local barrier manager after actor dropped (#7082) Trying to fix continuous recovery found in longevity and chaos test. I found that two problems might be the root cause of continuous recovery: 1. Fixed, unnecessary recovery triggered as described in #6989 . As I tested locally, when workload was very high, there were many ongoing barrier collect responses(up to 80+) when recovery. After recovery finished, each response would trigger a recovery process, because the whole cluster has already reset to previous committed epoch. 2. Before this PR, when force stopping actors in CN, the local manger will clean all states and then abort all actors. The problem is between cleaning states and aborting actors, the actors could also report epoch collected or error status to local barrier manager especially when the number of actors is high. This will cause a chain reaction in recovery. I tested it locally and the recovery became normal. Besides, it could also be the cause of #6639 , #6715 . Approved-By: fuyufjh Approved-By: BugenZhao --- src/stream/src/task/barrier_manager/managed_state.rs | 1 + src/stream/src/task/stream_manager.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 5e12df5aa5a4c..2d1d522fb2be9 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -153,6 +153,7 @@ impl ManagedBarrierState { /// Clear and reset all states. pub(crate) fn clear_all_states(&mut self) { + tracing::debug!("clear all states in local barrier manager"); self.epoch_barrier_state_map.clear(); self.create_mview_progress.clear(); self.failure_actors.clear(); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 9c70859326600..836c287a3a4b3 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -307,10 +307,10 @@ impl LocalStreamManager { /// Force stop all actors on this worker. pub async fn stop_all_actors(&self) -> StreamResult<()> { + self.core.lock().await.drop_all_actors(); // Clear shared buffer in storage to release memory self.clear_storage_buffer().await; self.clear_all_senders_and_collect_rx(); - self.core.lock().await.drop_all_actors(); Ok(()) }