-
Notifications
You must be signed in to change notification settings - Fork 611
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(storage): do not fetch all sst meta when create iterator #9517
Changes from 2 commits
66018c5
6131819
0d52c65
815481a
65c7603
11b1280
5ffc25d
61331e4
a238b97
442ae11
4da1a3d
862a7fa
cc6d70c
8b9c8a7
3c277fa
6115d27
62eaec9
c289bbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
// Copyright 2023 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::future::Future; | ||
|
||
use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; | ||
use risingwave_hummock_sdk::HummockEpoch; | ||
use risingwave_pb::hummock::SstableInfo; | ||
|
||
use crate::hummock::iterator::DeleteRangeIterator; | ||
use crate::hummock::sstable_store::SstableStoreRef; | ||
use crate::hummock::{HummockResult, SstableDeleteRangeIterator}; | ||
use crate::monitor::StoreLocalStatistic; | ||
|
||
pub struct ConcatDeleteRangeIterator { | ||
sstables: Vec<SstableInfo>, | ||
current: Option<SstableDeleteRangeIterator>, | ||
idx: usize, | ||
sstable_store: SstableStoreRef, | ||
stats: StoreLocalStatistic, | ||
} | ||
|
||
impl ConcatDeleteRangeIterator { | ||
pub fn new(sstables: Vec<SstableInfo>, sstable_store: SstableStoreRef) -> Self { | ||
Self { | ||
sstables, | ||
sstable_store, | ||
stats: StoreLocalStatistic::default(), | ||
idx: 0, | ||
current: None, | ||
} | ||
} | ||
|
||
/// Seeks to a table, and then seeks to the key if `seek_key` is given. | ||
async fn seek_idx( | ||
&mut self, | ||
idx: usize, | ||
seek_key: Option<UserKey<&[u8]>>, | ||
) -> HummockResult<()> { | ||
self.current.take(); | ||
if idx < self.sstables.len() { | ||
if self.sstables[idx].range_tombstone_count == 0 { | ||
return Ok(()); | ||
} | ||
let table = self | ||
.sstable_store | ||
.sstable(&self.sstables[idx], &mut self.stats) | ||
.await?; | ||
let mut sstable_iter = SstableDeleteRangeIterator::new(table); | ||
|
||
if let Some(key) = seek_key { | ||
sstable_iter.seek(key).await?; | ||
} else { | ||
sstable_iter.rewind().await?; | ||
} | ||
|
||
self.current = Some(sstable_iter); | ||
self.idx = idx; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl DeleteRangeIterator for ConcatDeleteRangeIterator { | ||
type NextFuture<'a> = impl Future<Output = HummockResult<()>> + 'a; | ||
type RewindFuture<'a> = impl Future<Output = HummockResult<()>> + 'a; | ||
type SeekFuture<'a> = impl Future<Output = HummockResult<()>> + 'a; | ||
|
||
fn next_extended_user_key(&self) -> PointRange<&[u8]> { | ||
self.current.as_ref().unwrap().next_extended_user_key() | ||
} | ||
|
||
fn current_epoch(&self) -> HummockEpoch { | ||
self.current.as_ref().unwrap().current_epoch() | ||
} | ||
|
||
fn next(&mut self) -> Self::NextFuture<'_> { | ||
async { | ||
if let Some(iter) = self.current.as_mut() { | ||
if iter.is_valid() { | ||
iter.next().await?; | ||
Little-Wallace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
let mut idx = self.idx; | ||
while idx + 1 < self.sstables.len() && !self.is_valid() { | ||
self.seek_idx(idx + 1, None).await?; | ||
idx += 1; | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
fn rewind(&mut self) -> Self::RewindFuture<'_> { | ||
async move { | ||
let mut idx = 0; | ||
while idx < self.sstables.len() && self.sstables[idx].range_tombstone_count == 0 { | ||
idx += 1; | ||
} | ||
self.current.take(); | ||
self.seek_idx(0, None).await?; | ||
Little-Wallace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while idx + 1 < self.sstables.len() && !self.is_valid() { | ||
Little-Wallace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.seek_idx(idx + 1, None).await?; | ||
idx += 1; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { | ||
async move { | ||
let mut idx = self | ||
.sstables | ||
.partition_point(|sst| { | ||
FullKey::decode(&sst.key_range.as_ref().unwrap().left) | ||
.user_key | ||
.le(&target_user_key) | ||
}) | ||
.saturating_sub(1); // considering the boundary of 0 | ||
Little-Wallace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.current.take(); | ||
Little-Wallace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.seek_idx(idx, Some(target_user_key)).await?; | ||
while idx + 1 < self.sstables.len() && !self.is_valid() { | ||
self.seek_idx(idx + 1, None).await?; | ||
idx += 1; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
fn is_valid(&self) -> bool { | ||
self.current | ||
.as_ref() | ||
.map(|iter| iter.is_valid()) | ||
.unwrap_or(false) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,49 +16,21 @@ use std::cmp::Ordering::{Equal, Greater, Less}; | |
use std::future::Future; | ||
use std::sync::Arc; | ||
|
||
use itertools::Itertools; | ||
use risingwave_common::must_match; | ||
use risingwave_hummock_sdk::key::FullKey; | ||
use risingwave_pb::hummock::SstableInfo; | ||
|
||
use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; | ||
use crate::hummock::sstable::SstableIteratorReadOptions; | ||
use crate::hummock::sstable_store::TableHolder; | ||
use crate::hummock::value::HummockValue; | ||
use crate::hummock::{HummockResult, SstableIteratorType, SstableStore, SstableStoreRef}; | ||
use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef}; | ||
use crate::monitor::StoreLocalStatistic; | ||
|
||
enum ConcatItem { | ||
Unfetched(SstableInfo), | ||
Prefetched(TableHolder), | ||
fn smallest_key(sstable_info: &SstableInfo) -> &[u8] { | ||
&sstable_info.key_range.as_ref().unwrap().left | ||
} | ||
|
||
impl ConcatItem { | ||
async fn prefetch( | ||
&mut self, | ||
sstable_store: &SstableStore, | ||
stats: &mut StoreLocalStatistic, | ||
) -> HummockResult<TableHolder> { | ||
if let ConcatItem::Unfetched(sstable_info) = self { | ||
let table = sstable_store.sstable(sstable_info, stats).await?; | ||
*self = ConcatItem::Prefetched(table); | ||
} | ||
Ok(must_match!(self, ConcatItem::Prefetched(table) => table.clone())) | ||
} | ||
|
||
fn smallest_key(&self) -> &[u8] { | ||
match self { | ||
ConcatItem::Unfetched(sstable_info) => &sstable_info.key_range.as_ref().unwrap().left, | ||
ConcatItem::Prefetched(table_holder) => &table_holder.value().meta.smallest_key, | ||
} | ||
} | ||
|
||
fn largest_key(&self) -> &[u8] { | ||
match self { | ||
ConcatItem::Unfetched(sstable_info) => &sstable_info.key_range.as_ref().unwrap().right, | ||
ConcatItem::Prefetched(table_holder) => &table_holder.value().meta.largest_key, | ||
} | ||
} | ||
fn largest_key(sstable_info: &SstableInfo) -> &[u8] { | ||
&sstable_info.key_range.as_ref().unwrap().right | ||
} | ||
|
||
/// Served as the concrete implementation of `ConcatIterator` and `BackwardConcatIterator`. | ||
|
@@ -70,7 +42,7 @@ pub struct ConcatIteratorInner<TI: SstableIteratorType> { | |
cur_idx: usize, | ||
|
||
/// All non-overlapping tables. | ||
tables: Vec<ConcatItem>, | ||
tables: Vec<SstableInfo>, | ||
|
||
sstable_store: SstableStoreRef, | ||
|
||
|
@@ -82,8 +54,8 @@ impl<TI: SstableIteratorType> ConcatIteratorInner<TI> { | |
/// Caller should make sure that `tables` are non-overlapping, | ||
/// arranged in ascending order when it serves as a forward iterator, | ||
/// and arranged in descending order when it serves as a backward iterator. | ||
fn new_inner( | ||
tables: Vec<ConcatItem>, | ||
pub fn new( | ||
tables: Vec<SstableInfo>, | ||
sstable_store: SstableStoreRef, | ||
read_options: Arc<SstableIteratorReadOptions>, | ||
) -> Self { | ||
|
@@ -97,30 +69,6 @@ impl<TI: SstableIteratorType> ConcatIteratorInner<TI> { | |
} | ||
} | ||
|
||
/// Caller should make sure that `tables` are non-overlapping, | ||
/// arranged in ascending order when it serves as a forward iterator, | ||
/// and arranged in descending order when it serves as a backward iterator. | ||
pub fn new( | ||
tables: Vec<SstableInfo>, | ||
sstable_store: SstableStoreRef, | ||
read_options: Arc<SstableIteratorReadOptions>, | ||
) -> Self { | ||
let tables = tables.into_iter().map(ConcatItem::Unfetched).collect_vec(); | ||
Self::new_inner(tables, sstable_store, read_options) | ||
} | ||
|
||
/// Caller should make sure that `tables` are non-overlapping, | ||
/// arranged in ascending order when it serves as a forward iterator, | ||
/// and arranged in descending order when it serves as a backward iterator. | ||
pub fn new_with_prefetch( | ||
tables: Vec<TableHolder>, | ||
sstable_store: SstableStoreRef, | ||
read_options: Arc<SstableIteratorReadOptions>, | ||
) -> Self { | ||
let tables = tables.into_iter().map(ConcatItem::Prefetched).collect_vec(); | ||
Self::new_inner(tables, sstable_store, read_options) | ||
} | ||
|
||
/// Seeks to a table, and then seeks to the key if `seek_key` is given. | ||
async fn seek_idx( | ||
&mut self, | ||
|
@@ -132,8 +80,9 @@ impl<TI: SstableIteratorType> ConcatIteratorInner<TI> { | |
old_iter.collect_local_statistic(&mut self.stats); | ||
} | ||
} else { | ||
let table = self.tables[idx] | ||
.prefetch(&self.sstable_store, &mut self.stats) | ||
let table = self | ||
.sstable_store | ||
.sstable(&self.tables[idx], &mut self.stats) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. random thought: we can prefetch the next sstable meta similar to the block prefetch in sstable iterator. not sure how much improvement we can get though There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We can get more memory :rolling_on_the_floor_laughing: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
True. But it looks acceptable because the memory usage is proportional to number of levels, not number of SSTs, and we have very few levels. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But after we support meta-cache refill after compaction, the meta-cache miss rate is very low. |
||
.await?; | ||
let mut sstable_iter = | ||
TI::create(table, self.sstable_store.clone(), self.read_options.clone()); | ||
|
@@ -198,13 +147,12 @@ impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> { | |
.tables | ||
.partition_point(|table| match Self::Direction::direction() { | ||
DirectionEnum::Forward => { | ||
let ord = FullKey::decode(table.smallest_key()).cmp(&key); | ||
let ord = FullKey::decode(smallest_key(table)).cmp(&key); | ||
|
||
ord == Less || ord == Equal | ||
} | ||
DirectionEnum::Backward => { | ||
let ord = FullKey::decode(table.largest_key()).cmp(&key); | ||
|
||
let ord = FullKey::decode(largest_key(table)).cmp(&key); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If right key range is excluded, we will skip the key itself unexpectedly. |
||
ord == Greater || ord == Equal | ||
} | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In current implementation, after a
next
call,next_extended_user_key
may stay invariant. Is this bad-taste?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But everywhere we call
next_extended_user_key
would always collect all keys whose user-key equalsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
MergeIterator
, everywhere we callnext_extended_user_key
would always collect all keys IN OTHER ITERATORS whose user-key equals, but except the iterator itself