Skip to content

Commit

Permalink
feat(concat delete iterator): prefetch for concat delete range iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Zhao committed May 9, 2023
1 parent 0ff65d1 commit 573bb5e
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 10 deletions.
146 changes: 141 additions & 5 deletions src/storage/src/hummock/iterator/concat_delete_range_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,166 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey};
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::hummock::SstableInfo;

use crate::hummock::iterator::DeleteRangeIterator;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::{HummockResult, SstableDeleteRangeIterator};
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::{SstableStoreRef, TableHolder};
use crate::hummock::{
HummockResult, SstableDeleteRangeIterator, SstableMetaResponse, SstableStore,
};
use crate::monitor::StoreLocalStatistic;

/// Strategy used to obtain the meta (`TableHolder`) of an sstable.
enum MetaFetcher {
/// Ask the sstable store for each sstable only when it is requested.
Simple,
/// Fetch through a `PrefetchContext`, which also issues the lookup for the
/// following sstable ahead of time.
Prefetch(PrefetchContext),
}

impl MetaFetcher {
async fn get_meta(
&mut self,
sstables: &[SstableInfo],
sst_idx: usize,
sstable_store: &SstableStore,
stats: &mut StoreLocalStatistic,
) -> HummockResult<TableHolder> {
match self {
MetaFetcher::Simple => sstable_store.sstable(&sstables[sst_idx], stats).await,
MetaFetcher::Prefetch(context) => {
context
.get_meta(sstables, sst_idx, sstable_store, stats)
.await
}
}
}
}

/// State of the prefetching strategy: lookups that were already issued but
/// not yet consumed, plus the last sstable index that is worth prefetching.
struct PrefetchContext {
/// Issued lookups in request order, each paired with the index of the
/// sstable it belongs to. The front entry is the next one to be consumed.
prefetched_metas: VecDeque<(usize, SstableMetaResponse)>,

/// sst[idx..=dest_idx] will definitely be visited in the future.
/// No lookup is issued for an index greater than `dest_idx`.
dest_idx: usize,
}

/// Sizing hint for the prefetch queue: `PrefetchContext::new` reserves room
/// for this many entries plus one.
/// NOTE(review): presumably the number of sstable metas fetched ahead of the
/// one currently requested — confirm; the fetch logic does not read it directly.
const DEFAULT_PREFETCH_META_NUM: usize = 1;

impl PrefetchContext {
    /// Creates an empty context that will prefetch sstables up to and
    /// including index `dest_idx`.
    fn new(dest_idx: usize) -> Self {
        let prefetched_metas = VecDeque::with_capacity(DEFAULT_PREFETCH_META_NUM + 1);
        Self {
            prefetched_metas,
            dest_idx,
        }
    }

