Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compactor): calculate pending bytes for scale compactor #6497

Closed
9 changes: 9 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,15 @@ def section_compaction(outer_panels):
),
],
),
panels.timeseries_kilobytes(
"scale compactor core count",
"compactor core resource need to scale out",
[
panels.target(
f"sum({metric('storage_compactor_core_score_count')})",
),
],
),
panels.timeseries_count(
"Compaction Success & Failure Count",
"num of compactions from each level to next level",
Expand Down
12 changes: 12 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ service HummockManagerService {
rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse);
rpc InitMetadataForReplay(InitMetadataForReplayRequest) returns (InitMetadataForReplayResponse);
rpc SetCompactorRuntimeConfig(SetCompactorRuntimeConfigRequest) returns (SetCompactorRuntimeConfigResponse);
rpc GetScaleCompactor(GetScaleCompactorRequest) returns (GetScaleCompactorResponse);
}

message CompactionConfig {
Expand Down Expand Up @@ -581,3 +582,14 @@ message HummockVersionStats {
uint64 hummock_version_id = 1;
map<uint32, TableStats> table_stats = 2;
}

message GetScaleCompactorRequest {
}

message GetScaleCompactorResponse {
uint64 scale_cores = 1;
uint64 running_cores = 2;
uint64 total_cores = 3;
uint64 waiting_compaction_bytes = 4;
uint64 pending_compaction_bytes = 5;
}
111 changes: 110 additions & 1 deletion src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

use std::collections::HashSet;
use std::sync::Arc;

use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, Level, OverlappingLevel};

use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::min_overlap_compaction_picker::MinOverlappingPicker;
Expand All @@ -36,6 +37,11 @@ const SCORE_BASE: u64 = 100;

pub trait LevelSelector: Sync + Send {
fn need_compaction(&self, levels: &Levels, level_handlers: &[LevelHandler]) -> bool;
fn waiting_schedule_compaction_bytes(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
) -> u64;

fn pick_compaction(
&self,
Expand Down Expand Up @@ -82,6 +88,81 @@ impl DynamicLevelSelector {
inner: LevelSelectorCore::new(config, overlap_strategy),
}
}

fn calculate_l0_overlap(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please corret me if I am wrong, this method calculates "total size of non-pending L0 file size + total size of base level SSTs overlapping with the non-pending L0 SSTs". Based on this, I have the following questions:

  1. Shouldn't we use if !handlers[0].is_pending_compact(&table_info.id) in L102 so that we are actually use non-pending L0 SSTs to calculate overlap?
  2. output_files in L108 represents the pending SSTs in base level that output to base level itself. IIUC, we should consider these SSTs in next_level_size instead of ignoring them in L116. To be more precise, I think we should consider non-pending SSTs and pending SSTs with target level == base level for next_level_size.
  3. Do we intentionally ignore L0 sub-level compaction?

&self,
l0: &OverlappingLevel,
base_level: &Level,
handlers: &[LevelHandler],
) -> u64 {
let total_level_size = l0.total_file_size - handlers[0].get_pending_file_size();
let mut overlap_info = self.inner.overlap_strategy.create_overlap_info();
for sub_level in &l0.sub_levels {
for table_info in &sub_level.table_infos {
if handlers[0].is_pending_compact(&table_info.id) {
overlap_info.update(table_info);
}
}
}
let mut next_level_size = 0;
let output_files = handlers[base_level.level_idx as usize].get_pending_output_file();
let overlap_files = overlap_info.check_multiple_overlap(&base_level.table_infos);

for sst in &overlap_files {
// this file would not stay in current-level because this data would be moved to the
// next level. But for other files, even if it is pending in another compact
// task and would be deleted after compact task end, the data of which would
// still stay in this level.
if output_files.contains(&sst.id) {
continue;
}
next_level_size += sst.file_size;
}
next_level_size + total_level_size
}

fn calculate_base_level_overlap(
&self,
target_bytes: u64,
select_level: &Level,
target_level: &Level,
handlers: &[LevelHandler],
) -> u64 {
if select_level.total_file_size <= target_bytes {
return 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we use zero instead of an exact value when <= target_bytes? Is it because the next compact_task won't be spawned?

}
let compacting_file_size = handlers[select_level.level_idx as usize]
.get_pending_output_file_size(target_level.level_idx);
if select_level.total_file_size - compacting_file_size <= target_bytes {
return 0;
}
let output_files = handlers[select_level.level_idx as usize].get_pending_output_file();
let mut info = self.inner.overlap_strategy.create_overlap_info();
let mut compact_bytes = 0;
for sst in &select_level.table_infos {
if compact_bytes + compacting_file_size + target_bytes >= select_level.total_file_size {
break;
}
if output_files.contains(&sst.id) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we ignore pending SSTs output to select level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

continue;
}
info.update(sst);
compact_bytes += sst.file_size;
}
let output_files = if target_level.level_idx as usize + 1 >= handlers.len() {
HashSet::default()
} else {
handlers[target_level.level_idx as usize].get_pending_output_file()
};
let overlap_files = info.check_multiple_overlap(&target_level.table_infos);
for sst in overlap_files {
if output_files.contains(&sst.id) {
continue;
}
compact_bytes += sst.file_size;
}
compact_bytes
}
}

