diff --git a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs
index 2e224fc23074b..b037bd8b6e1f1 100644
--- a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs
+++ b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs
@@ -12,33 +12,166 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::VecDeque;
 use std::future::Future;
+use std::ops::Bound;
+use std::sync::Arc;
 
 use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey};
 use risingwave_hummock_sdk::HummockEpoch;
 use risingwave_pb::hummock::SstableInfo;
 
 use crate::hummock::iterator::DeleteRangeIterator;
-use crate::hummock::sstable_store::SstableStoreRef;
-use crate::hummock::{HummockResult, SstableDeleteRangeIterator};
+use crate::hummock::sstable::SstableIteratorReadOptions;
+use crate::hummock::sstable_store::{SstableStoreRef, TableHolder};
+use crate::hummock::{
+    HummockResult, SstableDeleteRangeIterator, SstableMetaResponse, SstableStore,
+};
 use crate::monitor::StoreLocalStatistic;
 
+enum MetaFetcher {
+    Simple,
+    Prefetch(PrefetchContext),
+}
+
+impl MetaFetcher {
+    async fn get_meta(
+        &mut self,
+        sstables: &[SstableInfo],
+        sst_idx: usize,
+        sstable_store: &SstableStore,
+        stats: &mut StoreLocalStatistic,
+    ) -> HummockResult<TableHolder> {
+        match self {
+            MetaFetcher::Simple => sstable_store.sstable(&sstables[sst_idx], stats).await,
+            MetaFetcher::Prefetch(context) => {
+                context
+                    .get_meta(sstables, sst_idx, sstable_store, stats)
+                    .await
+            }
+        }
+    }
+}
+
+struct PrefetchContext {
+    prefetched_metas: VecDeque<(usize, SstableMetaResponse)>,
+
+    /// `sst[idx..=dest_idx]` will definitely be visited in the future.
+    dest_idx: usize,
+}
+
+const DEFAULT_PREFETCH_META_NUM: usize = 1;
+
+impl PrefetchContext {
+    fn new(dest_idx: usize) -> Self {
+        Self {
+            prefetched_metas: VecDeque::with_capacity(DEFAULT_PREFETCH_META_NUM + 1),
+            dest_idx,
+        }
+    }
+
+    async fn get_meta(
+        &mut self,
+        sstables: &[SstableInfo],
+        idx: usize,
+        sstable_store: &SstableStore,
+        stats: &mut StoreLocalStatistic,
+    ) -> HummockResult<TableHolder> {
+        let is_empty = if let Some((prefetched_idx, _)) = self.prefetched_metas.front() {
+            if *prefetched_idx == idx {
+                false
+            } else {
+                tracing::warn!(
+                    "prefetch mismatch: sst_idx = {}, prefetched_sst_idx = {}",
+                    idx,
+                    *prefetched_idx
+                );
+                self.prefetched_metas.clear();
+                true
+            }
+        } else {
+            true
+        };
+        if is_empty {
+            self.prefetched_metas.push_back((
+                idx,
+                sstable_store.get_sstable_response(&sstables[idx], stats),
+            ));
+        }
+        let sstable_response = self.prefetched_metas.pop_front().unwrap().1;
+
+        let next_prefetch_idx = self
+            .prefetched_metas
+            .back()
+            .map_or(idx, |(latest_idx, _)| *latest_idx)
+            + 1;
+        if next_prefetch_idx <= self.dest_idx {
+            self.prefetched_metas.push_back((
+                next_prefetch_idx,
+                sstable_store.get_sstable_response(&sstables[next_prefetch_idx], stats),
+            ));
+        }
+
+        sstable_response.await
+    }
+}
+
 pub struct ConcatDeleteRangeIterator {
     sstables: Vec<SstableInfo>,
     current: Option<SstableDeleteRangeIterator>,
     idx: usize,
+    /// Simple or prefetch strategy.
+    meta_fetcher: MetaFetcher,
     sstable_store: SstableStoreRef,
     stats: StoreLocalStatistic,
+    options: Arc<SstableIteratorReadOptions>,
 }
 
 impl ConcatDeleteRangeIterator {
-    pub fn new(sstables: Vec<SstableInfo>, sstable_store: SstableStoreRef) -> Self {
+    pub fn new(
+        sstables: Vec<SstableInfo>,
+        sstable_store: SstableStoreRef,
+        options: Arc<SstableIteratorReadOptions>,
+    ) -> Self {
         Self {
             sstables,
             sstable_store,
             stats: StoreLocalStatistic::default(),
             idx: 0,
             current: None,
+            options,
+            meta_fetcher: MetaFetcher::Simple,
         }
     }
+
+    fn init_meta_fetcher(&mut self, start_idx: usize) {
+        if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
+            let sstables = &self.sstables;
+            let next_to_start_idx = start_idx + 1;
+            if next_to_start_idx < sstables.len() {
+                let dest_idx = match bound {
+                    Bound::Unbounded => sstables.len() - 1, // will not overflow
+                    Bound::Included(dest_key) => {
+                        start_idx
+                            + sstables[next_to_start_idx..].partition_point(|sstable_info| {
+                                FullKey::decode(&sstable_info.key_range.as_ref().unwrap().left)
+                                    .user_key
+                                    <= dest_key.as_ref()
+                            })
+                    }
+                    Bound::Excluded(end_key) => {
+                        start_idx
+                            + sstables[next_to_start_idx..].partition_point(|sstable_info| {
+                                FullKey::decode(&sstable_info.key_range.as_ref().unwrap().left)
+                                    .user_key
+                                    < end_key.as_ref()
+                            })
+                    }
+                };
+                if start_idx < dest_idx {
+                    self.meta_fetcher = MetaFetcher::Prefetch(PrefetchContext::new(dest_idx));
+                }
+            }
+        }
+    }
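Note on `PrefetchContext::get_meta` above: it keeps a short pipeline of in-flight meta requests. The response for the current sst is popped from the front, and the request for the next index is issued before the current response is awaited, so the fetch for sst `i + 1` overlaps with consuming sst `i`; a seek that lands on an index the queue did not predict drops the stale requests and restarts the pipeline. Below is a minimal std-only sketch of the same scheme, with threads standing in for the async `LookupResponse` futures (`fetch_meta` and `Prefetcher` are hypothetical stand-ins for `get_sstable_response` and `PrefetchContext`, not the real API):

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Stand-in for get_sstable_response: kick off the "I/O" immediately and
// hand back a handle the caller resolves later.
fn fetch_meta(idx: usize) -> JoinHandle<String> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50)); // simulated meta-cache miss
        format!("meta-{idx}")
    })
}

struct Prefetcher {
    in_flight: VecDeque<(usize, JoinHandle<String>)>,
    /// Last index that will definitely be visited (mirrors `dest_idx`).
    dest_idx: usize,
}

impl Prefetcher {
    fn get(&mut self, idx: usize) -> String {
        // On a mismatch (e.g. after a seek), drop stale requests and restart.
        if self.in_flight.front().map(|(i, _)| *i) != Some(idx) {
            self.in_flight.clear();
            self.in_flight.push_back((idx, fetch_meta(idx)));
        }
        let (_, response) = self.in_flight.pop_front().unwrap();
        // Issue the next request *before* blocking on the current one, so
        // the fetch for idx + 1 overlaps with consuming idx.
        let next = self.in_flight.back().map_or(idx, |(i, _)| *i) + 1;
        if next <= self.dest_idx {
            self.in_flight.push_back((next, fetch_meta(next)));
        }
        response.join().unwrap()
    }
}

fn main() {
    let mut p = Prefetcher { in_flight: VecDeque::new(), dest_idx: 3 };
    for idx in 0..=3 {
        println!("{}", p.get(idx));
    }
}
```

With `DEFAULT_PREFETCH_META_NUM = 1`, the queue holds at most one unconsumed response besides the one being returned, bounding memory while still hiding roughly one fetch round-trip of latency.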
@@ -107,8 +240,8 @@ impl ConcatDeleteRangeIterator {
             return Ok(());
         }
         let table = self
-            .sstable_store
-            .sstable(&self.sstables[idx], &mut self.stats)
+            .meta_fetcher
+            .get_meta(&self.sstables, idx, &self.sstable_store, &mut self.stats)
             .await?;
         let mut sstable_iter = SstableDeleteRangeIterator::new(table);
@@ -144,6 +277,7 @@ impl DeleteRangeIterator for ConcatDeleteRangeIterator {
     fn rewind(&mut self) -> Self::RewindFuture<'_> {
         async move {
             let mut idx = 0;
+            self.init_meta_fetcher(idx);
             self.seek_idx(idx, None).await?;
             while idx + 1 < self.sstables.len() && !self.is_valid() {
                 self.seek_idx(idx + 1, None).await?;
@@ -163,6 +297,7 @@
                     .le(&target_user_key)
                 })
                 .saturating_sub(1); // considering the boundary of 0
+            self.init_meta_fetcher(idx);
            self.seek_idx(idx, Some(target_user_key)).await?;
             while idx + 1 < self.sstables.len() && !self.is_valid() {
                 self.seek_idx(idx + 1, None).await?;
@@ -244,6 +379,7 @@ mod tests {
         let mut concat_iterator = ConcatDeleteRangeIterator::new(
             vec![output1.sst_info.sst_info, output2.sst_info.sst_info],
             sstable_store,
+            Arc::new(SstableIteratorReadOptions::default()),
         );
         concat_iterator.rewind().await.unwrap();
         assert_eq!(concat_iterator.current_epoch(), HummockEpoch::MAX);
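`init_meta_fetcher` only switches to the prefetching fetcher when `must_iterated_end_user_key` guarantees that ssts past `start_idx` will actually be visited: since the ssts of a non-overlapping level are sorted by key range, a `partition_point` over the left bounds after `start_idx` yields `dest_idx`, the last sst the iterator can reach. A reduced sketch of that bound computation, with sorted integers standing in for the decoded left-bound user keys (an assumption for illustration only):

```rust
use std::ops::Bound;

// Caller guarantees start_idx + 1 < lefts.len(), as init_meta_fetcher does.
fn dest_idx(lefts: &[u64], start_idx: usize, bound: &Bound<u64>) -> usize {
    let next = start_idx + 1;
    match bound {
        // Every remaining sst will be visited.
        Bound::Unbounded => lefts.len() - 1,
        // Count ssts whose left bound is still <= the inclusive end key...
        Bound::Included(end) => start_idx + lefts[next..].partition_point(|left| left <= end),
        // ...or strictly < the exclusive end key.
        Bound::Excluded(end) => start_idx + lefts[next..].partition_point(|left| left < end),
    }
}

fn main() {
    // Left bounds of ssts 0..=4, sorted as in a non-overlapping level.
    let lefts = [10u64, 20, 30, 40, 50];
    assert_eq!(dest_idx(&lefts, 0, &Bound::Unbounded), 4);
    assert_eq!(dest_idx(&lefts, 0, &Bound::Included(30)), 2); // sst 2 still starts at 30
    assert_eq!(dest_idx(&lefts, 0, &Bound::Excluded(30)), 1); // sst 2 starts exactly at 30
    println!("ok");
}
```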
diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs
index 7936fb994a92a..b8ef3608f3b34 100644
--- a/src/storage/src/hummock/iterator/delete_range_iterator.rs
+++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs
@@ -14,6 +14,7 @@
 
 use std::collections::{BTreeSet, BinaryHeap};
 use std::future::Future;
+use std::sync::Arc;
 
 use risingwave_hummock_sdk::key::{PointRange, UserKey};
 use risingwave_hummock_sdk::HummockEpoch;
@@ -21,6 +22,7 @@ use risingwave_pb::hummock::SstableInfo;
 
 use crate::hummock::iterator::concat_delete_range_iterator::ConcatDeleteRangeIterator;
 use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferDeleteRangeIterator;
+use crate::hummock::sstable::SstableIteratorReadOptions;
 use crate::hummock::sstable_store::SstableStoreRef;
 use crate::hummock::{HummockResult, SstableDeleteRangeIterator};
 
@@ -249,11 +251,17 @@ impl ForwardMergeRangeIterator {
         self.unused_iters.push(RangeIteratorTyped::Sst(iter));
     }
 
-    pub fn add_concat_iter(&mut self, sstables: Vec<SstableInfo>, sstable_store: SstableStoreRef) {
+    pub fn add_concat_iter(
+        &mut self,
+        sstables: Vec<SstableInfo>,
+        sstable_store: SstableStoreRef,
+        options: Arc<SstableIteratorReadOptions>,
+    ) {
         self.unused_iters
             .push(RangeIteratorTyped::Concat(ConcatDeleteRangeIterator::new(
                 sstables,
                 sstable_store,
+                options,
             )))
     }
 }
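This plumbing change passes `SstableIteratorReadOptions` into `add_concat_iter` as an `Arc`, so options computed once per read can be shared by every per-level concat iterator without copying. A tiny illustration of that sharing (`ReadOptions` here is a stand-in modeling only the field this diff consumes):

```rust
use std::sync::Arc;

// Stand-in for SstableIteratorReadOptions, reduced to the one field used here.
struct ReadOptions {
    must_iterated_end_user_key: Option<String>,
}

struct ConcatIter {
    options: Arc<ReadOptions>,
}

fn main() {
    let options = Arc::new(ReadOptions {
        must_iterated_end_user_key: Some("key-99".into()),
    });
    // Each per-level concat iterator shares the same allocation; cloning the
    // Arc only bumps a reference count instead of copying the options.
    let iters: Vec<ConcatIter> = (0..3)
        .map(|_| ConcatIter { options: options.clone() })
        .collect();
    assert_eq!(Arc::strong_count(&options), 4);
    for it in &iters {
        println!("{:?}", it.options.must_iterated_end_user_key);
    }
}
```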
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index 53faeb8f6bb3c..8e59211876c13 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -45,6 +45,7 @@ const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards
 const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; // 256MB
 
 pub type TableHolder = CacheableEntry<HummockSstableObjectId, Box<Sstable>>;
+pub type SstableMetaResponse = LookupResponse<HummockSstableObjectId, Box<Sstable>, HummockError>;
 
 // BEGIN section for tiered cache
 
@@ -355,13 +356,13 @@ impl SstableStore {
         self.meta_cache.clear();
     }
 
-    /// Returns `table_holder`, `local_cache_meta_block_miss` (1 if cache miss) and
+    /// Returns the `LookupResponse` of the sstable, `local_cache_meta_block_miss` (1 if cache miss) and
     /// `local_cache_meta_block_unhit` (1 if not cache hit).
-    pub async fn sstable_syncable(
+    pub fn sstable_lookup_response(
         &self,
         sst: &SstableInfo,
         stats: &StoreLocalStatistic,
-    ) -> HummockResult<(TableHolder, u64, u64)> {
+    ) -> (SstableMetaResponse, u64, u64) {
         let mut local_cache_meta_block_miss = 0;
         let mut local_cache_meta_block_unhit = 0;
         let object_id = sst.get_object_id();
@@ -398,6 +399,34 @@ impl SstableStore {
         if !matches!(lookup_response, LookupResponse::Cached(..)) {
             local_cache_meta_block_unhit += 1;
         }
+
+        (
+            lookup_response,
+            local_cache_meta_block_miss,
+            local_cache_meta_block_unhit,
+        )
+    }
+
+    pub fn get_sstable_response(
+        &self,
+        sst: &SstableInfo,
+        stats: &mut StoreLocalStatistic,
+    ) -> SstableMetaResponse {
+        let (lookup_response, local_cache_meta_block_miss, _) =
+            self.sstable_lookup_response(sst, stats);
+        stats.apply_meta_fetch(local_cache_meta_block_miss);
+        lookup_response
+    }
+
+    /// Returns `table_holder`, `local_cache_meta_block_miss` (1 if cache miss) and
+    /// `local_cache_meta_block_unhit` (1 if not cache hit).
+    pub async fn sstable_syncable(
+        &self,
+        sst: &SstableInfo,
+        stats: &StoreLocalStatistic,
+    ) -> HummockResult<(TableHolder, u64, u64)> {
+        let (lookup_response, local_cache_meta_block_miss, local_cache_meta_block_unhit) =
+            self.sstable_lookup_response(sst, stats);
         let result = lookup_response
             .verbose_instrument_await("meta_cache_lookup")
             .await;
diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs
index d7848db9d4547..16f363132273c 100644
--- a/src/storage/src/hummock/store/version.rs
+++ b/src/storage/src/hummock/store/version.rs
@@ -761,7 +761,15 @@ impl HummockVersionReader {
                 continue;
             }
             if sstables.len() > 1 {
-                delete_range_iter.add_concat_iter(sstables.clone(), self.sstable_store.clone());
+                delete_range_iter.add_concat_iter(
+                    sstables
+                        .iter()
+                        .filter(|sst| sst.get_range_tombstone_count() > 0)
+                        .cloned()
+                        .collect_vec(),
+                    self.sstable_store.clone(),
+                    sst_read_options.clone(),
+                );
                 non_overlapping_iters.push(ConcatIterator::new(
                     sstables,
                     self.sstable_store.clone(),
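The `sstable_store.rs` hunks split the old `sstable_syncable` into a synchronous `sstable_lookup_response`, which starts the cache lookup and returns immediately, plus a later `.await` on the response; `get_sstable_response` is the fire-and-forget entry point the prefetcher calls. The `version.rs` hunk additionally filters out ssts with `get_range_tombstone_count() == 0`, so the delete-range concat iterator never fetches metas for ssts that carry no range tombstones. A std-only model of the sync-lookup/deferred-await split (the cache map and thread handle are stand-ins for the real LRU cache and async fetch; all names here are illustrative):

```rust
use std::collections::HashMap;
use std::thread::{self, JoinHandle};

// Stand-in for the real LookupResponse: either an immediate cache hit or an
// in-flight fetch that can be resolved later.
enum LookupResponse {
    Cached(String),
    Pending(JoinHandle<String>),
}

impl LookupResponse {
    fn resolve(self) -> String {
        match self {
            LookupResponse::Cached(meta) => meta,
            LookupResponse::Pending(handle) => handle.join().unwrap(),
        }
    }
}

struct MetaCache {
    cached: HashMap<u64, String>,
}

impl MetaCache {
    /// Synchronous part: decide hit/miss and *start* the fetch on a miss,
    /// mirroring how get_sstable_response returns before the I/O finishes.
    fn lookup(&self, object_id: u64) -> (LookupResponse, u64) {
        match self.cached.get(&object_id) {
            Some(meta) => (LookupResponse::Cached(meta.clone()), 0),
            None => {
                let handle = thread::spawn(move || format!("meta-{object_id}"));
                (LookupResponse::Pending(handle), 1) // 1 = meta_block_miss
            }
        }
    }
}

fn main() {
    let cache = MetaCache { cached: HashMap::from([(1, "meta-1".into())]) };
    let (hit, miss) = cache.lookup(1);
    let (pending, miss2) = cache.lookup(2); // fetch is already in flight here
    assert_eq!((miss, miss2), (0, 1));
    println!("{} / {}", hit.resolve(), pending.resolve());
}
```

Keeping the lookup synchronous is what lets `PrefetchContext` queue responses for future sst indexes without awaiting them, which is the whole point of the pipeline in the first file.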