Skip to content

Commit

Permalink
fix(storage): fix suggest_scale_policy at least 1 compactor
Browse files Browse the repository at this point in the history
fix

add log

fix sleep

adjust state

fix state change
  • Loading branch information
Li0k committed Dec 23, 2022
1 parent 5a73c46 commit 72c56ed
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def section_compaction(outer_panels):
),
],
),
panels.timeseries_kilobytes(
panels.timeseries_bytes(
"Waiting compaction bytes",
"Compaction bytes which need to be schedule in new task",
[
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

38 changes: 22 additions & 16 deletions src/meta/src/hummock/compaction_schedule_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use crate::hummock::error::{Error, Result};
use crate::MetaResult;

const STREAM_BUFFER_SIZE: usize = 4;
const MAX_BURST_TIME: u64 = 120;
const MIN_LAST_STATE_TIME: u64 = 60;
const MAX_IDLE_TIME: u64 = 600;

#[derive(Debug)]
pub enum ScalePolicy {
ScaleOut,
ScaleIn(u64),
Expand Down Expand Up @@ -406,7 +407,9 @@ impl CompactionSchedulePolicy for ScoredPolicy {
CompactorState::Busy(_) => return ScalePolicy::ScaleOut,
CompactorState::Burst(_) => (),
CompactorState::Idle(last_idle_time) => {
if last_idle_time.elapsed().as_secs() > MAX_IDLE_TIME {
if last_idle_time.elapsed().as_secs() > MAX_IDLE_TIME
&& self.score_to_compactor.len() > 1
{
let decrease_core = self
.score_to_compactor
.values()
Expand All @@ -428,19 +431,21 @@ impl CompactionSchedulePolicy for ScoredPolicy {
}

fn refresh_state(&mut self) {
let busy_count = self
let idle_count = self
.score_to_compactor
.values()
.filter(|compactor| matches!(compactor.state(), CompactorState::Busy(_)))
.filter(|compactor| matches!(compactor.state(), CompactorState::Idle(_)))
.count();

if busy_count == self.score_to_compactor.len() {
if idle_count == 0 {
match self.state {
CompactorState::Idle(_) => {
self.state = CompactorState::Burst(Instant::now());
CompactorState::Idle(last_update) => {
if last_update.elapsed().as_secs() > MIN_LAST_STATE_TIME {
self.state = CompactorState::Burst(Instant::now());
}
}
CompactorState::Burst(last_burst) => {
if last_burst.elapsed().as_secs() > MAX_BURST_TIME {
CompactorState::Burst(last_update) => {
if last_update.elapsed().as_secs() > MIN_LAST_STATE_TIME {
self.state = CompactorState::Busy(Instant::now());
}
}
Expand All @@ -450,25 +455,26 @@ impl CompactionSchedulePolicy for ScoredPolicy {
match self.state {
CompactorState::Idle(_) => {}

CompactorState::Burst(last_burst) => {
if last_burst.elapsed().as_secs() > 60 {
CompactorState::Burst(last_update) => {
if last_update.elapsed().as_secs() > MIN_LAST_STATE_TIME {
self.state = CompactorState::Idle(Instant::now());
}
}

CompactorState::Busy(last_busy) => {
if last_busy.elapsed().as_secs() > 60 {
CompactorState::Busy(last_update) => {
if last_update.elapsed().as_secs() > MIN_LAST_STATE_TIME {
self.state = CompactorState::Burst(Instant::now());
}
}
}
}

tracing::info!(
"refresh_state busy_count {} total {} state {:?}",
busy_count,
"refresh_state idle_count {} total {} state {:?} suggest_scale_policy {:?}",
idle_count,
self.score_to_compactor.len(),
self.state
self.state,
self.suggest_scale_policy(),
);
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,16 @@ impl Compactor {
}
}

pub fn pre_process_task(task: &mut Task, workload: &CompactorWorkload) {
const CPU_THRESHOLD: u32 = 80;
pub fn pre_process_task(task: &mut Task, workload: &CompactorWorkload) -> bool {
const CPU_THRESHOLD: u32 = 85;
if let Task::CompactTask(compact_task) = task {
if workload.cpu > CPU_THRESHOLD && compact_task.input_ssts[0].level_idx != 0 {
compact_task.set_task_status(TaskStatus::ManualCanceled);
return false;
}
}

true
}

/// The background compaction thread that receives compaction tasks from hummock compaction
Expand All @@ -403,8 +406,6 @@ impl Compactor {
let mut min_interval = tokio::time::interval(stream_retry_interval);
let mut task_progress_interval = tokio::time::interval(task_progress_update_interval);

tokio::time::sleep(Duration::from_secs(1)).await;

// This outer loop is to recreate stream.
'start_stream: loop {
tokio::select! {
Expand Down Expand Up @@ -484,7 +485,9 @@ impl Compactor {
Some(Ok(SubscribeCompactTasksResponse { task })) => {
let task = match task {
Some(mut task) => {
Self::pre_process_task(&mut task, &last_workload);
if !Self::pre_process_task(&mut task, &last_workload) {
tokio::time::sleep(Duration::from_secs(1)).await;
}
task
}
None => continue 'consume_stream,
Expand Down

0 comments on commit 72c56ed

Please sign in to comment.