fix: clean states in local barrier manager after actor dropped (#7082)
This PR tries to fix the continuous recovery found in the longevity and chaos tests. Two problems might be the root cause:
1. Fixed: unnecessary recovery was triggered, as described in #6989. In local testing under very high workload, there were many in-flight barrier collect responses (80+) during recovery. After recovery finished, each of those responses triggered another recovery, because the whole cluster had already been reset to the previous committed epoch.
2. Before this PR, when force-stopping actors on a compute node (CN), the local barrier manager cleared all states and then aborted all actors. The problem is that between clearing states and aborting actors, the actors could still report collected epochs or error status to the local barrier manager, especially when the number of actors is high. This causes a chain reaction in recovery.

I tested this locally and recovery returned to normal. The same problem could also be the cause of #6639 and #6715.
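
The ordering problem in item 2 can be illustrated with a small, single-threaded sketch. It is not RisingWave code: `ToyBarrierState`, `ToyWorker`, and their methods are hypothetical stand-ins, and the "race window" is simulated by letting live actors report between the two steps. It only shows why dropping actors before clearing state leaves nothing stale behind.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the local barrier manager's state.
#[derive(Default)]
struct ToyBarrierState {
    /// epoch -> ids of actors that reported the epoch as collected
    collected: HashMap<u64, Vec<u32>>,
}

impl ToyBarrierState {
    fn collect(&mut self, actor_id: u32, epoch: u64) {
        self.collected.entry(epoch).or_default().push(actor_id);
    }

    fn clear_all_states(&mut self) {
        self.collected.clear();
    }
}

/// Hypothetical worker: any actor that is still alive may report at any time.
struct ToyWorker {
    live_actors: Vec<u32>,
    state: ToyBarrierState,
}

impl ToyWorker {
    fn new(actor_ids: &[u32]) -> Self {
        Self {
            live_actors: actor_ids.to_vec(),
            state: ToyBarrierState::default(),
        }
    }

    /// Simulates the race window: every live actor reports `epoch`.
    fn live_actors_report(&mut self, epoch: u64) {
        for &id in &self.live_actors {
            self.state.collect(id, epoch);
        }
    }

    fn drop_all_actors(&mut self) {
        self.live_actors.clear();
    }

    /// Old order: clear, then drop. Returns the number of stale epochs left.
    fn stop_clear_then_drop(&mut self, stale_epoch: u64) -> usize {
        self.state.clear_all_states();
        self.live_actors_report(stale_epoch); // actors are still alive here
        self.drop_all_actors();
        self.state.collected.len()
    }

    /// New order: drop, then clear. Returns the number of stale epochs left.
    fn stop_drop_then_clear(&mut self, stale_epoch: u64) -> usize {
        self.drop_all_actors();
        self.live_actors_report(stale_epoch); // no live actors, nothing reported
        self.state.clear_all_states();
        self.state.collected.len()
    }
}

fn main() {
    let mut old_order = ToyWorker::new(&[1, 2, 3]);
    let mut new_order = ToyWorker::new(&[1, 2, 3]);
    println!("clear-then-drop leaves {} stale epoch(s)", old_order.stop_clear_then_drop(42));
    println!("drop-then-clear leaves {} stale epoch(s)", new_order.stop_drop_then_clear(42));
}
```

In the real system the actors run concurrently, so the window is a matter of timing rather than a fixed call, but the invariant is the same: state cleared while reporters are still alive can be repopulated.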

Approved-By: fuyufjh
Approved-By: BugenZhao
yezizp2012 authored and lmatz committed Jan 3, 2023
1 parent d173e5d commit 7216d55
Showing 2 changed files with 2 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -153,6 +153,7 @@ impl ManagedBarrierState {

     /// Clear and reset all states.
     pub(crate) fn clear_all_states(&mut self) {
+        tracing::debug!("clear all states in local barrier manager");
         self.epoch_barrier_state_map.clear();
         self.create_mview_progress.clear();
         self.failure_actors.clear();
2 changes: 1 addition & 1 deletion src/stream/src/task/stream_manager.rs
@@ -307,10 +307,10 @@ impl LocalStreamManager {

     /// Force stop all actors on this worker.
     pub async fn stop_all_actors(&self) -> StreamResult<()> {
+        self.core.lock().await.drop_all_actors();
         // Clear shared buffer in storage to release memory
         self.clear_storage_buffer().await;
         self.clear_all_senders_and_collect_rx();
-        self.core.lock().await.drop_all_actors();
 
         Ok(())
     }