feat(compactor): calculate pending bytes for scale compactor #6497

Closed
9 changes: 9 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
@@ -526,6 +526,15 @@ def section_compaction(outer_panels):
),
],
),
panels.timeseries_count(
"scale compactor core count",
"number of compactor cores needed to scale out",
[
panels.target(
f"sum({metric('storage_compactor_core_score_count')})",
),
],
),
panels.timeseries_count(
"Compaction Success & Failure Count",
"num of compactions from each level to next level",
12 changes: 12 additions & 0 deletions proto/hummock.proto
@@ -549,6 +549,7 @@ service HummockManagerService {
rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse);
rpc InitMetadataForReplay(InitMetadataForReplayRequest) returns (InitMetadataForReplayResponse);
rpc SetCompactorRuntimeConfig(SetCompactorRuntimeConfigRequest) returns (SetCompactorRuntimeConfigResponse);
rpc GetScaleCompactor(GetScaleCompactorRequest) returns (GetScaleCompactorResponse);
}

message CompactionConfig {
@@ -581,3 +582,14 @@ message HummockVersionStats {
uint64 hummock_version_id = 1;
map<uint32, TableStats> table_stats = 2;
}

message GetScaleCompactorRequest {
}

message GetScaleCompactorResponse {
uint64 suggest_cores = 1;
uint64 running_cores = 2;
uint64 total_cores = 3;
uint64 waiting_compaction_bytes = 4;
uint64 pending_compaction_bytes = 5;
}
162 changes: 159 additions & 3 deletions src/meta/src/hummock/compaction/level_selector.rs
@@ -17,11 +17,13 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

use std::collections::HashSet;
use std::sync::Arc;

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, Level, OverlappingLevel};

use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::min_overlap_compaction_picker::MinOverlappingPicker;
@@ -36,6 +38,11 @@ const SCORE_BASE: u64 = 100;

pub trait LevelSelector: Sync + Send {
fn need_compaction(&self, levels: &Levels, level_handlers: &[LevelHandler]) -> bool;
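/// Estimated number of bytes that are waiting to be scheduled for compaction;
/// data already claimed by pending compact tasks is excluded.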
fn waiting_schedule_compaction_bytes(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
) -> u64;

fn pick_compaction(
&self,
@@ -82,6 +89,86 @@ impl DynamicLevelSelector {
inner: LevelSelectorCore::new(config, overlap_strategy),
}
}

fn calculate_l0_overlap(

Please correct me if I am wrong: this method calculates "the total size of non-pending L0 SSTs + the total size of base-level SSTs overlapping with those non-pending L0 SSTs". Based on this, I have the following questions:

  1. Shouldn't we use if !handlers[0].is_pending_compact(&table_info.id) in L102 so that we actually use non-pending L0 SSTs to calculate the overlap?
  2. output_files in L108 represents the pending SSTs in the base level whose output goes to the base level itself. IIUC, we should include these SSTs in next_level_size instead of ignoring them in L116. To be more precise, I think next_level_size should cover non-pending SSTs plus pending SSTs whose target level == base level.
  3. Do we intentionally ignore L0 sub-level compaction?

&self,
l0: &OverlappingLevel,
base_level: &Level,
handlers: &[LevelHandler],
) -> u64 {
let total_level_size = l0.total_file_size - handlers[0].get_pending_file_size();
let mut overlap_info = self.inner.overlap_strategy.create_overlap_info();
let next_level_files = handlers[0].get_pending_next_level_file();

for sub_level in &l0.sub_levels {
for table_info in &sub_level.table_infos {
if next_level_files.contains(&table_info.id) {
continue;
}
overlap_info.update(table_info);
}
}
let mut next_level_size = 0;
let next_level_files =

The logic is understandable now, but the variable names make it easy to get lost while reading.

I tried renaming several of them to distinguish between l0 and base_level; how about this?

total_level_size -> l0_total_level_size

next_level_files -> pending_base_level_files 
next_level_size -> base_level_size

handlers[base_level.level_idx as usize].get_pending_next_level_file();
let overlap_files = overlap_info.check_multiple_overlap(&base_level.table_infos);

for sst in &overlap_files {
// This file will not stay in the current level because its data will be moved to the
// next level. Other files, even if they are pending in another compact task and will
// be deleted when that task ends, still keep their data in this level.
if next_level_files.contains(&sst.id) {
continue;
}
next_level_size += sst.file_size;
}
next_level_size + total_level_size
}

fn calculate_base_level_overlap(
&self,
target_bytes: u64,
select_level: &Level,
target_level: &Level,
handlers: &[LevelHandler],
) -> u64 {
if select_level.total_file_size <= target_bytes {
return 0;

Why do we return zero instead of an exact value when <= target_bytes? Is it because the next compact_task won't be spawned?

}
let compacting_file_size =
handlers[select_level.level_idx as usize].get_pending_next_level_file_size();
if select_level.total_file_size - compacting_file_size <= target_bytes {
return 0;
}
let next_level_files =
handlers[select_level.level_idx as usize].get_pending_next_level_file();
let mut info = self.inner.overlap_strategy.create_overlap_info();
let mut compact_bytes = 0;
for sst in &select_level.table_infos {
if compact_bytes + compacting_file_size + target_bytes >= select_level.total_file_size {
break;
}
if next_level_files.contains(&sst.id) {
continue;
}
info.update(sst);
compact_bytes += sst.file_size;
}
let output_files = if target_level.level_idx as usize + 1 >= handlers.len() {
HashSet::default()
} else {
handlers[target_level.level_idx as usize].get_pending_next_level_file()
};
let overlap_files = info.check_multiple_overlap(&target_level.table_infos);
for sst in overlap_files {
if output_files.contains(&sst.id) {
continue;
}
compact_bytes += sst.file_size;
}
compact_bytes
}
}

impl LevelSelectorCore {
@@ -232,8 +319,8 @@ impl LevelSelectorCore {
level_idx - 1
};
let total_size = level.total_file_size
+ handlers[upper_level].get_pending_output_file_size(level.level_idx)
- handlers[level_idx].get_pending_output_file_size(level.level_idx + 1);
+ handlers[upper_level].get_pending_next_level_file_size()
- handlers[level_idx].get_pending_next_level_file_size();
if total_size == 0 {
continue;
}
@@ -284,6 +371,34 @@ impl LevelSelector for DynamicLevelSelector {
.unwrap_or(false)
}

fn waiting_schedule_compaction_bytes(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
) -> u64 {
let ctx = self.inner.calculate_level_base_size(levels);
let mut pending_compaction_bytes = self.calculate_l0_overlap(

Rename this to l0_pending_compaction_bytes?

levels.l0.as_ref().unwrap(),
levels.get_level(ctx.base_level),
level_handlers,
);
for level in &levels.levels {
let level_idx = level.level_idx as usize;
// The data in the last level will not be compacted to another level.
if level_idx < ctx.base_level || level_idx >= levels.levels.len() {
continue;
}
let target_level = levels.get_level(level_idx + 1);
pending_compaction_bytes += self.calculate_base_level_overlap(
ctx.level_max_bytes[level_idx],
level,
target_level,
level_handlers,
);
}
pending_compaction_bytes
}

fn pick_compaction(
&self,
task_id: HummockCompactionTaskId,
@@ -626,4 +741,45 @@ pub mod tests {
let compaction = selector.pick_compaction(2, &levels, &mut levels_handlers);
assert!(compaction.is_none());
}

#[test]
fn test_waiting_schedule_compaction_bytes() {
let config = CompactionConfigBuilder::new()
.max_bytes_for_level_base(200)
.max_level(4)
.max_bytes_for_level_multiplier(5)
.compaction_mode(CompactionMode::Range as i32)
.build();
// base-level: 2
// balanced lsm tree size:
// 200/250/1250
let levels = vec![
generate_level(1, vec![]),
generate_level(2, generate_tables(0..5, 0..1000, 3, 50)),
generate_level(3, generate_tables(5..10, 0..1000, 2, 100)),
generate_level(4, generate_tables(10..15, 0..1000, 1, 250)),
];
let levels = Levels {
levels,
l0: Some(generate_l0_nonoverlapping_sublevels(generate_tables(
15..25,
0..600,
3,
10,
))),
};

let selector =
DynamicLevelSelector::new(Arc::new(config), Arc::new(RangeOverlapStrategy::default()));
let mut levels_handlers = (0..5).into_iter().map(LevelHandler::new).collect_vec();
let waiting_bytes = selector.waiting_schedule_compaction_bytes(&levels, &levels_handlers);
// Select 10 files in level0 that overlap with 3 files in level2; select one file in level2
// that overlaps with one file in level3; select three files in level3 that overlap with
// three files in level4. (10*10 + 3*50) + (50 + 100) + (100*3 + 250*3) = 1450
assert_eq!(waiting_bytes, 1450);
levels_handlers[2].add_pending_task(1, 3, &levels.levels[1].table_infos[..1]);
levels_handlers[3].add_pending_task(1, 3, &levels.levels[2].table_infos[..1]);
let waiting_bytes = selector.waiting_schedule_compaction_bytes(&levels, &levels_handlers);
assert_eq!(waiting_bytes, 1250);
}
}
18 changes: 2 additions & 16 deletions src/meta/src/hummock/compaction/manual_compaction_picker.rs
@@ -24,7 +24,7 @@ use risingwave_pb::hummock::{
};

use super::overlap_strategy::OverlapInfo;
use crate::hummock::compaction::level_selector::{LevelSelector, LevelSelectorCore};
use crate::hummock::compaction::level_selector::LevelSelectorCore;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapInfo};
use crate::hummock::compaction::{
CompactionInput, CompactionPicker, CompactionTask, ManualCompactionOption,
@@ -334,18 +334,8 @@ impl ManualCompactionSelector {
option,
}
}
}

impl LevelSelector for ManualCompactionSelector {
fn need_compaction(&self, levels: &Levels, _: &[LevelHandler]) -> bool {
let ctx = self.inner.calculate_level_base_size(levels);
if self.option.level > 0 && self.option.level < ctx.base_level {
return false;
}
true
}

fn pick_compaction(
pub fn pick_compaction(
&self,
task_id: HummockCompactionTaskId,
levels: &Levels,
@@ -372,10 +362,6 @@ impl LevelSelector for ManualCompactionSelector {
ret.add_pending_task(task_id, level_handlers);
Some(self.inner.create_compaction_task(ret, ctx.base_level))
}

fn name(&self) -> &'static str {
"ManualCompactionSelector"
}
}

#[cfg(test)]
66 changes: 65 additions & 1 deletion src/meta/src/hummock/compaction/mod.rs
@@ -31,13 +31,19 @@ pub use base_level_compaction_picker::LevelCompactionPicker;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId, HummockEpoch};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, InputLevel, KeyRange, LevelType};
use risingwave_pb::hummock::{
CompactTask, CompactionConfig, GetScaleCompactorResponse, InputLevel, KeyRange, LevelType,
};

use crate::hummock::compaction::level_selector::{DynamicLevelSelector, LevelSelector};
use crate::hummock::compaction::manual_compaction_picker::ManualCompactionSelector;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::level_handler::LevelHandler;

// We assume that every core can compact data at 50 MB/s, so when there is 32 GB of data
// waiting to be compacted, a new 8-core compactor node can consume it within about 2 minutes.
const COMPACTION_BYTES_PER_CORE: u64 = 4 * 1024 * 1024 * 1024;

This assumption may not hold for all machine types. Do we need to make it a configuration option, or at least add a TODO in this PR?
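
For a quick sanity check of the arithmetic behind the constant, here is a standalone sketch; the per-core throughput constant is the assumption quoted in the comment above, not code from this PR:

```rust
// Standalone sketch: verify the sizing heuristic behind COMPACTION_BYTES_PER_CORE,
// assuming ~50 MiB/s of compaction throughput per core.
const COMPACTION_BYTES_PER_CORE: u64 = 4 * 1024 * 1024 * 1024; // 4 GiB
const ASSUMED_THROUGHPUT_PER_CORE: u64 = 50 * 1024 * 1024; // 50 MiB/s (assumed)

fn main() {
    // Each core drains its 4 GiB share in ~82 s, so an 8-core node handed
    // 32 GiB of waiting bytes clears it in well under 2 minutes.
    let secs = COMPACTION_BYTES_PER_CORE / ASSUMED_THROUGHPUT_PER_CORE;
    assert_eq!(secs, 81);
    println!("~{secs}s per core to drain its share");
}
```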


pub struct CompactStatus {
compaction_group_id: CompactionGroupId,
pub(crate) level_handlers: Vec<LevelHandler>,
@@ -251,6 +257,27 @@ impl CompactStatus {
overlap_strategy,
))
}

pub fn get_compaction_info(
&self,
levels: &Levels,
compaction_config: CompactionConfig,
) -> ScaleCompactorInfo {
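// pending_compaction_bytes counts bytes already referenced by running compact tasks,
// while waiting_compaction_bytes estimates bytes that still need to be scheduled.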
let pending_compaction_bytes = self
.level_handlers
.iter()
.map(|handler| handler.get_pending_file_size())
.sum::<u64>();
let waiting_compaction_bytes = self
.create_level_selector(compaction_config)
.waiting_schedule_compaction_bytes(levels, &self.level_handlers);
ScaleCompactorInfo {
running_cores: 0,
total_cores: 0,
waiting_compaction_bytes,
pending_compaction_bytes,
}
}
}

#[derive(Clone, Debug)]
@@ -287,3 +314,40 @@ pub trait CompactionPicker {
level_handlers: &[LevelHandler],
) -> Option<CompactionInput>;
}

#[derive(Default, Clone)]
pub struct ScaleCompactorInfo {
pub running_cores: u64,
pub total_cores: u64,
pub waiting_compaction_bytes: u64,
pub pending_compaction_bytes: u64,
}

impl ScaleCompactorInfo {
pub fn add(&mut self, other: &ScaleCompactorInfo) {
self.running_cores += other.running_cores;
self.total_cores += other.total_cores;
self.waiting_compaction_bytes += other.waiting_compaction_bytes;
self.pending_compaction_bytes += other.pending_compaction_bytes;
}

pub fn scale_out_cores(&self) -> u64 {
let mut scale_cores = self.waiting_compaction_bytes / COMPACTION_BYTES_PER_CORE;
if self.running_cores < self.total_cores {
scale_cores = scale_cores.saturating_sub(self.total_cores - self.running_cores);
}
scale_cores
}
}
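
A hypothetical usage sketch of scale_out_cores (illustrative numbers only, assuming the ScaleCompactorInfo defined above is in scope): with 12 of 16 cores busy and 32 GiB waiting, the backlog demands 8 cores and the 4 idle ones absorb half of that:

```rust
#[test]
fn scale_out_cores_example() {
    let info = ScaleCompactorInfo {
        running_cores: 12,
        total_cores: 16,
        waiting_compaction_bytes: 32 * 1024 * 1024 * 1024, // 32 GiB
        pending_compaction_bytes: 0,
    };
    // 32 GiB / 4 GiB-per-core = 8 cores demanded; 16 - 12 = 4 idle cores absorb part of it.
    assert_eq!(info.scale_out_cores(), 4);
}
```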

impl From<ScaleCompactorInfo> for GetScaleCompactorResponse {
fn from(info: ScaleCompactorInfo) -> Self {
GetScaleCompactorResponse {
suggest_cores: info.total_cores,
running_cores: info.running_cores,
total_cores: info.total_cores,
waiting_compaction_bytes: info.waiting_compaction_bytes,
pending_compaction_bytes: info.pending_compaction_bytes,
}
}
}