-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(compactor): calculate pending bytes for scale compactor #6497
Changes from 12 commits
766381f
399c82b
4e5ebc5
e219149
834377b
50ee464
7f5f8eb
c6fe113
47fde33
a1f6e28
eba7c48
29b1980
5913773
51706c8
b771d26
a06d7ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,13 @@ | |
// COPYING file in the root directory) and Apache 2.0 License | ||
// (found in the LICENSE.Apache file in the root directory). | ||
|
||
use std::collections::HashSet; | ||
use std::sync::Arc; | ||
|
||
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; | ||
use risingwave_hummock_sdk::HummockCompactionTaskId; | ||
use risingwave_pb::hummock::hummock_version::Levels; | ||
use risingwave_pb::hummock::CompactionConfig; | ||
use risingwave_pb::hummock::{CompactionConfig, Level, OverlappingLevel}; | ||
|
||
use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; | ||
use crate::hummock::compaction::min_overlap_compaction_picker::MinOverlappingPicker; | ||
|
@@ -36,6 +38,11 @@ const SCORE_BASE: u64 = 100; | |
|
||
pub trait LevelSelector: Sync + Send { | ||
fn need_compaction(&self, levels: &Levels, level_handlers: &[LevelHandler]) -> bool; | ||
fn waiting_schedule_compaction_bytes( | ||
&self, | ||
levels: &Levels, | ||
level_handlers: &[LevelHandler], | ||
) -> u64; | ||
|
||
fn pick_compaction( | ||
&self, | ||
|
@@ -82,6 +89,86 @@ impl DynamicLevelSelector { | |
inner: LevelSelectorCore::new(config, overlap_strategy), | ||
} | ||
} | ||
|
||
fn calculate_l0_overlap( | ||
&self, | ||
l0: &OverlappingLevel, | ||
base_level: &Level, | ||
handlers: &[LevelHandler], | ||
) -> u64 { | ||
let total_level_size = l0.total_file_size - handlers[0].get_pending_file_size(); | ||
let mut overlap_info = self.inner.overlap_strategy.create_overlap_info(); | ||
let next_level_files = handlers[0].get_pending_next_level_file(); | ||
|
||
for sub_level in &l0.sub_levels { | ||
for table_info in &sub_level.table_infos { | ||
if next_level_files.contains(&table_info.id) { | ||
continue; | ||
} | ||
overlap_info.update(table_info); | ||
} | ||
} | ||
let mut next_level_size = 0; | ||
let next_level_files = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic is understandable now, but the variable names make it easy for me to get lost while reading. I tried renaming several variables to distinguish between l0 and base_level — how about this?
|
||
handlers[base_level.level_idx as usize].get_pending_next_level_file(); | ||
let overlap_files = overlap_info.check_multiple_overlap(&base_level.table_infos); | ||
|
||
for sst in &overlap_files { | ||
// this file would not stay in current-level because this data would be moved to the | ||
// next level. But for other files, even if it is pending in another compact | ||
// task and would be deleted after compact task end, the data of which would | ||
// still stay in this level. | ||
if next_level_files.contains(&sst.id) { | ||
continue; | ||
} | ||
next_level_size += sst.file_size; | ||
} | ||
next_level_size + total_level_size | ||
} | ||
|
||
fn calculate_base_level_overlap( | ||
&self, | ||
target_bytes: u64, | ||
select_level: &Level, | ||
target_level: &Level, | ||
handlers: &[LevelHandler], | ||
) -> u64 { | ||
if select_level.total_file_size <= target_bytes { | ||
return 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we use |
||
} | ||
let compacting_file_size = | ||
handlers[select_level.level_idx as usize].get_pending_next_level_file_size(); | ||
if select_level.total_file_size - compacting_file_size <= target_bytes { | ||
return 0; | ||
} | ||
let next_level_files = | ||
handlers[select_level.level_idx as usize].get_pending_next_level_file(); | ||
let mut info = self.inner.overlap_strategy.create_overlap_info(); | ||
let mut compact_bytes = 0; | ||
for sst in &select_level.table_infos { | ||
if compact_bytes + compacting_file_size + target_bytes >= select_level.total_file_size { | ||
break; | ||
} | ||
if next_level_files.contains(&sst.id) { | ||
continue; | ||
} | ||
info.update(sst); | ||
compact_bytes += sst.file_size; | ||
} | ||
let output_files = if target_level.level_idx as usize + 1 >= handlers.len() { | ||
HashSet::default() | ||
} else { | ||
handlers[target_level.level_idx as usize].get_pending_next_level_file() | ||
}; | ||
let overlap_files = info.check_multiple_overlap(&target_level.table_infos); | ||
for sst in overlap_files { | ||
if output_files.contains(&sst.id) { | ||
continue; | ||
} | ||
compact_bytes += sst.file_size; | ||
} | ||
compact_bytes | ||
} | ||
} | ||
|
||
impl LevelSelectorCore { | ||
|
@@ -232,8 +319,8 @@ impl LevelSelectorCore { | |
level_idx - 1 | ||
}; | ||
let total_size = level.total_file_size | ||
+ handlers[upper_level].get_pending_output_file_size(level.level_idx) | ||
- handlers[level_idx].get_pending_output_file_size(level.level_idx + 1); | ||
+ handlers[upper_level].get_pending_next_level_file_size() | ||
- handlers[level_idx].get_pending_next_level_file_size(); | ||
if total_size == 0 { | ||
continue; | ||
} | ||
|
@@ -284,6 +371,34 @@ impl LevelSelector for DynamicLevelSelector { | |
.unwrap_or(false) | ||
} | ||
|
||
fn waiting_schedule_compaction_bytes( | ||
&self, | ||
levels: &Levels, | ||
level_handlers: &[LevelHandler], | ||
) -> u64 { | ||
let ctx = self.inner.calculate_level_base_size(levels); | ||
let mut pending_compaction_bytes = self.calculate_l0_overlap( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to l0_pending_compaction_bytes |
||
levels.l0.as_ref().unwrap(), | ||
levels.get_level(ctx.base_level), | ||
level_handlers, | ||
); | ||
for level in &levels.levels { | ||
let level_idx = level.level_idx as usize; | ||
// The data of last level would not be compact to other level. | ||
if level_idx < ctx.base_level || level_idx >= levels.levels.len() { | ||
continue; | ||
} | ||
let target_level = levels.get_level(level_idx + 1); | ||
pending_compaction_bytes += self.calculate_base_level_overlap( | ||
ctx.level_max_bytes[level_idx], | ||
level, | ||
target_level, | ||
level_handlers, | ||
); | ||
} | ||
pending_compaction_bytes | ||
} | ||
|
||
fn pick_compaction( | ||
&self, | ||
task_id: HummockCompactionTaskId, | ||
|
@@ -626,4 +741,45 @@ pub mod tests { | |
let compaction = selector.pick_compaction(2, &levels, &mut levels_handlers); | ||
assert!(compaction.is_none()); | ||
} | ||
|
||
#[test] | ||
fn test_waiting_schedule_compaction_bytes() { | ||
let config = CompactionConfigBuilder::new() | ||
.max_bytes_for_level_base(200) | ||
.max_level(4) | ||
.max_bytes_for_level_multiplier(5) | ||
.compaction_mode(CompactionMode::Range as i32) | ||
.build(); | ||
// base-level: 2 | ||
// balanced lsm tree size: | ||
// 200/250/1250 | ||
let levels = vec![ | ||
generate_level(1, vec![]), | ||
generate_level(2, generate_tables(0..5, 0..1000, 3, 50)), | ||
generate_level(3, generate_tables(5..10, 0..1000, 2, 100)), | ||
generate_level(4, generate_tables(10..15, 0..1000, 1, 250)), | ||
]; | ||
let levels = Levels { | ||
levels, | ||
l0: Some(generate_l0_nonoverlapping_sublevels(generate_tables( | ||
15..25, | ||
0..600, | ||
3, | ||
10, | ||
))), | ||
}; | ||
|
||
let selector = | ||
DynamicLevelSelector::new(Arc::new(config), Arc::new(RangeOverlapStrategy::default())); | ||
let mut levels_handlers = (0..5).into_iter().map(LevelHandler::new).collect_vec(); | ||
let waiting_bytes = selector.waiting_schedule_compaction_bytes(&levels, &levels_handlers); | ||
// select 10 files in level0 overlap with 3 files in level2; select one file in level2 which | ||
// overlap with one file in level3; select three files in level3 which overlap with | ||
three files in level4. (10*10+3*50)+(50+100)+(100*3+250*3) = 1450 | ||
assert_eq!(waiting_bytes, 1450); | ||
levels_handlers[2].add_pending_task(1, 3, &levels.levels[1].table_infos[..1]); | ||
levels_handlers[3].add_pending_task(1, 3, &levels.levels[2].table_infos[..1]); | ||
let waiting_bytes = selector.waiting_schedule_compaction_bytes(&levels, &levels_handlers); | ||
assert_eq!(waiting_bytes, 1250); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,13 +31,19 @@ pub use base_level_compaction_picker::LevelCompactionPicker; | |
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId, HummockEpoch}; | ||
use risingwave_pb::hummock::compaction_config::CompactionMode; | ||
use risingwave_pb::hummock::hummock_version::Levels; | ||
use risingwave_pb::hummock::{CompactTask, CompactionConfig, InputLevel, KeyRange, LevelType}; | ||
use risingwave_pb::hummock::{ | ||
CompactTask, CompactionConfig, GetScaleCompactorResponse, InputLevel, KeyRange, LevelType, | ||
}; | ||
|
||
use crate::hummock::compaction::level_selector::{DynamicLevelSelector, LevelSelector}; | ||
use crate::hummock::compaction::manual_compaction_picker::ManualCompactionSelector; | ||
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy}; | ||
use crate::hummock::level_handler::LevelHandler; | ||
|
||
// we assume that every core could compact data with 50MB/s, and when there is 32GB of data | ||
// waiting to compact, a new compactor-node with 8 cores could consume this data within 2 minutes. | ||
const COMPACTION_BYTES_PER_CORE: u64 = 4 * 1024 * 1024 * 1024; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumption may not necessarily be true for all machine types, do we need to use it as a configuration ? or add a TODO in this PR |
||
|
||
pub struct CompactStatus { | ||
compaction_group_id: CompactionGroupId, | ||
pub(crate) level_handlers: Vec<LevelHandler>, | ||
|
@@ -251,6 +257,27 @@ impl CompactStatus { | |
overlap_strategy, | ||
)) | ||
} | ||
|
||
pub fn get_compaction_info( | ||
&self, | ||
levels: &Levels, | ||
compaction_config: CompactionConfig, | ||
) -> ScaleCompactorInfo { | ||
let pending_compaction_bytes = self | ||
.level_handlers | ||
.iter() | ||
.map(|handler| handler.get_pending_file_size()) | ||
.sum::<u64>(); | ||
let waiting_compaction_bytes = self | ||
.create_level_selector(compaction_config) | ||
.waiting_schedule_compaction_bytes(levels, &self.level_handlers); | ||
ScaleCompactorInfo { | ||
running_cores: 0, | ||
total_cores: 0, | ||
waiting_compaction_bytes, | ||
pending_compaction_bytes, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
|
@@ -287,3 +314,40 @@ pub trait CompactionPicker { | |
level_handlers: &[LevelHandler], | ||
) -> Option<CompactionInput>; | ||
} | ||
|
||
#[derive(Default, Clone)] | ||
pub struct ScaleCompactorInfo { | ||
pub running_cores: u64, | ||
pub total_cores: u64, | ||
pub waiting_compaction_bytes: u64, | ||
pub pending_compaction_bytes: u64, | ||
} | ||
|
||
impl ScaleCompactorInfo { | ||
pub fn add(&mut self, other: &ScaleCompactorInfo) { | ||
self.running_cores += other.running_cores; | ||
self.total_cores += other.total_cores; | ||
self.waiting_compaction_bytes += other.waiting_compaction_bytes; | ||
self.pending_compaction_bytes += other.pending_compaction_bytes; | ||
} | ||
|
||
pub fn scale_out_cores(&self) -> u64 { | ||
let mut scale_cores = self.waiting_compaction_bytes / COMPACTION_BYTES_PER_CORE; | ||
if self.running_cores < self.total_cores { | ||
scale_cores = scale_cores.saturating_sub(self.total_cores - self.running_cores); | ||
} | ||
scale_cores | ||
} | ||
} | ||
|
||
impl From<ScaleCompactorInfo> for GetScaleCompactorResponse { | ||
fn from(info: ScaleCompactorInfo) -> Self { | ||
GetScaleCompactorResponse { | ||
suggest_cores: info.total_cores, | ||
running_cores: info.running_cores, | ||
total_cores: info.total_cores, | ||
waiting_compaction_bytes: info.waiting_compaction_bytes, | ||
pending_compaction_bytes: info.pending_compaction_bytes, | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please correct me if I am wrong: this method calculates "total size of non-pending L0 files + total size of base-level SSTs overlapping with the non-pending L0 SSTs". Based on this, I have the following questions:
1. Should the condition be `if !handlers[0].is_pending_compact(&table_info.id)` in L102, so that we actually use the non-pending L0 SSTs to calculate the overlap?
2. `output_files` in L108 represents the pending SSTs in the base level whose compaction outputs to the base level itself. IIUC, we should include these SSTs in `next_level_size` instead of ignoring them in L116. To be more precise, I think `next_level_size` should count both non-pending SSTs and pending SSTs whose target level == base level.