Skip to content

Commit

Permalink
feat(storage): shared buffer flush L0 by compaction group (risingwave…
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jun 20, 2022
1 parent bd48bba commit f784ba3
Show file tree
Hide file tree
Showing 25 changed files with 265 additions and 104 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ message BarrierCompleteResponse {
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
repeated hummock.SstableInfo sycned_sstables = 4;
message GroupedSstableInfo {
uint64 compaction_group_id = 1;
hummock.SstableInfo sst = 2;
}
repeated GroupedSstableInfo sycned_sstables = 4;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
Expand Down
10 changes: 9 additions & 1 deletion src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{tonic_err, Result as RwResult};
use risingwave_pb::catalog::Source;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_stream::executor::{Barrier, Epoch};
Expand Down Expand Up @@ -160,7 +161,14 @@ impl StreamService for StreamServiceImpl {
request_id: req.request_id,
status: None,
create_mview_progress: collect_result.create_mview_progress,
sycned_sstables: collect_result.synced_sstables,
sycned_sstables: collect_result
.synced_sstables
.into_iter()
.map(|(compaction_group_id, sst)| GroupedSstableInfo {
compaction_group_id,
sst: Some(sst),
})
.collect_vec(),
}))
}

Expand Down
10 changes: 8 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::data::Barrier;
Expand Down Expand Up @@ -324,9 +324,15 @@ where
// We must ensure all epochs are committed in ascending order, because
// the storage engine will query from new to old in the order in which
// the L0 layer files are generated. see https://github.com/singularity-data/risingwave/issues/1251
let synced_ssts = resps
let synced_ssts: Vec<LocalSstableInfo> = resps
.iter()
.flat_map(|resp| resp.sycned_sstables.clone())
.map(|grouped| {
(
grouped.compaction_group_id,
grouped.sst.expect("field not None"),
)
})
.collect_vec();
self.hummock_manager
.commit_epoch(command_context.prev_epoch.0, synced_ssts)
Expand Down
8 changes: 5 additions & 3 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ mod tests {
use risingwave_pb::hummock::CompactTask;
use tokio::sync::mpsc::error::TryRecvError;

use crate::hummock::test_utils::{generate_test_tables, setup_compute_env};
use crate::hummock::test_utils::{
generate_test_tables, setup_compute_env, to_local_sstable_info,
};
use crate::hummock::{CompactorManager, HummockManager};
use crate::storage::MetaStore;

Expand All @@ -163,7 +165,7 @@ mod tests {
vec![hummock_manager_ref.get_new_table_id().await.unwrap()],
);
hummock_manager_ref
.commit_epoch(epoch, original_tables)
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();
}
Expand All @@ -179,7 +181,7 @@ mod tests {
is_target_ultimate_and_leveling: false,
task_status: false,
vnode_mappings: vec![],
compaction_group_id: StaticCompactionGroupId::SharedBuffer.into(),
compaction_group_id: StaticCompactionGroupId::StateDefault.into(),
existing_table_ids: vec![],
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_hummock_sdk::compact::compact_task_to_string;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::{
get_remote_sst_id, CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockEpoch,
HummockRefCount, HummockSSTableId, HummockVersionId,
HummockRefCount, HummockSSTableId, HummockVersionId, LocalSstableInfo,
};
use risingwave_pb::common::ParallelUnitMapping;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -837,8 +837,11 @@ where
pub async fn commit_epoch(
&self,
epoch: HummockEpoch,
sstables: Vec<SstableInfo>,
sstables: Vec<LocalSstableInfo>,
) -> Result<()> {
// TODO #2065: add SSTs to corresponding compaction groups' levels.
let sstables = sstables.into_iter().map(|(_, sst)| sst).collect_vec();

let mut versioning_guard = self.versioning.write().await;
let old_version = versioning_guard.current_version();
let versioning = versioning_guard.deref_mut();
Expand Down
30 changes: 15 additions & 15 deletions src/meta/src/hummock/hummock_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async fn test_hummock_compaction_task() {
let epoch: u64 = 1;
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await);
hummock_manager
.commit_epoch(epoch, original_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();

Expand Down Expand Up @@ -295,7 +295,7 @@ async fn test_hummock_table() {
let epoch: u64 = 1;
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await);
hummock_manager
.commit_epoch(epoch, original_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();

Expand Down Expand Up @@ -349,7 +349,7 @@ async fn test_hummock_transaction() {

// Commit epoch1
hummock_manager
.commit_epoch(epoch1, tables_in_epoch1.clone())
.commit_epoch(epoch1, to_local_sstable_info(&tables_in_epoch1))
.await
.unwrap();
committed_tables.extend(tables_in_epoch1.clone());
Expand Down Expand Up @@ -396,7 +396,7 @@ async fn test_hummock_transaction() {

// Commit epoch2
hummock_manager
.commit_epoch(epoch2, tables_in_epoch2.clone())
.commit_epoch(epoch2, to_local_sstable_info(&tables_in_epoch2))
.await
.unwrap();
committed_tables.extend(tables_in_epoch2);
Expand Down Expand Up @@ -559,7 +559,7 @@ async fn test_hummock_manager_basic() {
let epoch: u64 = 1;
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await);
hummock_manager
.commit_epoch(epoch, original_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();

Expand Down Expand Up @@ -660,7 +660,7 @@ async fn test_retryable_pin_version() {
);
// Increase the version
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand Down Expand Up @@ -693,7 +693,7 @@ async fn test_retryable_pin_version() {
);
// Increase the version
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand Down Expand Up @@ -731,7 +731,7 @@ async fn test_pin_snapshot_response_lost() {
);
// [ ] -> [ e0 ]
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand All @@ -758,7 +758,7 @@ async fn test_pin_snapshot_response_lost() {
);
// [ e0:pinned ] -> [ e0:pinned, e1 ]
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand Down Expand Up @@ -794,7 +794,7 @@ async fn test_pin_snapshot_response_lost() {
);
// [ e0, e1:pinned ] -> [ e0, e1:pinned, e2 ]
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand All @@ -821,7 +821,7 @@ async fn test_pin_snapshot_response_lost() {
);
// [ e0, e1:pinned, e2:pinned ] -> [ e0, e1:pinned, e2:pinned, e3 ]
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
epoch += 1;
Expand All @@ -847,7 +847,7 @@ async fn test_print_compact_task() {
let epoch: u64 = 1;
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await);
hummock_manager
.commit_epoch(epoch, original_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();

Expand Down Expand Up @@ -876,7 +876,7 @@ async fn test_invalid_sst_id() {
let epoch = 1;
let ssts = generate_test_tables(epoch, vec![HummockSSTableId::MAX]);
let error = hummock_manager
.commit_epoch(epoch, ssts.clone())
.commit_epoch(epoch, to_local_sstable_info(&ssts))
.await
.unwrap_err();
assert!(matches!(error, Error::InternalError(_)));
Expand Down Expand Up @@ -910,7 +910,7 @@ async fn test_mark_orphan_ssts() {
);
// Cannot commit_epoch for marked SST ids.
let error = hummock_manager
.commit_epoch(epoch, ssts.clone())
.commit_epoch(epoch, to_local_sstable_info(&ssts))
.await
.unwrap_err();
assert!(matches!(error, Error::InternalError(_)));
Expand Down Expand Up @@ -959,7 +959,7 @@ async fn test_trigger_manual_compaction() {
let epoch: u64 = 1;
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await);
hummock_manager
.commit_epoch(epoch, original_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&original_tables))
.await
.unwrap();

Expand Down
14 changes: 10 additions & 4 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::{HummockContextId, HummockEpoch, HummockSSTableId, HummockVersionId};
use risingwave_hummock_sdk::{
HummockContextId, HummockEpoch, HummockSSTableId, HummockVersionId, LocalSstableInfo,
};
use risingwave_pb::hummock::{
CompactTask, CompactionGroup, HummockSnapshot, HummockVersion, SstableInfo,
SubscribeCompactTasksResponse, VacuumTask,
CompactTask, CompactionGroup, HummockSnapshot, HummockVersion, SubscribeCompactTasksResponse,
VacuumTask,
};
use risingwave_rpc_client::error::{Result, RpcError};
use risingwave_rpc_client::HummockMetaClient;
Expand Down Expand Up @@ -121,7 +123,11 @@ impl HummockMetaClient for MockHummockMetaClient {
.map_err(mock_err)
}

async fn commit_epoch(&self, epoch: HummockEpoch, sstables: Vec<SstableInfo>) -> Result<()> {
async fn commit_epoch(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
) -> Result<()> {
self.hummock_manager
.commit_epoch(epoch, sstables)
.await
Expand Down
12 changes: 9 additions & 3 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::key_with_epoch;
use risingwave_hummock_sdk::{HummockContextId, HummockEpoch, HummockSSTableId};
use risingwave_hummock_sdk::{HummockContextId, HummockEpoch, HummockSSTableId, LocalSstableInfo};
use risingwave_pb::common::{HostAddress, VNodeBitmap, WorkerNode, WorkerType};
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo};

Expand All @@ -30,6 +30,12 @@ use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::MetaMetrics;
use crate::storage::{MemStore, MetaStore};

pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
ssts.iter()
.map(|sst| (StaticCompactionGroupId::StateDefault.into(), sst.clone()))
.collect_vec()
}

pub async fn add_test_tables<S>(
hummock_manager: &HummockManager<S>,
context_id: HummockContextId,
Expand All @@ -46,7 +52,7 @@ where
];
let test_tables = generate_test_tables(epoch, table_ids);
hummock_manager
.commit_epoch(epoch, test_tables.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables))
.await
.unwrap();
// Current state: {v0: [], v1: [test_tables]}
Expand Down Expand Up @@ -80,7 +86,7 @@ where
vec![hummock_manager.get_new_table_id().await.unwrap()],
);
hummock_manager
.commit_epoch(epoch, test_tables_3.clone())
.commit_epoch(epoch, to_local_sstable_info(&test_tables_3))
.await
.unwrap();
// Current state: {v0: [], v1: [test_tables], v2: [test_tables_2, to_delete:test_tables], v3:
Expand Down
11 changes: 7 additions & 4 deletions src/rpc_client/src/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
// limitations under the License.

use async_trait::async_trait;
use risingwave_hummock_sdk::{HummockEpoch, HummockSSTableId, HummockVersionId};
use risingwave_hummock_sdk::{HummockEpoch, HummockSSTableId, HummockVersionId, LocalSstableInfo};
use risingwave_pb::hummock::{
CompactTask, CompactionGroup, HummockVersion, SstableInfo, SubscribeCompactTasksResponse,
VacuumTask,
CompactTask, CompactionGroup, HummockVersion, SubscribeCompactTasksResponse, VacuumTask,
};
use tonic::Streaming;

Expand All @@ -32,7 +31,11 @@ pub trait HummockMetaClient: Send + Sync + 'static {
async fn get_new_table_id(&self) -> Result<HummockSSTableId>;
async fn report_compaction_task(&self, compact_task: CompactTask) -> Result<()>;
// We keep `commit_epoch` only for test/benchmark like ssbench.
async fn commit_epoch(&self, epoch: HummockEpoch, sstables: Vec<SstableInfo>) -> Result<()>;
async fn commit_epoch(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
) -> Result<()>;
async fn subscribe_compact_tasks(&self) -> Result<Streaming<SubscribeCompactTasksResponse>>;
async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>;
async fn get_compaction_groups(&self) -> Result<Vec<CompactionGroup>>;
Expand Down
8 changes: 6 additions & 2 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use paste::paste;
use risingwave_common::catalog::{CatalogVersion, TableId};
use risingwave_common::util::addr::HostAddr;
use risingwave_hummock_sdk::{HummockEpoch, HummockSSTableId, HummockVersionId};
use risingwave_hummock_sdk::{HummockEpoch, HummockSSTableId, HummockVersionId, LocalSstableInfo};
use risingwave_pb::catalog::{
Database as ProstDatabase, Schema as ProstSchema, Source as ProstSource, Table as ProstTable,
};
Expand Down Expand Up @@ -405,7 +405,11 @@ impl HummockMetaClient for MetaClient {
Ok(())
}

async fn commit_epoch(&self, _epoch: HummockEpoch, _sstables: Vec<SstableInfo>) -> Result<()> {
async fn commit_epoch(
&self,
_epoch: HummockEpoch,
_sstables: Vec<LocalSstableInfo>,
) -> Result<()> {
panic!("Only meta service can commit_epoch in production.")
}

Expand Down
8 changes: 7 additions & 1 deletion src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_storage::hummock::iterator::{
BoxedForwardHummockIterator, Forward, HummockIterator, MergeIterator, OrderedAwareMergeIterator,
};
Expand All @@ -40,7 +41,12 @@ fn gen_interleave_shared_buffer_batch_iter(
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
));
}
let batch = SharedBufferBatch::new(batch_data, 2333, buffer_tracker.clone());
let batch = SharedBufferBatch::new(
batch_data,
2333,
buffer_tracker.clone(),
StaticCompactionGroupId::StateDefault.into(),
);
iterators.push(Box::new(batch.into_forward_iter()) as BoxedForwardHummockIterator);
}
iterators
Expand Down
Loading

0 comments on commit f784ba3

Please sign in to comment.