    /// Returns the meta of `sstables[idx]` and issues the lookup for the next
    /// sstable when it is not beyond `dest_idx`.
    ///
    /// If the oldest queued lookup belongs to a different sstable, a warning is
    /// logged, the whole queue is discarded and `idx` is looked up from scratch.
    /// The next lookup is issued *before* awaiting the current one.
    async fn get_meta(
        &mut self,
        sstables: &[SstableInfo],
        idx: usize,
        sstable_store: &SstableStore,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<TableHolder> {
        // Index of the oldest queued lookup, if any.
        let queued_idx = self
            .prefetched_metas
            .front()
            .map(|(prefetched_idx, _)| *prefetched_idx);

        if queued_idx != Some(idx) {
            if let Some(prefetched_idx) = queued_idx {
                tracing::warn!(
                    "prefetch mismatch: sst_idx = {}, prefetched_sst_idx = {}",
                    idx,
                    prefetched_idx
                );
                self.prefetched_metas.clear();
            }
            // Nothing usable is queued: request the wanted sstable now.
            let requested = sstable_store.get_sstable_response(&sstables[idx], stats);
            self.prefetched_metas.push_back((idx, requested));
        }

        // The front entry is guaranteed to exist and to belong to `idx` here.
        let (_, sstable_response) = self.prefetched_metas.pop_front().unwrap();

        // Continue after the most recently requested sstable, or after `idx`
        // itself when the queue is empty.
        let last_requested_idx = match self.prefetched_metas.back() {
            Some((latest_idx, _)) => *latest_idx,
            None => idx,
        };
        let next_prefetch_idx = last_requested_idx + 1;
        if next_prefetch_idx <= self.dest_idx {
            let upcoming = sstable_store.get_sstable_response(&sstables[next_prefetch_idx], stats);
            self.prefetched_metas.push_back((next_prefetch_idx, upcoming));
        }

        sstable_response.await
    }
}

/// Delete-range iterator over a list of sstables, visiting them one at a time.
/// NOTE(review): presumably `sstables` is ordered by key range and
/// non-overlapping — confirm against callers.
pub struct ConcatDeleteRangeIterator {
/// The sstables this iterator walks over.
sstables: Vec<SstableInfo>,
/// Iterator over the sstable currently being read; `None` until one is opened.
current: Option<SstableDeleteRangeIterator>,
/// Index into `sstables`; initialised to 0.
idx: usize,
/// simple or prefetch strategy
meta_fetcher: MetaFetcher,
/// Store used to look up sstable metas.
sstable_store: SstableStoreRef,
/// Local statistics passed to every meta lookup.
stats: StoreLocalStatistic,
/// Read options; `must_iterated_end_user_key` decides whether prefetching is used.
options: Arc<SstableIteratorReadOptions>,
}

impl ConcatDeleteRangeIterator {
pub fn new(sstables: Vec<SstableInfo>, sstable_store: SstableStoreRef) -> Self {
/// Builds an iterator over `sstables` that has not opened any sstable yet.
///
/// The iterator starts at index 0 with default statistics and the simple
/// meta-fetching strategy.
pub fn new(
    sstables: Vec<SstableInfo>,
    sstable_store: SstableStoreRef,
    options: Arc<SstableIteratorReadOptions>,
) -> Self {
    let stats = StoreLocalStatistic::default();
    Self {
        sstables,
        current: None,
        idx: 0,
        meta_fetcher: MetaFetcher::Simple,
        sstable_store,
        stats,
        options,
    }
}

/// Chooses the meta-fetching strategy for an iteration starting at
/// `sstables[start_idx]`.
///
/// The strategy is reset to `MetaFetcher::Simple` first, so a prefetch context
/// left over from an earlier `rewind`/`seek` (its queued lookups and its old
/// `dest_idx`) never leaks into the new iteration. Prefetching is then enabled
/// only when `options.must_iterated_end_user_key` is set and at least one
/// sstable after `start_idx` lies within that bound.
///
/// # Panics
///
/// Panics if an sstable after `start_idx` has no `key_range`.
fn init_meta_fetcher(&mut self, start_idx: usize) {
    // Drop any stale prefetch state before deciding on the new strategy.
    self.meta_fetcher = MetaFetcher::Simple;

    let bound = match self.options.must_iterated_end_user_key.as_ref() {
        Some(bound) => bound,
        None => return,
    };
    let next_to_start_idx = start_idx + 1;
    if next_to_start_idx >= self.sstables.len() {
        // `start_idx` is the last sstable (or out of range): nothing to prefetch.
        return;
    }

    // Number of sstables after `start_idx` that will definitely be visited.
    // `partition_point` relies on the sstables being ordered by their left key.
    let following = &self.sstables[next_to_start_idx..];
    let must_visit = match bound {
        Bound::Unbounded => following.len(),
        Bound::Included(dest_key) => following.partition_point(|sstable_info| {
            FullKey::decode(&sstable_info.key_range.as_ref().unwrap().left).user_key
                <= dest_key.as_ref()
        }),
        Bound::Excluded(end_key) => following.partition_point(|sstable_info| {
            FullKey::decode(&sstable_info.key_range.as_ref().unwrap().left).user_key
                < end_key.as_ref()
        }),
    };

    if must_visit > 0 {
        // Index of the last sstable that is guaranteed to be visited.
        let dest_idx = start_idx + must_visit;
        self.meta_fetcher = MetaFetcher::Prefetch(PrefetchContext::new(dest_idx));
    }
}

Expand Down Expand Up @@ -107,8 +240,8 @@ impl ConcatDeleteRangeIterator {
return Ok(());
}
let table = self
.sstable_store
.sstable(&self.sstables[idx], &mut self.stats)
.meta_fetcher
.get_meta(&self.sstables, idx, &self.sstable_store, &mut self.stats)
.await?;
let mut sstable_iter = SstableDeleteRangeIterator::new(table);

Expand Down Expand Up @@ -144,6 +277,7 @@ impl DeleteRangeIterator for ConcatDeleteRangeIterator {
fn rewind(&mut self) -> Self::RewindFuture<'_> {
async move {
let mut idx = 0;
self.init_meta_fetcher(idx);
self.seek_idx(idx, None).await?;
while idx + 1 < self.sstables.len() && !self.is_valid() {
self.seek_idx(idx + 1, None).await?;
Expand All @@ -163,6 +297,7 @@ impl DeleteRangeIterator for ConcatDeleteRangeIterator {
.le(&target_user_key)
})
.saturating_sub(1); // considering the boundary of 0
self.init_meta_fetcher(idx);
self.seek_idx(idx, Some(target_user_key)).await?;
while idx + 1 < self.sstables.len() && !self.is_valid() {
self.seek_idx(idx + 1, None).await?;
Expand Down Expand Up @@ -244,6 +379,7 @@ mod tests {
let mut concat_iterator = ConcatDeleteRangeIterator::new(
vec![output1.sst_info.sst_info, output2.sst_info.sst_info],
sstable_store,
Arc::new(SstableIteratorReadOptions::default()),
);
concat_iterator.rewind().await.unwrap();
assert_eq!(concat_iterator.current_epoch(), HummockEpoch::MAX);
Expand Down
10 changes: 9 additions & 1 deletion src/storage/src/hummock/iterator/delete_range_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

use std::collections::{BTreeSet, BinaryHeap};
use std::future::Future;
use std::sync::Arc;

use risingwave_hummock_sdk::key::{PointRange, UserKey};
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::hummock::SstableInfo;

use crate::hummock::iterator::concat_delete_range_iterator::ConcatDeleteRangeIterator;
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferDeleteRangeIterator;
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::{HummockResult, SstableDeleteRangeIterator};

Expand Down Expand Up @@ -249,11 +251,17 @@ impl ForwardMergeRangeIterator {
self.unused_iters.push(RangeIteratorTyped::Sst(iter));
}

pub fn add_concat_iter(&mut self, sstables: Vec<SstableInfo>, sstable_store: SstableStoreRef) {
/// Wraps `sstables` in a `ConcatDeleteRangeIterator` and registers it among
/// the unused iterators.
///
/// `options` is forwarded to the concat iterator unchanged.
pub fn add_concat_iter(
    &mut self,
    sstables: Vec<SstableInfo>,
    sstable_store: SstableStoreRef,
    options: Arc<SstableIteratorReadOptions>,
) {
    let concat_iter = ConcatDeleteRangeIterator::new(sstables, sstable_store, options);
    self.unused_iters
        .push(RangeIteratorTyped::Concat(concat_iter));
}
}
Expand Down
35 changes: 32 additions & 3 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards
const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; // 256MB

/// Cache entry holding a boxed `Sstable`, keyed by sstable object id.
pub type TableHolder = CacheableEntry<HummockSstableObjectId, Box<Sstable>>;
/// Response of a meta-cache lookup for an sstable. It is awaited by callers to
/// obtain a `HummockResult<TableHolder>`; it may already be `Cached`.
pub type SstableMetaResponse = LookupResponse<HummockSstableObjectId, Box<Sstable>, HummockError>;

// BEGIN section for tiered cache

Expand Down Expand Up @@ -355,13 +356,13 @@ impl SstableStore {
self.meta_cache.clear();
}

/// Returns `table_holder`, `local_cache_meta_block_miss` (1 if cache miss) and
/// Returns `LookupResponse` of sstable, `local_cache_meta_block_miss` (1 if cache miss) and
/// `local_cache_meta_block_unhit` (1 if not cache hit).
pub async fn sstable_syncable(
pub fn sstable_lookup_response(
&self,
sst: &SstableInfo,
stats: &StoreLocalStatistic,
) -> HummockResult<(TableHolder, u64, u64)> {
) -> (SstableMetaResponse, u64, u64) {
let mut local_cache_meta_block_miss = 0;
let mut local_cache_meta_block_unhit = 0;
let object_id = sst.get_object_id();
Expand Down Expand Up @@ -398,6 +399,34 @@ impl SstableStore {
if !matches!(lookup_response, LookupResponse::Cached(..)) {
local_cache_meta_block_unhit += 1;
}

(
lookup_response,
local_cache_meta_block_miss,
local_cache_meta_block_unhit,
)
}

/// Starts a meta lookup for `sst` and returns the response without awaiting it.
///
/// The miss counter reported by `sstable_lookup_response` is recorded in
/// `stats` via `apply_meta_fetch`; the "unhit" counter is ignored here.
pub fn get_sstable_response(
    &self,
    sst: &SstableInfo,
    stats: &mut StoreLocalStatistic,
) -> SstableMetaResponse {
    let (response, meta_block_miss, _meta_block_unhit) = self.sstable_lookup_response(sst, stats);
    stats.apply_meta_fetch(meta_block_miss);
    response
}

/// Returns `table_holder`, `local_cache_meta_block_miss` (1 if cache miss) and
/// `local_cache_meta_block_unhit` (1 if not cache hit).
pub async fn sstable_syncable(
&self,
sst: &SstableInfo,
stats: &StoreLocalStatistic,
) -> HummockResult<(TableHolder, u64, u64)> {
let (lookup_response, local_cache_meta_block_miss, local_cache_meta_block_unhit) =
self.sstable_lookup_response(sst, stats);
let result = lookup_response
.verbose_instrument_await("meta_cache_lookup")
.await;
Expand Down
10 changes: 9 additions & 1 deletion src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,15 @@ impl HummockVersionReader {
continue;
}
if sstables.len() > 1 {
delete_range_iter.add_concat_iter(sstables.clone(), self.sstable_store.clone());
delete_range_iter.add_concat_iter(
sstables
.iter()
.filter(|sst| sst.get_range_tombstone_count() > 0)
.cloned()
.collect_vec(),
self.sstable_store.clone(),
sst_read_options.clone(),
);
non_overlapping_iters.push(ConcatIterator::new(
sstables,
self.sstable_store.clone(),
Expand Down

0 comments on commit 573bb5e

Please sign in to comment.