feat(storage): support task reject on compactor side
Li0k committed Dec 20, 2022
1 parent 86baf2b commit f4fddb1
Showing 5 changed files with 112 additions and 112 deletions.
69 changes: 9 additions & 60 deletions src/meta/src/hummock/compaction_schedule_policy.rs
@@ -307,27 +307,7 @@ impl ScoredPolicy {
let running_task = *compactor_assigned_task_num
.get(&compactor.context_id())
.unwrap_or(&0);
// if running_task < 2 * compactor.max_concurrent_task_number() {
// // if let CompactorState::Idle(_) = compactor.state() {
// // return Some(compactor.clone());
// // }

// return Some(compactor.clone());
// } else {
// // compactor.set_state(new_state);
// }

// compactor busy
if matches!(compactor.state(), CompactorState::Busy(_)) {
continue;
}

if (matches!(self.state, CompactorState::Idle(_))
&& running_task > compactor.max_concurrent_task_number())
|| (matches!(self.state, CompactorState::Burst(_))
&& running_task > 2 * compactor.max_concurrent_task_number())
{
// policy idle or burst
if running_task > compactor.max_concurrent_task_number() {
continue;
}

@@ -342,45 +322,14 @@ impl CompactionSchedulePolicy for ScoredPolicy {
&mut self,
compactor_assigned_task_num: &HashMap<HummockContextId, u64>,
) -> Option<Arc<Compactor>> {
if let CompactorState::Busy(_) = self.state {
return None;
}
// if let CompactorState::Busy(_) = self.state {
// return None;
// }

if let Some(compactor) = self.fetch_idle_compactor(compactor_assigned_task_num) {
// if let CompactorState::Idle(_) = &self.state {
// // do not change state.
// } else {
// self.state = CompactorState::Idle(Instant::now());
// }

// self.refresh_state();
return Some(compactor);
}

// match self.state {
// CompactorState::Idle(_) => {
// // self.state = CompactorState::Burst(Instant::now());
// }
// CompactorState::Burst(last_burst) => {
// // if last_burst.elapsed().as_secs() > MAX_BURST_TIME {
// // self.state = CompactorState::Busy(Instant::now());
// // return None;
// // }
// }
// CompactorState::Busy(_) => {
// return None;
// }
// }

// for compactor in self.score_to_compactor.values() {
// let running_task = *compactor_assigned_task_num
// .get(&compactor.context_id())
// .unwrap_or(&0);
// if running_task < 2 * compactor.max_concurrent_task_number() {
// return Some(compactor.clone());
// }
// }
// self.state = CompactorState::Busy(Instant::now());
None
}

@@ -479,13 +428,13 @@ impl CompactionSchedulePolicy for ScoredPolicy {
}

fn refresh_state(&mut self) {
let idle_count = self
let busy_count = self
.score_to_compactor
.values()
.filter(|compactor| matches!(compactor.state(), CompactorState::Idle(_)))
.filter(|compactor| matches!(compactor.state(), CompactorState::Busy(_)))
.count();

if idle_count * 10 < self.score_to_compactor.len() * 2 {
if busy_count == self.score_to_compactor.len() {
match self.state {
CompactorState::Idle(_) => {
self.state = CompactorState::Burst(Instant::now());
@@ -516,8 +465,8 @@ impl CompactionSchedulePolicy for ScoredPolicy {
}

tracing::info!(
"refresh_state idle_count {} total {} state {:?}",
idle_count,
"refresh_state busy_count {} total {} state {:?}",
busy_count,
self.score_to_compactor.len(),
self.state
);
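
As far as the truncated hunks show, fetch_idle_compactor now skips a compactor purely on its running-task count, and refresh_state counts Busy compactors, escalating the policy state only when every registered compactor is Busy. A minimal standalone Rust sketch of that one visible transition; the type names are stand-ins and the truncated Burst/Busy arms are omitted:

use std::time::Instant;

#[derive(Debug, Clone, Copy)]
enum State {
    Idle,
    Burst,
    Busy,
}

struct PolicySketch {
    state: (State, Instant),
    compactor_states: Vec<State>,
}

impl PolicySketch {
    // Mirrors the visible part of refresh_state: count Busy compactors and
    // escalate only when all of them are Busy.
    fn refresh_state(&mut self) {
        let busy_count = self
            .compactor_states
            .iter()
            .filter(|&&s| matches!(s, State::Busy))
            .count();
        if busy_count == self.compactor_states.len() {
            if let (State::Idle, _) = self.state {
                self.state = (State::Burst, Instant::now());
            }
            // The Burst and Busy arms are truncated in the diff and are
            // intentionally left out of this sketch.
        }
    }
}

fn main() {
    let mut policy = PolicySketch {
        state: (State::Idle, Instant::now()),
        compactor_states: vec![State::Busy, State::Busy],
    };
    policy.refresh_state();
    println!("{:?}", policy.state.0); // prints "Burst"
}
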
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compactor_manager.rs
@@ -473,14 +473,14 @@ impl CompactorManager {
compactor.try_down_state();
}

self.policy.write().refresh_state();

tracing::info!(
"update_compactor_state cpu {} state {:?}",
workload.cpu,
compactor.state(),
);
}

self.policy.write().refresh_state();
}
}

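
refresh_state now runs once at the end of update_compactor_state, after the inner block, so the policy write lock is taken a single time per call. A minimal sketch of that shape with stand-in types; std's RwLock stands in for the crate's lock, whose write() returns a guard directly:

use std::sync::RwLock;

#[derive(Default)]
struct PolicySketch {
    refreshes: u32,
}

impl PolicySketch {
    fn refresh_state(&mut self) {
        self.refreshes += 1;
    }
}

struct ManagerSketch {
    policy: RwLock<PolicySketch>,
}

impl ManagerSketch {
    fn update_compactor_state(&self, compactor_found: bool, cpu: u32) {
        if compactor_found {
            // Per-compactor bookkeeping (try_down_state, logging cpu/state)
            // happens inside this block in the real code.
            let _ = cpu;
        }
        // The refresh now happens here, once, after the inner block.
        self.policy.write().unwrap().refresh_state();
    }
}

fn main() {
    let manager = ManagerSketch {
        policy: RwLock::new(PolicySketch::default()),
    };
    manager.update_compactor_state(true, 87);
    manager.update_compactor_state(false, 0);
    assert_eq!(manager.policy.read().unwrap().refreshes, 2);
    println!("one refresh per update_compactor_state call");
}
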
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
@@ -200,7 +200,7 @@ use self::compaction_group_manager::CompactionGroupManagerInner;
use super::Compactor;
use crate::hummock::manager::worker::HummockManagerEventSender;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
pub static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
[
TaskStatus::ManualCanceled,
TaskStatus::SendFailCanceled,
22 changes: 15 additions & 7 deletions src/meta/src/rpc/service/hummock_service.rs
@@ -24,6 +24,7 @@ use tonic::{Request, Response, Status};
use crate::hummock::compaction::ManualCompactionOption;
use crate::hummock::{
CompactionResumeTrigger, CompactorManagerRef, HummockManagerRef, VacuumManagerRef,
CANCEL_STATUS_SET,
};
use crate::manager::FragmentManagerRef;
use crate::rpc::service::RwReceiverStream;
@@ -159,13 +160,20 @@ where
status: None,
})),
Some(mut compact_task) => {
self.hummock_manager
.report_compact_task(
req.context_id,
&mut compact_task,
Some(req.table_stats_change),
)
.await?;
let task_status = compact_task.task_status();
if CANCEL_STATUS_SET.contains(&task_status) {
self.hummock_manager
.cancel_compact_task(&mut compact_task, task_status)
.await?;
} else {
self.hummock_manager
.report_compact_task(
req.context_id,
&mut compact_task,
Some(req.table_stats_change),
)
.await?;
}
Ok(Response::new(ReportCompactionTasksResponse {
status: None,
}))
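
With CANCEL_STATUS_SET now public, the handler routes a reported task whose status is in that set to cancel_compact_task and everything else to report_compact_task. A standalone sketch of just that routing decision; TaskStatus below is a stand-in for the generated proto enum, the set only lists the cancel variants visible in this diff, and Success is added so there is a non-cancel case:

use std::collections::HashSet;
use std::sync::LazyLock;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum TaskStatus {
    Success,
    ManualCanceled,
    SendFailCanceled,
}

// Stand-in for the now-public CANCEL_STATUS_SET; the real list is truncated
// in this diff and likely contains more cancel variants.
static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [TaskStatus::ManualCanceled, TaskStatus::SendFailCanceled]
        .into_iter()
        .collect()
});

// Returns which manager call the report handler would take for a status.
fn route(status: TaskStatus) -> &'static str {
    if CANCEL_STATUS_SET.contains(&status) {
        "cancel_compact_task"
    } else {
        "report_compact_task"
    }
}

fn main() {
    assert_eq!(route(TaskStatus::ManualCanceled), "cancel_compact_task");
    assert_eq!(route(TaskStatus::Success), "report_compact_task");
    println!("routing ok");
}
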
127 changes: 85 additions & 42 deletions src/storage/src/hummock/compactor/mod.rs
@@ -377,6 +377,15 @@ impl Compactor {
}
}

pub fn pre_process_task(task: &mut Task, workload: &CompactorWorkload) {
const CPU_THRESHOLD: u32 = 80;
if let Task::CompactTask(mut compact_task) = task.clone() {
if workload.cpu > CPU_THRESHOLD && compact_task.input_ssts[0].level_idx != 0 {
compact_task.set_task_status(TaskStatus::ManualCanceled);
}
}
}

/// The background compaction thread that receives compaction tasks from hummock compaction
/// manager and runs compaction tasks.
#[cfg_attr(coverage, no_coverage)]
@@ -393,6 +402,9 @@ impl Compactor {
let shutdown_map = CompactionShutdownMap::default();
let mut min_interval = tokio::time::interval(stream_retry_interval);
let mut task_progress_interval = tokio::time::interval(task_progress_update_interval);

tokio::time::sleep(Duration::from_secs(1)).await;

// This outer loop is to recreate stream.
'start_stream: loop {
tokio::select! {
@@ -426,6 +438,7 @@

let executor = compactor_context.context.compaction_executor.clone();
let mut process_collector = LocalProcessCollector::new().unwrap();
let mut last_workload = CompactorWorkload::default();
// This inner loop is to consume stream or report task progress.
'consume_stream: loop {
let message = tokio::select! {
@@ -451,6 +464,8 @@
cpu,
};

last_workload = workload.clone();

if let Err(e) = hummock_meta_client.compactor_heartbeat(progress_list, workload).await {
// ignore any errors while trying to report task progress
tracing::warn!("Failed to report task progress. {e:?}");
@@ -468,68 +483,96 @@
match message {
Some(Ok(SubscribeCompactTasksResponse { task })) => {
let task = match task {
Some(task) => task,
Some(mut task) => {
Self::pre_process_task(&mut task, &last_workload);
task
}
None => continue 'consume_stream,
};

let shutdown = shutdown_map.clone();
let context = compactor_context.clone();
let meta_client = hummock_meta_client.clone();

executor.spawn(async move {
match task {
Task::CompactTask(compact_task) => {
match task {
Task::CompactTask(compact_task) => {
if matches!(
compact_task.task_status(),
TaskStatus::ManualCanceled
) {
let table_stats_map = TableStatsMap::default();
if let Err(e) = meta_client
.report_compaction_task(
compact_task.clone(),
table_stats_map,
)
.await
{
tracing::warn!(
"Failed to cancel compaction task: {}, error: {} last_cpu {}",
compact_task.task_id,
e,
last_workload.cpu
);
} else {
tracing::debug!(
"ManualCancel by cpu {} compaction task: {}",
last_workload.cpu,
compact_task.task_id,
);
}
} else {
let (tx, rx) = tokio::sync::oneshot::channel();
let task_id = compact_task.task_id;
shutdown
.lock()
.unwrap()
.insert(task_id, tx);
shutdown.lock().unwrap().insert(task_id, tx);
Compactor::compact(context, compact_task, rx).await;
shutdown.lock().unwrap().remove(&task_id);
}
Task::VacuumTask(vacuum_task) => {
Vacuum::vacuum(
vacuum_task,
context.context.sstable_store.clone(),
meta_client,
)
.await;
}
Task::FullScanTask(full_scan_task) => {
Vacuum::full_scan(
full_scan_task,
context.context.sstable_store.clone(),
meta_client,
)
.await;
}
Task::ValidationTask(validation_task) => {
validate_ssts(
validation_task,
context.context.sstable_store.clone(),
)
.await;
}
Task::CancelCompactTask(cancel_compact_task) => {
if let Some(tx) = shutdown
.lock()
.unwrap()
.remove(&cancel_compact_task.task_id)
{
if tx.send(()).is_err() {
tracing::warn!(
}
Task::VacuumTask(vacuum_task) => {
Vacuum::vacuum(
vacuum_task,
context.context.sstable_store.clone(),
meta_client,
)
.await;
}
Task::FullScanTask(full_scan_task) => {
Vacuum::full_scan(
full_scan_task,
context.context.sstable_store.clone(),
meta_client,
)
.await;
}
Task::ValidationTask(validation_task) => {
validate_ssts(
validation_task,
context.context.sstable_store.clone(),
)
.await;
}
Task::CancelCompactTask(cancel_compact_task) => {
if let Some(tx) = shutdown
.lock()
.unwrap()
.remove(&cancel_compact_task.task_id)
{
if tx.send(()).is_err() {
tracing::warn!(
"Cancellation of compaction task failed. task_id: {}",
cancel_compact_task.task_id
);
}
} else {
tracing::warn!(
}
} else {
tracing::warn!(
"Attempting to cancel non-existent compaction task. task_id: {}",
cancel_compact_task.task_id
);
}
}
}
}
});
}
Some(Err(e)) => {
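
Taken together, pre_process_task marks a task ManualCanceled when the heartbeat CPU exceeds 80 and the task's first input level is not L0, and the subscribe loop then reports that canceled task back to meta instead of running it. A tiny standalone restatement of the rejection criterion only; should_reject is an illustrative name, not a function in the crate:

// Illustrative restatement of the condition in pre_process_task above.
const CPU_THRESHOLD: u32 = 80;

fn should_reject(cpu: u32, first_input_level_idx: u32) -> bool {
    // Reject only non-L0 tasks, and only when the compactor is already hot.
    cpu > CPU_THRESHOLD && first_input_level_idx != 0
}

fn main() {
    assert!(should_reject(95, 3)); // hot compactor, non-L0 task: reject
    assert!(!should_reject(95, 0)); // L0 tasks are never rejected by this check
    assert!(!should_reject(40, 3)); // CPU below the threshold: accept
    println!("rejection criterion ok");
}
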
