refactor(meta): unify hummock manager worker (#6675)
refactor(meta): add hummock manager worker

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
zwang28 and mergify[bot] authored Nov 30, 2022
1 parent bb051f7 commit 3ec1ea0
Showing 8 changed files with 195 additions and 190 deletions.
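At a glance, the refactor converges on an actor-style worker: the manager keeps an unbounded mpsc sender, and a single spawned task multiplexes local notifications and manager events with `tokio::select!`, exiting on `Shutdown` or when either channel closes. The sketch below is a minimal, self-contained distillation of that shape using simplified stand-in names (the `Manager`, `ManagerEvent`, and `LocalNotification` definitions here are not the RisingWave types) and assumes only the `tokio` crate with the `full` feature; the actual implementation is in `src/meta/src/hummock/manager/worker.rs` further down.

```rust
use std::sync::Arc;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;

// Simplified stand-ins for the two event sources the real worker listens to.
enum LocalNotification {
    WorkerNodeIsDeleted(u32),
}

enum ManagerEvent {
    Shutdown,
}

struct Manager {
    event_sender: UnboundedSender<ManagerEvent>,
}

impl Manager {
    // The constructor creates the channel, keeps the sender, and returns the
    // receiver so the caller can hand it to `start_worker`, mirroring how the
    // real constructors now return an `Arc` (`HummockManagerRef`).
    fn new() -> (Arc<Self>, UnboundedReceiver<ManagerEvent>) {
        let (event_sender, event_receiver) = unbounded_channel();
        (Arc::new(Manager { event_sender }), event_receiver)
    }

    // One spawned loop multiplexes both channels; it exits on `Shutdown` or when
    // either channel is closed.
    fn start_worker(
        self: &Arc<Self>,
        mut event_receiver: UnboundedReceiver<ManagerEvent>,
        mut notification_receiver: UnboundedReceiver<LocalNotification>,
    ) -> JoinHandle<()> {
        let manager = self.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    notification = notification_receiver.recv() => {
                        match notification {
                            Some(n) => manager.handle_local_notification(n).await,
                            None => return,
                        }
                    }
                    event = event_receiver.recv() => {
                        match event {
                            Some(ManagerEvent::Shutdown) | None => return,
                        }
                    }
                }
            }
        })
    }

    async fn handle_local_notification(&self, notification: LocalNotification) {
        match notification {
            LocalNotification::WorkerNodeIsDeleted(id) => {
                println!("releasing hummock context {id}");
            }
        }
    }

    fn try_send_event(&self, event: ManagerEvent) {
        // A send error only means the worker has already exited; log and drop.
        if self.event_sender.send(event).is_err() {
            eprintln!("worker already stopped");
        }
    }
}

#[tokio::main]
async fn main() {
    let (manager, event_receiver) = Manager::new();
    let (notification_tx, notification_rx) = unbounded_channel();
    let worker = manager.start_worker(event_receiver, notification_rx);

    // Branch selection inside `select!` is not deterministic, so the notification
    // may or may not be handled before the shutdown event is observed.
    notification_tx
        .send(LocalNotification::WorkerNodeIsDeleted(42))
        .unwrap();
    manager.try_send_event(ManagerEvent::Shutdown);
    worker.await.unwrap();
}
```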
15 changes: 11 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
@@ -77,6 +77,8 @@ mod tests;
mod versioning;
use versioning::*;
mod compaction;
mod worker;

use compaction::*;

type Snapshot = ArcSwap<HummockSnapshot>;
@@ -109,6 +111,7 @@ pub struct HummockManager<S: MetaStore> {
compaction_tasks_to_cancel: parking_lot::Mutex<Vec<HummockCompactionTaskId>>,

compactor_manager: CompactorManagerRef,
event_sender: HummockManagerEventSender,
}

pub type HummockManagerRef<S> = Arc<HummockManager<S>>;
@@ -191,6 +194,7 @@ pub(crate) use start_measure_real_process_timer;

use self::compaction_group_manager::CompactionGroupManagerInner;
use super::Compactor;
use crate::hummock::manager::worker::HummockManagerEventSender;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
[
@@ -221,7 +225,7 @@ where
cluster_manager: ClusterManagerRef<S>,
metrics: Arc<MetaMetrics>,
compactor_manager: CompactorManagerRef,
- ) -> Result<HummockManager<S>> {
+ ) -> Result<HummockManagerRef<S>> {
let compaction_group_manager = Self::build_compaction_group_manager(&env).await?;
Self::with_compaction_group_manager(
env,
@@ -240,7 +244,7 @@
metrics: Arc<MetaMetrics>,
compactor_manager: CompactorManagerRef,
config: CompactionConfig,
- ) -> Result<HummockManager<S>> {
+ ) -> Result<HummockManagerRef<S>> {
let compaction_group_manager =
Self::build_compaction_group_manager_with_config(&env, config).await?;
Self::with_compaction_group_manager(
@@ -259,7 +263,8 @@
metrics: Arc<MetaMetrics>,
compactor_manager: CompactorManagerRef,
compaction_group_manager: tokio::sync::RwLock<CompactionGroupManagerInner<S>>,
- ) -> Result<HummockManager<S>> {
+ ) -> Result<HummockManagerRef<S>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let instance = HummockManager {
env,
versioning: MonitoredRwLock::new(
@@ -281,8 +286,10 @@
committed_epoch: INVALID_EPOCH,
current_epoch: INVALID_EPOCH,
}),
event_sender: tx,
};

let instance = Arc::new(instance);
instance.start_worker(rx).await;
instance.load_meta_store_state().await?;
instance.release_invalid_contexts().await?;
instance.cancel_unassigned_compaction_task().await?;
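The signature change above (returning `HummockManagerRef<S>`, i.e. `Arc<HummockManager<S>>`, instead of a bare `HummockManager<S>`) follows from the new `start_worker`, which takes `self: &HummockManagerRef<S>` so that the spawned task can hold a clone of the manager; the constructor therefore has to wrap the instance in an `Arc` before starting the worker, and it simply returns that `Arc`. A tiny, self-contained illustration of the receiver pattern (a toy `Worker` type and plain threads, not RisingWave code):

```rust
use std::sync::Arc;
use std::thread::JoinHandle;

struct Worker;

impl Worker {
    // An `&Arc<Self>` receiver lets the method clone the `Arc` into a background
    // task, but it can only be called on a value that is already `Arc`-wrapped.
    fn start(self: &Arc<Self>) -> JoinHandle<()> {
        let this = self.clone();
        std::thread::spawn(move || this.run())
    }

    fn run(&self) {
        println!("worker running");
    }
}

fn main() {
    // Calling `start` on a bare `Worker` would not compile; the value must be
    // wrapped first, which is why the constructor now returns the `Arc` itself.
    let worker = Arc::new(Worker);
    worker.start().join().unwrap();
}
```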
52 changes: 0 additions & 52 deletions src/meta/src/hummock/manager/tests.rs
@@ -410,58 +410,6 @@ async fn test_context_id_validation() {
hummock_manager.pin_version(context_id).await.unwrap();
}

// This test is non-deterministic because it relies on timeouts
#[cfg(madsim)]
#[tokio::test]
async fn test_context_id_invalidation() {
use crate::hummock::start_local_notification_receiver;
let (env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(80).await;
let (member_join, member_shutdown) = start_local_notification_receiver(
hummock_manager.clone(),
hummock_manager.compactor_manager_ref_for_test(),
env.notification_manager_ref(),
)
.await;
let invalid_context_id = HummockContextId::MAX;
let context_id = worker_node.id;

// Invalid context id is rejected.
let error = hummock_manager
.pin_version(invalid_context_id)
.await
.unwrap_err();
assert!(matches!(error, Error::InvalidContext(_)));

// Valid context id is accepted.
hummock_manager.pin_version(context_id).await.unwrap();
// Pin multiple times is OK.
hummock_manager.pin_version(context_id).await.unwrap();

// Removing the node from the cluster will invalidate its context id by clearing
// the invalidated context's pinned versions.
cluster_manager
.delete_worker_node(worker_node.host.unwrap())
.await
.unwrap();

// The notification of local subscribers and the resulting deletion of the worker node
// from the Hummock manager take time to complete. This test waits for at most 10 seconds
// (in practice, it usually succeeds on the first try).
let mut success = false;
for _ in 0..40 {
if hummock_manager.pin_version(context_id).await.is_err() {
success = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}
member_shutdown.send(()).unwrap();
member_join.await.unwrap();
if !success {
panic!("context_id did not get invalidated")
}
}

#[tokio::test]
async fn test_hummock_manager_basic() {
let (_env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(1).await;
154 changes: 154 additions & 0 deletions src/meta/src/hummock/manager/worker.rs
@@ -0,0 +1,154 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use sync_point::sync_point;
use tokio::task::JoinHandle;
use tokio_retry::strategy::{jitter, ExponentialBackoff};

use crate::hummock::utils::RetryableError;
use crate::hummock::{HummockManager, HummockManagerRef};
use crate::manager::LocalNotification;
use crate::storage::MetaStore;

pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;

pub enum HummockManagerEvent {
Shutdown,
}

impl<S> HummockManager<S>
where
S: MetaStore,
{
pub(crate) async fn start_worker(
self: &HummockManagerRef<S>,
mut receiver: HummockManagerEventReceiver,
) -> JoinHandle<()> {
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
.notification_manager()
.insert_local_sender(local_notification_tx)
.await;
let hummock_manager = self.clone();
tokio::spawn(async move {
loop {
tokio::select! {
notification = local_notification_rx.recv() => {
match notification {
Some(notification) => {
hummock_manager
.handle_local_notification(notification)
.await;
}
None => {
return;
}
}
}
hummock_manager_event = receiver.recv() => {
match hummock_manager_event {
Some(hummock_manager_event) => {
if !hummock_manager
.handle_hummock_manager_event(hummock_manager_event)
.await {
return;
}
}
None => {
return;
}
}
}
}
}
})
}

/// A return value of `false` indicates that the worker should shut down.
#[expect(clippy::unused_async)]
async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
match event {
HummockManagerEvent::Shutdown => {
tracing::info!("Hummock manager worker is stopped");
false
}
}
}

async fn handle_local_notification(&self, notification: LocalNotification) {
let retry_strategy = ExponentialBackoff::from_millis(10)
.max_delay(Duration::from_secs(60))
.map(jitter);
match notification {
LocalNotification::WorkerNodeIsDeleted(worker_node) => {
self.compactor_manager.remove_compactor(worker_node.id);
tokio_retry::RetryIf::spawn(
retry_strategy.clone(),
|| async {
if let Err(err) = self.release_contexts(vec![worker_node.id]).await {
tracing::warn!(
"Failed to release hummock context {}. {}. Will retry.",
worker_node.id,
err
);
return Err(err);
}
Ok(())
},
RetryableError::default(),
)
.await
.expect("retry until success");
tracing::info!("Released hummock context {}", worker_node.id);
sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
}
// TODO move `CompactionTaskNeedCancel` to `handle_hummock_manager_event`
// TODO extract retry boilerplate code
LocalNotification::CompactionTaskNeedCancel(compact_task) => {
let task_id = compact_task.task_id;
tokio_retry::RetryIf::spawn(
retry_strategy.clone(),
|| async {
let mut compact_task_mut = compact_task.clone();
if let Err(err) = self.cancel_compact_task_impl(&mut compact_task_mut).await
{
tracing::warn!(
"Failed to cancel compaction task {}. {}. Will retry.",
compact_task.task_id,
err
);
return Err(err);
}
Ok(())
},
RetryableError::default(),
)
.await
.expect("retry until success");
tracing::info!("Cancelled compaction task {}", task_id);
sync_point!("AFTER_CANCEL_COMPACTION_TASK_ASYNC");
}
}
}

pub fn try_send_event(&self, event: HummockManagerEvent) {
if let Err(e) = self.event_sender.send(event) {
tracing::warn!("failed to send event to hummock manager {}", e);
}
}
}
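The two retry blocks in `handle_local_notification` share the same shape (the TODO above notes extracting the boilerplate): an `ExponentialBackoff` strategy with `jitter`, driven by `tokio_retry::RetryIf` until the operation succeeds. As a standalone reference for that combination, here is a minimal sketch; `StoreError`, `release_context`, and the retry-everything condition are made-up stand-ins (the real code gates retries with `RetryableError::default()`), and the example assumes the `tokio` and `tokio-retry` crates.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::RetryIf;

// Hypothetical error type standing in for the meta-store errors retried above.
#[derive(Debug)]
struct StoreError(&'static str);

static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

// Fails twice, then succeeds, so the retries are observable.
async fn release_context() -> Result<(), StoreError> {
    if ATTEMPTS.fetch_add(1, Ordering::SeqCst) < 2 {
        Err(StoreError("transient meta store failure"))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // Same strategy as in `handle_local_notification`: exponential backoff
    // starting at 10ms, capped at 60s, with jitter applied to every delay.
    let retry_strategy = ExponentialBackoff::from_millis(10)
        .max_delay(Duration::from_secs(60))
        .map(jitter);

    let result = RetryIf::spawn(
        retry_strategy,
        || async {
            if let Err(err) = release_context().await {
                eprintln!("attempt failed: {err:?}, will retry");
                return Err(err);
            }
            Ok(())
        },
        // Retry on every error; the real code decides this via `RetryableError`.
        |_: &StoreError| true,
    )
    .await;

    match result {
        Ok(()) => println!("succeeded after {} attempts", ATTEMPTS.load(Ordering::SeqCst)),
        Err(err) => eprintln!("gave up: {err:?}"),
    }
}
```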
(Diffs for the remaining 5 changed files did not load.)