impl LevelSelectorCore {
Expand Down Expand Up @@ -284,6 +365,34 @@ impl LevelSelector for DynamicLevelSelector {
.unwrap_or(false)
}

fn waiting_schedule_compaction_bytes(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
) -> u64 {
let ctx = self.inner.calculate_level_base_size(levels);
let mut pending_compaction_bytes = 0;
pending_compaction_bytes += self.calculate_l0_overlap(
levels.l0.as_ref().unwrap(),
&levels.levels[ctx.base_level],
level_handlers,
);
for level in &levels.levels {
let level_idx = level.level_idx as usize;
// The data of last level would not be compact to other level.
if level_idx < ctx.base_level || level_idx >= levels.levels.len() {
continue;
}
pending_compaction_bytes += self.calculate_base_level_overlap(
ctx.level_max_bytes[level_idx],
level,
&levels.levels[level_idx],
level_handlers,
);
}
pending_compaction_bytes
}

fn pick_compaction(
&self,
task_id: HummockCompactionTaskId,
Expand Down
18 changes: 2 additions & 16 deletions src/meta/src/hummock/compaction/manual_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::hummock::{
};

use super::overlap_strategy::OverlapInfo;
use crate::hummock::compaction::level_selector::{LevelSelector, LevelSelectorCore};
use crate::hummock::compaction::level_selector::LevelSelectorCore;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapInfo};
use crate::hummock::compaction::{
CompactionInput, CompactionPicker, CompactionTask, ManualCompactionOption,
Expand Down Expand Up @@ -334,18 +334,8 @@ impl ManualCompactionSelector {
option,
}
}
}

impl LevelSelector for ManualCompactionSelector {
fn need_compaction(&self, levels: &Levels, _: &[LevelHandler]) -> bool {
let ctx = self.inner.calculate_level_base_size(levels);
if self.option.level > 0 && self.option.level < ctx.base_level {
return false;
}
true
}

fn pick_compaction(
pub fn pick_compaction(
&self,
task_id: HummockCompactionTaskId,
levels: &Levels,
Expand All @@ -372,10 +362,6 @@ impl LevelSelector for ManualCompactionSelector {
ret.add_pending_task(task_id, level_handlers);
Some(self.inner.create_compaction_task(ret, ctx.base_level))
}

fn name(&self) -> &'static str {
"ManualCompactionSelector"
}
}

#[cfg(test)]
Expand Down
68 changes: 67 additions & 1 deletion src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ pub use base_level_compaction_picker::LevelCompactionPicker;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId, HummockEpoch};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, InputLevel, KeyRange, LevelType};
use risingwave_pb::hummock::{
CompactTask, CompactionConfig, GetScaleCompactorResponse, InputLevel, KeyRange, LevelType,
};

use crate::hummock::compaction::level_selector::{DynamicLevelSelector, LevelSelector};
use crate::hummock::compaction::manual_compaction_picker::ManualCompactionSelector;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::level_handler::LevelHandler;

