diff --git a/proto/hummock.proto b/proto/hummock.proto index 2738b7c5baf6c..65b6c73f02291 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -21,6 +21,7 @@ message SstableInfo { uint64 min_epoch = 9; uint64 max_epoch = 10; uint64 uncompressed_file_size = 11; + uint64 range_tombstone_count = 12; } enum LevelType { diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index b7bb5f8caa339..cc466089d84c9 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -595,12 +595,8 @@ pub mod tests { }), file_size: (right - left + 1) as u64, table_ids: vec![table_prefix as u32], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: (right - left + 1) as u64, - min_epoch: 0, - max_epoch: 0, + ..Default::default() } } @@ -625,12 +621,10 @@ pub mod tests { }), file_size: (right - left + 1) as u64, table_ids, - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: (right - left + 1) as u64, min_epoch, max_epoch, + ..Default::default() } } diff --git a/src/meta/src/hummock/compaction_schedule_policy.rs b/src/meta/src/hummock/compaction_schedule_policy.rs index 02937ee972ff1..ecfe45085d4fd 100644 --- a/src/meta/src/hummock/compaction_schedule_policy.rs +++ b/src/meta/src/hummock/compaction_schedule_policy.rs @@ -476,17 +476,11 @@ mod tests { level_idx: 0, level_type: 0, table_infos: vec![SstableInfo { - object_id: 0, - sst_id: 0, key_range: None, file_size: input_file_size, table_ids: vec![], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: input_file_size, - min_epoch: 0, - max_epoch: 0, + ..Default::default() }], }], splits: vec![], diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 054ea9839ed42..069b69792bf3a 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -160,12 +160,8 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec) -> }), file_size: 2, table_ids: vec![sst_id as u32, sst_id as u32 * 10000], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: 2, - min_epoch: 0, - max_epoch: 0, + ..Default::default() }); } sst_info diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 033c64b6b5bf7..b0a69daf7c0c3 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -161,8 +161,7 @@ async fn test_read_version_basic() { stale_key_count: 1, total_key_count: 1, uncompressed_file_size: 1, - min_epoch: 0, - max_epoch: 0, + ..Default::default() }), LocalSstableInfo::for_test(SstableInfo { object_id: 2, @@ -178,8 +177,7 @@ async fn test_read_version_basic() { stale_key_count: 1, total_key_count: 1, uncompressed_file_size: 1, - min_epoch: 0, - max_epoch: 0, + ..Default::default() }), ], epoch_id_vec_for_clear, diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 78311981d9204..008abd00387b4 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1081,14 +1081,8 @@ mod tests { right: end_full_key.encode(), right_exclusive: true, }), - file_size: 0, table_ids: vec![TEST_TABLE_ID.table_id], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, - 
uncompressed_file_size: 0, - min_epoch: 0, - max_epoch: 0, + ..Default::default() })] } diff --git a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs new file mode 100644 index 0000000000000..2e224fc23074b --- /dev/null +++ b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs @@ -0,0 +1,275 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; +use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::hummock::SstableInfo; + +use crate::hummock::iterator::DeleteRangeIterator; +use crate::hummock::sstable_store::SstableStoreRef; +use crate::hummock::{HummockResult, SstableDeleteRangeIterator}; +use crate::monitor::StoreLocalStatistic; + +pub struct ConcatDeleteRangeIterator { + sstables: Vec, + current: Option, + idx: usize, + sstable_store: SstableStoreRef, + stats: StoreLocalStatistic, +} + +impl ConcatDeleteRangeIterator { + pub fn new(sstables: Vec, sstable_store: SstableStoreRef) -> Self { + Self { + sstables, + sstable_store, + stats: StoreLocalStatistic::default(), + idx: 0, + current: None, + } + } + + async fn next_inner(&mut self) -> HummockResult<()> { + if let Some(iter) = self.current.as_mut() { + if iter.is_valid() { + if iter.is_last_range() + && self.idx + 1 < self.sstables.len() + && self.sstables[self.idx + 1].range_tombstone_count > 0 + && iter + .next_extended_user_key() + .left_user_key + .eq(&FullKey::decode( + &self.sstables[self.idx].key_range.as_ref().unwrap().right, + ) + .user_key) + { + // When the last range of the current sstable is equal to the first range of the + // next sstable, the `next` method would return two same `PointRange`. So we + // must skip one. + let exclusive_range_start = iter.next_extended_user_key().is_exclude_left_key; + let last_key_in_sst_start = + iter.next_extended_user_key() + .left_user_key + .eq(&FullKey::decode( + &self.sstables[self.idx + 1].key_range.as_ref().unwrap().left, + ) + .user_key); + iter.next().await?; + if !iter.is_valid() && last_key_in_sst_start { + self.seek_idx(self.idx + 1, None).await?; + let next_range = self.next_extended_user_key(); + debug_assert!(self.is_valid()); + if next_range.is_exclude_left_key == exclusive_range_start + && next_range.left_user_key.eq(&FullKey::decode( + &self.sstables[self.idx].key_range.as_ref().unwrap().left, + ) + .user_key) + { + self.current.as_mut().unwrap().next().await?; + } + return Ok(()); + } + } else { + iter.next().await?; + } + let mut idx = self.idx; + while idx + 1 < self.sstables.len() && !self.is_valid() { + self.seek_idx(idx + 1, None).await?; + idx += 1; + } + } + } + Ok(()) + } + + /// Seeks to a table, and then seeks to the key if `seek_key` is given. 
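Annotation: the skip logic in `next_inner` above relies on the invariant its inline comment describes: when a run of tombstone events is split across two SSTs during build, the split key is written as the last event of one file and the first event of the next, so a plain concatenation would surface that boundary twice. A minimal sketch of the deduplication rule, with plain strings standing in for `PointRange` boundaries (not the real types):

```rust
/// Concatenates per-SST event lists, emitting a shared split boundary once.
fn concat_events(parts: &[Vec<&'static str>]) -> Vec<&'static str> {
    let mut out: Vec<&'static str> = Vec::new();
    for part in parts {
        for &event in part {
            // Skip an event that repeats the boundary we just emitted.
            if out.last() == Some(&event) {
                continue;
            }
            out.push(event);
        }
    }
    out
}

fn main() {
    // "cccc" ends the first SST and starts the second; it must appear once.
    let parts = [vec!["aaaa", "bbbb", "cccc"], vec!["cccc", "dddd"]];
    assert_eq!(concat_events(&parts), vec!["aaaa", "bbbb", "cccc", "dddd"]);
}
```

The real iterator additionally compares `is_exclude_left_key`, since a left-inclusive and a left-exclusive point at the same user key are distinct events.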
+ async fn seek_idx( + &mut self, + idx: usize, + seek_key: Option>, + ) -> HummockResult<()> { + self.current.take(); + if idx < self.sstables.len() { + if self.sstables[idx].range_tombstone_count == 0 { + return Ok(()); + } + let table = self + .sstable_store + .sstable(&self.sstables[idx], &mut self.stats) + .await?; + let mut sstable_iter = SstableDeleteRangeIterator::new(table); + + if let Some(key) = seek_key { + sstable_iter.seek(key).await?; + } else { + sstable_iter.rewind().await?; + } + self.current = Some(sstable_iter); + self.idx = idx; + } + Ok(()) + } +} + +impl DeleteRangeIterator for ConcatDeleteRangeIterator { + type NextFuture<'a> = impl Future> + 'a; + type RewindFuture<'a> = impl Future> + 'a; + type SeekFuture<'a> = impl Future> + 'a; + + fn next_extended_user_key(&self) -> PointRange<&[u8]> { + self.current.as_ref().unwrap().next_extended_user_key() + } + + fn current_epoch(&self) -> HummockEpoch { + self.current.as_ref().unwrap().current_epoch() + } + + fn next(&mut self) -> Self::NextFuture<'_> { + self.next_inner() + } + + fn rewind(&mut self) -> Self::RewindFuture<'_> { + async move { + let mut idx = 0; + self.seek_idx(idx, None).await?; + while idx + 1 < self.sstables.len() && !self.is_valid() { + self.seek_idx(idx + 1, None).await?; + idx += 1; + } + Ok(()) + } + } + + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { + async move { + let mut idx = self + .sstables + .partition_point(|sst| { + FullKey::decode(&sst.key_range.as_ref().unwrap().left) + .user_key + .le(&target_user_key) + }) + .saturating_sub(1); // considering the boundary of 0 + self.seek_idx(idx, Some(target_user_key)).await?; + while idx + 1 < self.sstables.len() && !self.is_valid() { + self.seek_idx(idx + 1, None).await?; + idx += 1; + } + Ok(()) + } + } + + fn is_valid(&self) -> bool { + self.current + .as_ref() + .map(|iter| iter.is_valid()) + .unwrap_or(false) + } +} +#[cfg(test)] +mod tests { + use risingwave_common::catalog::TableId; + + use super::*; + use crate::hummock::iterator::test_utils::mock_sstable_store; + use crate::hummock::test_utils::test_user_key; + use crate::hummock::{ + create_monotonic_events, CompactionDeleteRangesBuilder, DeleteRangeTombstone, + SstableBuilder, SstableBuilderOptions, SstableWriterOptions, + }; + + #[tokio::test] + async fn test_concat_iterator() { + let mut builder = CompactionDeleteRangesBuilder::default(); + let sstable_store = mock_sstable_store(); + let table_id = TableId::new(0); + let data = vec![ + DeleteRangeTombstone::new_for_test(table_id, b"aaaa".to_vec(), b"dddd".to_vec(), 10), + DeleteRangeTombstone::new( + table_id, + b"bbbb".to_vec(), + true, + b"eeee".to_vec(), + false, + 12, + ), + ]; + for range in data { + builder.add_delete_events(create_monotonic_events(vec![range])); + } + + let compaction_delete_range = builder.build_for_compaction(false); + let ranges1 = compaction_delete_range.get_tombstone_between( + test_user_key(b"aaaa").as_ref(), + test_user_key(b"bbbb").as_ref(), + ); + assert_eq!(ranges1.len(), 2); + let opts = SstableBuilderOptions::default(); + let mut builder = SstableBuilder::for_test( + 1, + sstable_store + .clone() + .create_sst_writer(1, SstableWriterOptions::default()), + opts.clone(), + ); + builder.add_monotonic_deletes(ranges1); + let output1 = builder.finish().await.unwrap(); + output1.writer_output.await.unwrap().unwrap(); + let mut builder = SstableBuilder::for_test( + 2, + sstable_store + .clone() + .create_sst_writer(2, SstableWriterOptions::default()), + 
opts.clone(), + ); + let ranges2 = compaction_delete_range + .get_tombstone_between(test_user_key(b"bbbb").as_ref(), test_user_key(b"").as_ref()); + assert_eq!(ranges2.len(), 3); + builder.add_monotonic_deletes(ranges2); + let output2 = builder.finish().await.unwrap(); + output2.writer_output.await.unwrap().unwrap(); + let mut concat_iterator = ConcatDeleteRangeIterator::new( + vec![output1.sst_info.sst_info, output2.sst_info.sst_info], + sstable_store, + ); + concat_iterator.rewind().await.unwrap(); + assert_eq!(concat_iterator.current_epoch(), HummockEpoch::MAX); + assert_eq!( + concat_iterator.next_extended_user_key().left_user_key, + test_user_key(b"aaaa").as_ref() + ); + concat_iterator.next().await.unwrap(); + assert_eq!(concat_iterator.current_epoch(), 10); + assert_eq!( + concat_iterator.next_extended_user_key().left_user_key, + test_user_key(b"bbbb").as_ref() + ); + concat_iterator.next().await.unwrap(); + assert_eq!(concat_iterator.current_epoch(), 10); + assert_eq!( + concat_iterator.next_extended_user_key().left_user_key, + test_user_key(b"dddd").as_ref() + ); + concat_iterator.next().await.unwrap(); + assert_eq!(concat_iterator.current_epoch(), 12); + assert_eq!( + concat_iterator.next_extended_user_key().left_user_key, + test_user_key(b"eeee").as_ref() + ); + concat_iterator.next().await.unwrap(); + assert!(!concat_iterator.is_valid()); + } +} diff --git a/src/storage/src/hummock/iterator/concat_inner.rs b/src/storage/src/hummock/iterator/concat_inner.rs index 79c0bd2fc96ef..3baa94497f5e9 100644 --- a/src/storage/src/hummock/iterator/concat_inner.rs +++ b/src/storage/src/hummock/iterator/concat_inner.rs @@ -16,49 +16,21 @@ use std::cmp::Ordering::{Equal, Greater, Less}; use std::future::Future; use std::sync::Arc; -use itertools::Itertools; -use risingwave_common::must_match; use risingwave_hummock_sdk::key::FullKey; use risingwave_pb::hummock::SstableInfo; use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; use crate::hummock::sstable::SstableIteratorReadOptions; -use crate::hummock::sstable_store::TableHolder; use crate::hummock::value::HummockValue; -use crate::hummock::{HummockResult, SstableIteratorType, SstableStore, SstableStoreRef}; +use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef}; use crate::monitor::StoreLocalStatistic; -enum ConcatItem { - Unfetched(SstableInfo), - Prefetched(TableHolder), +fn smallest_key(sstable_info: &SstableInfo) -> &[u8] { + &sstable_info.key_range.as_ref().unwrap().left } -impl ConcatItem { - async fn prefetch( - &mut self, - sstable_store: &SstableStore, - stats: &mut StoreLocalStatistic, - ) -> HummockResult { - if let ConcatItem::Unfetched(sstable_info) = self { - let table = sstable_store.sstable(sstable_info, stats).await?; - *self = ConcatItem::Prefetched(table); - } - Ok(must_match!(self, ConcatItem::Prefetched(table) => table.clone())) - } - - fn smallest_key(&self) -> &[u8] { - match self { - ConcatItem::Unfetched(sstable_info) => &sstable_info.key_range.as_ref().unwrap().left, - ConcatItem::Prefetched(table_holder) => &table_holder.value().meta.smallest_key, - } - } - - fn largest_key(&self) -> &[u8] { - match self { - ConcatItem::Unfetched(sstable_info) => &sstable_info.key_range.as_ref().unwrap().right, - ConcatItem::Prefetched(table_holder) => &table_holder.value().meta.largest_key, - } - } +fn largest_key(sstable_info: &SstableInfo) -> &[u8] { + &sstable_info.key_range.as_ref().unwrap().right } /// Served as the concrete implementation of `ConcatIterator` and 
`BackwardConcatIterator`. @@ -70,7 +42,7 @@ pub struct ConcatIteratorInner { cur_idx: usize, /// All non-overlapping tables. - tables: Vec, + tables: Vec, sstable_store: SstableStoreRef, @@ -82,8 +54,8 @@ impl ConcatIteratorInner { /// Caller should make sure that `tables` are non-overlapping, /// arranged in ascending order when it serves as a forward iterator, /// and arranged in descending order when it serves as a backward iterator. - fn new_inner( - tables: Vec, + pub fn new( + tables: Vec, sstable_store: SstableStoreRef, read_options: Arc, ) -> Self { @@ -97,30 +69,6 @@ impl ConcatIteratorInner { } } - /// Caller should make sure that `tables` are non-overlapping, - /// arranged in ascending order when it serves as a forward iterator, - /// and arranged in descending order when it serves as a backward iterator. - pub fn new( - tables: Vec, - sstable_store: SstableStoreRef, - read_options: Arc, - ) -> Self { - let tables = tables.into_iter().map(ConcatItem::Unfetched).collect_vec(); - Self::new_inner(tables, sstable_store, read_options) - } - - /// Caller should make sure that `tables` are non-overlapping, - /// arranged in ascending order when it serves as a forward iterator, - /// and arranged in descending order when it serves as a backward iterator. - pub fn new_with_prefetch( - tables: Vec, - sstable_store: SstableStoreRef, - read_options: Arc, - ) -> Self { - let tables = tables.into_iter().map(ConcatItem::Prefetched).collect_vec(); - Self::new_inner(tables, sstable_store, read_options) - } - /// Seeks to a table, and then seeks to the key if `seek_key` is given. async fn seek_idx( &mut self, @@ -132,8 +80,9 @@ impl ConcatIteratorInner { old_iter.collect_local_statistic(&mut self.stats); } } else { - let table = self.tables[idx] - .prefetch(&self.sstable_store, &mut self.stats) + let table = self + .sstable_store + .sstable(&self.tables[idx], &mut self.stats) .await?; let mut sstable_iter = TI::create(table, self.sstable_store.clone(), self.read_options.clone()); @@ -198,14 +147,14 @@ impl HummockIterator for ConcatIteratorInner { .tables .partition_point(|table| match Self::Direction::direction() { DirectionEnum::Forward => { - let ord = FullKey::decode(table.smallest_key()).cmp(&key); + let ord = FullKey::decode(smallest_key(table)).cmp(&key); ord == Less || ord == Equal } DirectionEnum::Backward => { - let ord = FullKey::decode(table.largest_key()).cmp(&key); - - ord == Greater || ord == Equal + let ord = FullKey::decode(largest_key(table)).cmp(&key); + ord == Greater + || (ord == Equal && !table.key_range.as_ref().unwrap().right_exclusive) } }) .saturating_sub(1); // considering the boundary of 0 diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs index f3ee1e8cbdf71..7936fb994a92a 100644 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs @@ -13,12 +13,16 @@ // limitations under the License. 
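Annotation: the seek paths in this patch (here and in `ConcatDeleteRangeIterator::seek`) share one binary-search idiom: `partition_point` over tables sorted by smallest key, then `saturating_sub(1)` to step back to the last candidate that may contain the target. A reduced sketch with integers standing in for decoded full keys (an assumed simplification, not the real key types):

```rust
/// Picks the index of the last table whose smallest key is <= `target`.
fn select_table(smallest_keys: &[u64], target: u64) -> usize {
    smallest_keys
        .partition_point(|&smallest| smallest <= target)
        .saturating_sub(1) // considering the boundary of 0, as in the code above
}

fn main() {
    let smallest_keys = [10, 20, 30];
    assert_eq!(select_table(&smallest_keys, 5), 0); // clamped to the first table
    assert_eq!(select_table(&smallest_keys, 20), 1); // equal bound selects table 1
    assert_eq!(select_table(&smallest_keys, 25), 1);
    assert_eq!(select_table(&smallest_keys, 99), 2);
}
```

If the target precedes every table, the clamp still yields index 0, and the subsequent in-table seek simply lands on that table's first entry.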
use std::collections::{BTreeSet, BinaryHeap}; +use std::future::Future; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::hummock::SstableInfo; +use crate::hummock::iterator::concat_delete_range_iterator::ConcatDeleteRangeIterator; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferDeleteRangeIterator; -use crate::hummock::SstableDeleteRangeIterator; +use crate::hummock::sstable_store::SstableStoreRef; +use crate::hummock::{HummockResult, SstableDeleteRangeIterator}; /// `DeleteRangeIterator` defines the interface of all delete-range iterators, which is used to /// filter keys deleted by some range tombstone @@ -27,6 +31,15 @@ use crate::hummock::SstableDeleteRangeIterator; /// - if you want to iterate from the beginning, you need to then call its `rewind` method. /// - if you want to iterate from some specific position, you need to then call its `seek` method. pub trait DeleteRangeIterator { + type NextFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type RewindFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type SeekFuture<'a>: Future> + Send + 'a + where + Self: 'a; /// Retrieves the next extended user key that changes current epoch. /// /// Note: @@ -62,7 +75,7 @@ pub trait DeleteRangeIterator { /// /// # Panics /// This function will panic if the iterator is invalid. - fn next(&mut self); + fn next(&mut self) -> Self::NextFuture<'_>; /// Resets the position of the iterator. /// @@ -70,7 +83,7 @@ pub trait DeleteRangeIterator { /// - Do not decide whether the position is valid or not by checking the returned error of this /// function. This function WON'T return an `Err` if invalid. You should check `is_valid` /// before starting iteration. - fn rewind(&mut self); + fn rewind(&mut self) -> Self::RewindFuture<'_>; /// Resets iterator and seeks to the first tombstone whose left-end >= provided key, we use this /// method to skip tombstones which do not overlap with the provided key. @@ -79,7 +92,7 @@ pub trait DeleteRangeIterator { /// - Do not decide whether the position is valid or not by checking the returned error of this /// function. This function WON'T return an `Err` if invalid. You should check `is_valid` /// before starting iteration. - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>); + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_>; /// Indicates whether the iterator can be used. 
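Annotation: the trait below makes `next`/`rewind`/`seek` asynchronous without boxing by using generic associated types: the trait declares a named future per method, and each implementor defines it as an `impl Future` alias via `type_alias_impl_trait` (a nightly feature this crate already enables for `HummockIterator`). A reduced, self-contained sketch of the pattern; the `AsyncCursor`/`Counter` names are illustrative, not from the patch:

```rust
#![feature(type_alias_impl_trait)]

use std::future::Future;

trait AsyncCursor {
    type NextFuture<'a>: Future<Output = Result<(), String>> + Send + 'a
    where
        Self: 'a;

    fn next(&mut self) -> Self::NextFuture<'_>;
}

struct Counter {
    pos: usize,
}

impl AsyncCursor for Counter {
    // The async block below becomes this opaque future type; auto traits
    // such as `Send` leak through it, satisfying the trait bound.
    type NextFuture<'a> = impl Future<Output = Result<(), String>> + 'a;

    fn next(&mut self) -> Self::NextFuture<'_> {
        async move {
            self.pos += 1;
            Ok(())
        }
    }
}

fn main() {
    let mut cursor = Counter { pos: 0 };
    // Any executor can drive the future, e.g.
    // `futures::executor::block_on(cursor.next()).unwrap();`
    let _pending = cursor.next();
}
```

Compared with `async-trait`-style boxing, this keeps each call allocation-free, which matters on the per-key read path where `next_until` is now awaited.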
/// @@ -92,13 +105,19 @@ pub trait DeleteRangeIterator { pub enum RangeIteratorTyped { Sst(SstableDeleteRangeIterator), Batch(SharedBufferDeleteRangeIterator), + Concat(ConcatDeleteRangeIterator), } impl DeleteRangeIterator for RangeIteratorTyped { + type NextFuture<'a> = impl Future> + 'a; + type RewindFuture<'a> = impl Future> + 'a; + type SeekFuture<'a> = impl Future> + 'a; + fn next_extended_user_key(&self) -> PointRange<&[u8]> { match self { RangeIteratorTyped::Sst(sst) => sst.next_extended_user_key(), RangeIteratorTyped::Batch(batch) => batch.next_extended_user_key(), + RangeIteratorTyped::Concat(batch) => batch.next_extended_user_key(), } } @@ -106,35 +125,37 @@ impl DeleteRangeIterator for RangeIteratorTyped { match self { RangeIteratorTyped::Sst(sst) => sst.current_epoch(), RangeIteratorTyped::Batch(batch) => batch.current_epoch(), + RangeIteratorTyped::Concat(batch) => batch.current_epoch(), } } - fn next(&mut self) { - match self { - RangeIteratorTyped::Sst(sst) => { - sst.next(); - } - RangeIteratorTyped::Batch(batch) => { - batch.next(); + fn next(&mut self) -> Self::NextFuture<'_> { + async move { + match self { + RangeIteratorTyped::Sst(sst) => sst.next().await, + RangeIteratorTyped::Batch(batch) => batch.next().await, + RangeIteratorTyped::Concat(iter) => iter.next().await, } } } - fn rewind(&mut self) { - match self { - RangeIteratorTyped::Sst(sst) => { - sst.rewind(); - } - RangeIteratorTyped::Batch(batch) => { - batch.rewind(); + fn rewind(&mut self) -> Self::RewindFuture<'_> { + async move { + match self { + RangeIteratorTyped::Sst(sst) => sst.rewind().await, + RangeIteratorTyped::Batch(batch) => batch.rewind().await, + RangeIteratorTyped::Concat(iter) => iter.rewind().await, } } } - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) { - match self { - RangeIteratorTyped::Sst(sst) => sst.seek(target_user_key), - RangeIteratorTyped::Batch(batch) => batch.seek(target_user_key), + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { + async move { + match self { + RangeIteratorTyped::Sst(sst) => sst.seek(target_user_key).await, + RangeIteratorTyped::Batch(batch) => batch.seek(target_user_key).await, + RangeIteratorTyped::Concat(iter) => iter.seek(target_user_key).await, + } } } @@ -142,6 +163,7 @@ impl DeleteRangeIterator for RangeIteratorTyped { match self { RangeIteratorTyped::Sst(sst) => sst.is_valid(), RangeIteratorTyped::Batch(batch) => batch.is_valid(), + RangeIteratorTyped::Concat(iter) => iter.is_valid(), } } } @@ -226,18 +248,34 @@ impl ForwardMergeRangeIterator { pub fn add_sst_iter(&mut self, iter: SstableDeleteRangeIterator) { self.unused_iters.push(RangeIteratorTyped::Sst(iter)); } + + pub fn add_concat_iter(&mut self, sstables: Vec, sstable_store: SstableStoreRef) { + self.unused_iters + .push(RangeIteratorTyped::Concat(ConcatDeleteRangeIterator::new( + sstables, + sstable_store, + ))) + } } impl ForwardMergeRangeIterator { - pub(super) fn next_until(&mut self, target_user_key: UserKey<&[u8]>) { + pub(super) async fn next_until( + &mut self, + target_user_key: UserKey<&[u8]>, + ) -> HummockResult<()> { let target_extended_user_key = PointRange::from_user_key(target_user_key, false); while self.is_valid() && self.next_extended_user_key().le(&target_extended_user_key) { - self.next(); + self.next().await?; } + Ok(()) } } impl DeleteRangeIterator for ForwardMergeRangeIterator { + type NextFuture<'a> = impl Future> + 'a; + type RewindFuture<'a> = impl Future> + 'a; + type SeekFuture<'a> = impl Future> + 'a; + fn 
next_extended_user_key(&self) -> PointRange<&[u8]> { self.heap.peek().unwrap().next_extended_user_key() } @@ -249,70 +287,74 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { .map_or(HummockEpoch::MIN, |epoch| *epoch) } - fn next(&mut self) { - self.tmp_buffer - .push(self.heap.pop().expect("no inner iter")); - while let Some(node) = self.heap.peek() - && node.is_valid() - && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() - { - self.tmp_buffer.push(self.heap.pop().unwrap()); - } - for node in &self.tmp_buffer { - let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { - self.current_epochs.remove(&epoch); + fn next(&mut self) -> Self::NextFuture<'_> { + async { + self.tmp_buffer + .push(self.heap.pop().expect("no inner iter")); + while let Some(node) = self.heap.peek() && node.is_valid() && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() { + self.tmp_buffer.push(self.heap.pop().unwrap()); } - } - // Correct because ranges in an epoch won't intersect. - for mut node in std::mem::take(&mut self.tmp_buffer) { - node.next(); - if node.is_valid() { + for node in &self.tmp_buffer { let epoch = node.current_epoch(); if epoch != HummockEpoch::MAX { - self.current_epochs.insert(epoch); + self.current_epochs.remove(&epoch); } - self.heap.push(node); - } else { - // Put back to `unused_iters` - self.unused_iters.push(node); } + // Correct because ranges in an epoch won't intersect. + for mut node in std::mem::take(&mut self.tmp_buffer) { + node.next().await?; + if node.is_valid() { + let epoch = node.current_epoch(); + if epoch != HummockEpoch::MAX { + self.current_epochs.insert(epoch); + } + self.heap.push(node); + } else { + // Put back to `unused_iters` + self.unused_iters.push(node); + } + } + Ok(()) } } - fn rewind(&mut self) { - self.current_epochs.clear(); - self.unused_iters.extend(self.heap.drain()); - for mut node in self.unused_iters.drain(..) { - node.rewind(); - if node.is_valid() { - let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { - self.current_epochs.insert(epoch); + fn rewind(&mut self) -> Self::RewindFuture<'_> { + async move { + self.current_epochs.clear(); + self.unused_iters.extend(self.heap.drain()); + for mut node in self.unused_iters.drain(..) 
{ + node.rewind().await?; + if node.is_valid() { + let epoch = node.current_epoch(); + if epoch != HummockEpoch::MAX { + self.current_epochs.insert(epoch); + } + self.heap.push(node); } - self.heap.push(node); } + Ok(()) } } - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) { - self.current_epochs.clear(); - self.unused_iters.extend(self.heap.drain()); - self.heap = self - .unused_iters - .drain_filter(|node| { - node.seek(target_user_key); + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { + async move { + self.current_epochs.clear(); + let mut iters = std::mem::take(&mut self.unused_iters); + iters.extend(self.heap.drain()); + for mut node in iters { + node.seek(target_user_key).await?; if node.is_valid() { let epoch = node.current_epoch(); if epoch != HummockEpoch::MAX { self.current_epochs.insert(epoch); } - true + self.heap.push(node); } else { - false + self.unused_iters.push(node); } - }) - .collect(); + } + Ok(()) + } } fn is_valid(&self) -> bool { diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index ed46c7af60ac5..30f036a521ebd 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -103,7 +103,7 @@ impl> UserIterator { // handle delete operation match self.iterator.value() { HummockValue::Put(val) => { - self.delete_range_iter.next_until(full_key.user_key); + self.delete_range_iter.next_until(full_key.user_key).await?; if self.delete_range_iter.current_epoch() >= epoch { self.stats.skip_delete_key_count += 1; } else { @@ -170,12 +170,12 @@ impl> UserIterator { epoch: self.read_epoch, }; self.iterator.seek(full_key.to_ref()).await?; - self.delete_range_iter.seek(begin_key.as_ref()); + self.delete_range_iter.seek(begin_key.as_ref()).await?; } Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => { self.iterator.rewind().await?; - self.delete_range_iter.rewind(); + self.delete_range_iter.rewind().await?; } }; @@ -206,7 +206,7 @@ impl> UserIterator { epoch: self.read_epoch, }; self.iterator.seek(full_key).await?; - self.delete_range_iter.seek(full_key.user_key); + self.delete_range_iter.seek(full_key.user_key).await?; // Handle multi-version self.last_key = FullKey::default(); diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 4acc79104e84d..d1e61a8621b57 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -38,9 +38,11 @@ use risingwave_hummock_sdk::key::FullKey; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; +mod concat_delete_range_iterator; mod delete_range_iterator; #[cfg(any(test, feature = "test"))] pub mod test_utils; + pub use delete_range_iterator::{ DeleteRangeIterator, ForwardMergeRangeIterator, RangeIteratorTyped, }; diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 07f42581acfe3..32202a7c1b136 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -792,6 +792,10 @@ impl SharedBufferDeleteRangeIterator { } impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { + type NextFuture<'a> = impl Future> + 'a; + type RewindFuture<'a> = impl Future> + 'a; + type SeekFuture<'a> = impl Future> + 'a; + fn next_extended_user_key(&self) -> PointRange<&[u8]> 
{ self.inner.monotonic_tombstone_events[self.next_idx] .event_key @@ -806,21 +810,30 @@ impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { } } - fn next(&mut self) { - self.next_idx += 1; + fn next(&mut self) -> Self::NextFuture<'_> { + async move { + self.next_idx += 1; + Ok(()) + } } - fn rewind(&mut self) { - self.next_idx = 0; + fn rewind(&mut self) -> Self::RewindFuture<'_> { + async move { + self.next_idx = 0; + Ok(()) + } } - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - self.next_idx = self.inner.monotonic_tombstone_events.partition_point( - |MonotonicDeleteEvent { event_key, .. }| { - event_key.as_ref().le(&target_extended_user_key) - }, - ); + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'a> { + async move { + let target_extended_user_key = PointRange::from_user_key(target_user_key, false); + self.next_idx = self.inner.monotonic_tombstone_events.partition_point( + |MonotonicDeleteEvent { event_key, .. }| { + event_key.as_ref().le(&target_extended_user_key) + }, + ); + Ok(()) + } } fn is_valid(&self) -> bool { diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index c4ce5221146c7..57c48e484862c 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -463,6 +463,7 @@ impl SstableBuilder { uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64, min_epoch: cmp::min(min_epoch, tombstone_min_epoch), max_epoch: cmp::max(max_epoch, tombstone_max_epoch), + range_tombstone_count: meta.monotonic_tombstone_events.len() as u64, }; tracing::trace!( "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}", diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs index e33be3dec456e..3dc422fd23fb6 100644 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs @@ -14,6 +14,7 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; +use std::future::Future; use std::sync::Arc; use itertools::Itertools; @@ -25,7 +26,7 @@ use super::DeleteRangeTombstone; use super::MonotonicDeleteEvent; use crate::hummock::iterator::DeleteRangeIterator; use crate::hummock::sstable_store::TableHolder; -use crate::hummock::Sstable; +use crate::hummock::{HummockResult, Sstable}; pub struct SortedBoundary { sequence: HummockEpoch, @@ -219,7 +220,7 @@ impl CompactionDeleteRanges { } } - /// the `largest_user_key` always mean that + /// the `largest_user_key` is always exclusive pub(crate) fn get_tombstone_between( &self, smallest_user_key: UserKey<&[u8]>, @@ -249,7 +250,6 @@ impl CompactionDeleteRanges { idx += 1; } while idx < self.events.len() { - // TODO: replace it with Bound if !extended_largest_user_key.is_empty() && self.events[idx].0.as_ref().ge(&extended_largest_user_key) { @@ -356,9 +356,26 @@ impl SstableDeleteRangeIterator { pub fn new(table: TableHolder) -> Self { Self { table, next_idx: 0 } } + + /// Retrieves whether `next_extended_user_key` is the last range of this SST file. + /// + /// Note: + /// - Before calling this function, makes sure the iterator `is_valid`. + /// - This function should return immediately. + /// + /// # Panics + /// This function will panic if the iterator is invalid. 
+ pub fn is_last_range(&self) -> bool { + debug_assert!(self.next_idx < self.table.value().meta.monotonic_tombstone_events.len()); + self.next_idx + 1 == self.table.value().meta.monotonic_tombstone_events.len() + } } impl DeleteRangeIterator for SstableDeleteRangeIterator { + type NextFuture<'a> = impl Future> + 'a; + type RewindFuture<'a> = impl Future> + 'a; + type SeekFuture<'a> = impl Future> + 'a; + fn next_extended_user_key(&self) -> PointRange<&[u8]> { self.table.value().meta.monotonic_tombstone_events[self.next_idx] .event_key @@ -373,24 +390,33 @@ impl DeleteRangeIterator for SstableDeleteRangeIterator { } } - fn next(&mut self) { - self.next_idx += 1; + fn next(&mut self) -> Self::NextFuture<'_> { + async move { + self.next_idx += 1; + Ok(()) + } } - fn rewind(&mut self) { - self.next_idx = 0; + fn rewind(&mut self) -> Self::RewindFuture<'_> { + async move { + self.next_idx = 0; + Ok(()) + } } - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - self.next_idx = self - .table - .value() - .meta - .monotonic_tombstone_events - .partition_point(|MonotonicDeleteEvent { event_key, .. }| { - event_key.as_ref().le(&target_extended_user_key) - }); + fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { + async move { + let target_extended_user_key = PointRange::from_user_key(target_user_key, false); + self.next_idx = self + .table + .value() + .meta + .monotonic_tombstone_events + .partition_point(|MonotonicDeleteEvent { event_key, .. }| { + event_key.as_ref().le(&target_extended_user_key) + }); + Ok(()) + } } fn is_valid(&self) -> bool { diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index 0f460d8f2d0da..2674fd2093d9e 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -307,13 +307,10 @@ impl Sstable { right_exclusive: false, }), file_size: self.meta.estimated_size as u64, - table_ids: vec![], meta_offset: self.meta.meta_offset, - stale_key_count: 0, total_key_count: self.meta.key_count as u64, uncompressed_file_size: self.meta.estimated_size as u64, - min_epoch: 0, - max_epoch: 0, + ..Default::default() } } } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 5c48bbff65127..a69b08c8afb12 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -538,6 +538,16 @@ pub struct SstableWriterOptions { pub policy: CachePolicy, } +impl Default for SstableWriterOptions { + fn default() -> Self { + Self { + capacity_hint: None, + tracker: None, + policy: CachePolicy::NotFill, + } + } +} + pub trait SstableWriterFactory: Send + Sync { type Writer: SstableWriter; diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 86eeab71e4094..d36a1c5008cef 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -467,7 +467,7 @@ type HummockStorageIteratorPayload = UnorderedMergeIteratorInner< HummockIteratorUnion< Forward, StagingDataIterator, - OrderedMergeIteratorInner, + SstableIterator, ConcatIteratorInner, >, >; diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index bbe0f96eeafe7..d7848db9d4547 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -729,7 +729,18 @@ impl 
HummockVersionReader { let mut non_overlapping_iters = Vec::new(); let mut overlapping_iters = Vec::new(); let mut overlapping_iter_count = 0; - let mut fetch_meta_reqs = vec![]; + let timer = self + .state_store_metrics + .iter_fetch_meta_duration + .with_label_values(&[table_id_label]) + .start_timer(); + + let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options); + if read_options.prefetch_options.exhaust_iter { + sst_read_options.must_iterated_end_user_key = + Some(user_key_range.1.map(|key| key.cloned())); + } + let sst_read_options = Arc::new(sst_read_options); for level in committed.levels(read_options.table_id) { if level.table_infos.is_empty() { continue; @@ -737,111 +748,67 @@ impl HummockVersionReader { if level.level_type == LevelType::Nonoverlapping as i32 { let table_infos = prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref); - - let fetch_meta_req = table_infos + let sstables = table_infos .filter(|sstable_info| { sstable_info .table_ids .binary_search(&read_options.table_id.table_id) .is_ok() }) + .cloned() .collect_vec(); - fetch_meta_reqs.push((level.level_type, fetch_meta_req)); - } else { - let table_infos = prune_overlapping_ssts( - &level.table_infos, - read_options.table_id, - &table_key_range, - ); - // Overlapping - let fetch_meta_req = table_infos.rev().collect_vec(); - if !fetch_meta_req.is_empty() { - fetch_meta_reqs.push((level.level_type, fetch_meta_req)); + if sstables.is_empty() { + continue; } - } - } - let mut flatten_reqs = vec![]; - let mut req_count = 0; - for (_, fetch_meta_req) in &fetch_meta_reqs { - for sstable_info in fetch_meta_req { - let inner_req_count = req_count; - let capture_ref = async { - // We would fill block to high priority cache for level-0 - self.sstable_store - .sstable_syncable(sstable_info, &local_stats) + if sstables.len() > 1 { + delete_range_iter.add_concat_iter(sstables.clone(), self.sstable_store.clone()); + non_overlapping_iters.push(ConcatIterator::new( + sstables, + self.sstable_store.clone(), + sst_read_options.clone(), + )); + } else { + let sstable = self + .sstable_store + .sstable(&sstables[0], &mut local_stats) .in_span(Span::enter_with_local_parent("get_sstable")) - .await - }; - // use `buffer_unordered` to simulate `try_join_all` by assigning an index - flatten_reqs - .push(async move { capture_ref.await.map(|result| (inner_req_count, result)) }); - req_count += 1; - } - } - let timer = self - .state_store_metrics - .iter_fetch_meta_duration - .with_label_values(&[table_id_label]) - .start_timer(); - let mut local_cache_meta_block_unhit = 0; - let mut flatten_resps = vec![None; req_count]; - for flatten_req in flatten_reqs { - let (req_index, resp) = flatten_req.await?; - local_cache_meta_block_unhit += resp.2; - flatten_resps[req_count - req_index - 1] = Some(resp); - } - let fetch_meta_duration_sec = timer.stop_and_record(); - self.state_store_metrics - .iter_fetch_meta_cache_unhits - .set(local_cache_meta_block_unhit as i64); - if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND { - tracing::warn!("Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.", table_id_string, epoch, fetch_meta_duration_sec, local_cache_meta_block_unhit); - self.state_store_metrics - .iter_slow_fetch_meta_cache_unhits - .set(local_cache_meta_block_unhit as i64); - } - - let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options); - if read_options.prefetch_options.exhaust_iter { - 
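// Editorial note (not in the original patch): `exhaust_iter` is a hint that
// the caller intends to drain the iterator, so the end bound of the user-key
// range is recorded here to let the sstable iterator prefetch blocks up to
// that bound instead of fetching them one at a time.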
sst_read_options.must_iterated_end_user_key = - Some(user_key_range.1.map(|key| key.cloned())); - } - let sst_read_options = Arc::new(sst_read_options); - - for (level_type, fetch_meta_req) in fetch_meta_reqs { - if level_type == LevelType::Nonoverlapping as i32 { - let mut sstables = vec![]; - for sstable_info in fetch_meta_req { - let (sstable, local_cache_meta_block_miss, ..) = - flatten_resps.pop().unwrap().unwrap(); - assert_eq!(sstable_info.get_object_id(), sstable.value().id); - local_stats.apply_meta_fetch(local_cache_meta_block_miss); + .await?; if !sstable.value().meta.monotonic_tombstone_events.is_empty() && !read_options.ignore_range_tombstone { delete_range_iter .add_sst_iter(SstableDeleteRangeIterator::new(sstable.clone())); } - if let Some(key_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter(sstable.value(), *key_hash, &mut local_stats) { + if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() { + if !hit_sstable_bloom_filter(sstable.value(), *dist_hash, &mut local_stats) + { continue; } } - sstables.push(sstable); + overlapping_iters.push(SstableIterator::new( + sstable, + self.sstable_store.clone(), + sst_read_options.clone(), + )); } - - non_overlapping_iters.push(ConcatIterator::new_with_prefetch( - sstables, - self.sstable_store.clone(), - sst_read_options.clone(), - )); } else { - let mut iters = Vec::new(); + let table_infos = prune_overlapping_ssts( + &level.table_infos, + read_options.table_id, + &table_key_range, + ); + // Overlapping + let fetch_meta_req = table_infos.rev().collect_vec(); + if fetch_meta_req.is_empty() { + continue; + } for sstable_info in fetch_meta_req { - let (sstable, local_cache_meta_block_miss, ..) = - flatten_resps.pop().unwrap().unwrap(); + let sstable = self + .sstable_store + .sstable(sstable_info, &mut local_stats) + .in_span(Span::enter_with_local_parent("get_sstable")) + .await?; assert_eq!(sstable_info.get_object_id(), sstable.value().id); - local_stats.apply_meta_fetch(local_cache_meta_block_miss); if !sstable.value().meta.monotonic_tombstone_events.is_empty() && !read_options.ignore_range_tombstone { @@ -854,16 +821,23 @@ impl HummockVersionReader { continue; } } - iters.push(SstableIterator::new( + overlapping_iters.push(SstableIterator::new( sstable, self.sstable_store.clone(), sst_read_options.clone(), )); overlapping_iter_count += 1; } - overlapping_iters.push(OrderedMergeIteratorInner::new(iters)); } } + let fetch_meta_duration_sec = timer.stop_and_record(); + if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND { + tracing::warn!("Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.", + table_id_string, epoch, fetch_meta_duration_sec, local_stats.cache_meta_block_miss); + self.state_store_metrics + .iter_slow_fetch_meta_cache_unhits + .set(local_stats.cache_meta_block_miss as i64); + } local_stats.overlapping_iter_count = overlapping_iter_count; local_stats.non_overlapping_iter_count = non_overlapping_iters.len() as u64; diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 3c801055c8d48..dbc4a3d2f5054 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -113,12 +113,8 @@ pub fn gen_dummy_sst_info( }), file_size, table_ids: vec![], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: file_size, - min_epoch: 0, - max_epoch: 0, + ..Default::default() } } @@ -191,13 +187,9 @@ pub async fn put_sst( 
right_exclusive: false, }), file_size: meta.estimated_size as u64, - table_ids: vec![], meta_offset: meta.meta_offset, - stale_key_count: 0, - total_key_count: 0, uncompressed_file_size: meta.estimated_size as u64, - min_epoch: 0, - max_epoch: 0, + ..Default::default() }; let writer_output = writer.finish(meta).await?; writer_output.await.unwrap()?;
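Annotation: the fixture hunks above, like the earlier ones in `level_selector.rs`, `compaction_schedule_policy.rs`, and `sstable/mod.rs`, all lean on the same cleanup: prost-generated messages implement `Default`, so struct-update syntax zero-fills every field a test does not set, and newly added fields such as `range_tombstone_count` no longer force edits to every fixture. A minimal sketch with a hand-written stand-in for the generated type:

```rust
// Stand-in for the prost-generated `SstableInfo`; the real message derives
// `Default` in the same way.
#[derive(Default, Debug)]
struct SstableInfoSketch {
    object_id: u64,
    file_size: u64,
    uncompressed_file_size: u64,
    min_epoch: u64,
    max_epoch: u64,
    range_tombstone_count: u64,
}

fn gen_dummy_sst_info(file_size: u64) -> SstableInfoSketch {
    SstableInfoSketch {
        file_size,
        uncompressed_file_size: file_size,
        ..Default::default() // everything else, including new fields, stays 0
    }
}

fn main() {
    let sst = gen_dummy_sst_info(2);
    assert_eq!(sst.range_tombstone_count, 0);
    println!("{sst:?}");
}
```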