// we assume that every core could compact data with 50MB/s, and when there has been 32GB data
// waiting to compact, a new compactor-node with 8-core could consume this data with in 2 minutes.
const COMPACTION_BYTES_PER_CORE: u64 = 4 * 1024 * 1024 * 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumption may not necessarily be true for all machine types, do we need to use it as a configuration ? or add a TODO in this PR


pub struct CompactStatus {
compaction_group_id: CompactionGroupId,
pub(crate) level_handlers: Vec<LevelHandler>,
Expand Down Expand Up @@ -251,6 +257,27 @@ impl CompactStatus {
overlap_strategy,
))
}

pub fn get_compaction_info(
&self,
levels: &Levels,
compaction_config: CompactionConfig,
) -> ScaleCompactorInfo {
let pending_compaction_bytes = self
.level_handlers
.iter()
.map(|handler| handler.get_pending_file_size())
.sum::<u64>();
let waiting_compaction_bytes = self
.create_level_selector(compaction_config)
.waiting_schedule_compaction_bytes(levels, &self.level_handlers);
ScaleCompactorInfo {
running_cores: 0,
total_cores: 0,
waiting_compaction_bytes,
pending_compaction_bytes,
}
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -287,3 +314,42 @@ pub trait CompactionPicker {
level_handlers: &[LevelHandler],
) -> Option<CompactionInput>;
}

#[derive(Default, Clone)]
pub struct ScaleCompactorInfo {
pub running_cores: u64,
pub total_cores: u64,
pub waiting_compaction_bytes: u64,
pub pending_compaction_bytes: u64,
}

impl ScaleCompactorInfo {
pub fn add(&mut self, other: &ScaleCompactorInfo) {
self.running_cores += other.running_cores;
self.total_cores += other.total_cores;
self.waiting_compaction_bytes += other.waiting_compaction_bytes;
self.pending_compaction_bytes += other.pending_compaction_bytes;
}

pub fn scale_cores(&self) -> u64 {
let mut scale_cores = self.waiting_compaction_bytes / COMPACTION_BYTES_PER_CORE;
if self.running_cores < self.total_cores {
scale_cores = scale_cores.saturating_sub(self.total_cores - self.running_cores);
}
scale_cores
}
}

impl From<ScaleCompactorInfo> for GetScaleCompactorResponse {
fn from(info: ScaleCompactorInfo) -> Self {
let scale_cores = info.scale_cores();

GetScaleCompactorResponse {
scale_cores,
running_cores: info.running_cores,
total_cores: info.total_cores,
waiting_compaction_bytes: info.waiting_compaction_bytes,
pending_compaction_bytes: info.pending_compaction_bytes,
}
}
}
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,14 @@ where
break compactor;
} else {
tracing::debug!("No available compactor, pausing compaction.");
self.hummock_manager.report_scale_compactor_info().await;
tokio::select! {
_ = self.compaction_resume_notifier.notified() => {},
_ = &mut shutdown_rx => {
return;
}
}
self.hummock_manager.report_scale_compactor_info().await;
}
};

Expand Down
13 changes: 12 additions & 1 deletion src/meta/src/hummock/level_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
Expand Down Expand Up @@ -98,6 +98,17 @@ impl LevelHandler {
.sum::<u64>()
}

pub fn get_pending_output_file(&self) -> HashSet<u64> {
let mut pending_files = HashSet::default();
for t in &self.pending_tasks {
if t.target_level != self.level {
continue;
}
pending_files.extend(t.ssts.iter());
}
pending_files
}

pub fn get_pending_output_file_size(&self, target_level: u32) -> u64 {
self.pending_tasks
.iter()
Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ impl<S: MetaStore> HummockManager<S> {
.cloned()
}

pub async fn compaction_config(&self, id: CompactionGroupId) -> Option<CompactionConfig> {
self.compaction_group_manager
.read()
.await
.compaction_groups
.get(&id)
.map(|group| group.compaction_config.clone())
}

/// Registers `table_fragments` to compaction groups.
pub async fn register_table_fragments(
&self,
Expand Down
Loading