Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(compaction): avoid duplicate data in LSM #8489

Merged
merged 6 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 85 additions & 35 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SstableStreamIterator {
/// Counts the time used for IO.
stats_ptr: Arc<AtomicU64>,

// For debugging
/// For key sanity check of divided SST and debugging
sstable_info: SstableInfo,
}

Expand Down Expand Up @@ -77,6 +77,22 @@ impl SstableStreamIterator {
}
}

async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
while let Some(block_iter) = self.block_iter.as_mut() {
if self
.sstable_info
.get_table_ids()
.binary_search(&block_iter.table_id().table_id)
.is_ok()
{
return Ok(());
} else {
self.next_block().await?;
}
}
Ok(())
}

/// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
Expand All @@ -98,7 +114,7 @@ impl SstableStreamIterator {
}
}

Ok(())
self.prune_from_valid_block_iter().await
}

/// Loads a new block, creates a new iterator for it, and stores that iterator in
Expand Down Expand Up @@ -147,6 +163,7 @@ impl SstableStreamIterator {
block_iter.next();
if !block_iter.is_valid() {
self.next_block().await?;
self.prune_from_valid_block_iter().await?;
}

Ok(())
Expand Down Expand Up @@ -226,11 +243,12 @@ impl ConcatSstableIterator {
/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
async fn seek_idx(
&mut self,
idx: usize,
mut idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
self.sstable_iter.take();
let seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty()) {
let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
{
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
Ordering::Greater => Some(seek_key),
Expand All @@ -240,14 +258,14 @@ impl ConcatSstableIterator {
(None, false) => Some(FullKey::decode(&self.key_range.left)),
};

if idx < self.tables.len() {
while idx < self.tables.len() {
let table_info = &self.tables[idx];
let table = self
.sstable_store
.sstable(table_info, &mut self.stats)
.await?;
let block_metas = &table.value().meta.block_metas;
let start_index = match seek_key {
let mut start_index = match seek_key {
None => 0,
Some(seek_key) => {
// start_index points to the greatest block whose smallest_key <= seek_key.
Expand All @@ -268,32 +286,61 @@ impl ConcatSstableIterator {
) != Ordering::Greater
})
};
if end_index <= start_index {
return Ok(());
}

let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;
while start_index < end_index {
let start_block_table_id = block_metas[start_index].table_id();
if table_info
.get_table_ids()
.binary_search(&start_block_table_id.table_id)
.is_ok()
{
break;
} else {
start_index +=
&block_metas[(start_index + 1)..].partition_point(|block_meta| {
block_meta.table_id() == start_block_table_id
}) + 1;
}
}

self.sstable_iter = Some(sstable_iter);
let found = if end_index <= start_index {
false
} else {
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;

if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
true
} else {
false
}
};
self.cur_idx = idx;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also consider sstable_info.get_table_ids() when searching for the start_index in L268? We may save some unneccesary I/Os.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also consider sstable_info.get_table_ids() when searching for the start_index in L268? We may save some unneccesary I/Os.

fixed

if found {
return Ok(());
} else {
idx += 1;
seek_key = None;
}
}
Ok(())
}
Expand Down Expand Up @@ -383,7 +430,8 @@ mod tests {
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::iterator::HummockIterator;
use crate::hummock::test_utils::{
default_builder_opt_for_test, gen_test_sstable, test_key_of, test_value_of, TEST_KEYS_COUNT,
default_builder_opt_for_test, gen_test_sstable_and_info, test_key_of, test_value_of,
TEST_KEYS_COUNT,
};
use crate::hummock::value::HummockValue;

Expand All @@ -394,15 +442,15 @@ mod tests {
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table = gen_test_sstable(
let (_table, table_info) = gen_test_sstable_and_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table.get_sstable_info());
table_infos.push(table_info);
}
let start_index = 5000;
let end_index = 25000;
Expand Down Expand Up @@ -494,15 +542,15 @@ mod tests {
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table = gen_test_sstable(
let (_table, table_info) = gen_test_sstable_and_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table.get_sstable_info());
table_infos.push(table_info);
}

// Test seek_idx. Result is dominated by given seek key rather than key range.
Expand Down Expand Up @@ -536,7 +584,9 @@ mod tests {
let block_1_second_key = iter.key().to_vec();
// Use a big enough seek key and result in invalid iterator.
let seek_key = test_key_of(30001);
iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap();
iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
.await
.unwrap();
assert!(!iter.is_valid());

// Test seek_idx. Result is dominated by key range rather than given seek key.
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::cmp::Ordering;
use std::ops::Range;

use bytes::BytesMut;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;

use super::{KeyPrefix, LenType, RestartPoint};
Expand Down Expand Up @@ -74,10 +75,14 @@ impl BlockIterator {
self.try_prev_inner()
}

pub fn table_id(&self) -> TableId {
self.block.table_id()
}

pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());

FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..])
FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
}

pub fn value(&self) -> &[u8] {
Expand Down
6 changes: 5 additions & 1 deletion src/storage/src/hummock/sstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
#[cfg(test)]
use risingwave_pb::hummock::{KeyRange, SstableInfo};
Expand Down Expand Up @@ -253,6 +253,10 @@ impl BlockMeta {
pub fn encoded_size(&self) -> usize {
16 /* offset + len + key len + uncompressed size */ + self.smallest_key.len()
}

pub fn table_id(&self) -> TableId {
FullKey::decode(&self.smallest_key).user_key.table_id
}
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand Down
24 changes: 22 additions & 2 deletions src/storage/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
range_tombstones: Vec<DeleteRangeTombstone>,
sstable_store: SstableStoreRef,
policy: CachePolicy,
) -> Sstable {
) -> (Sstable, SstableInfo) {
let writer_opts = SstableWriterOptions {
capacity_hint: None,
tracker: None,
Expand All @@ -227,7 +227,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
)
.await
.unwrap();
table.value().as_ref().clone()
(table.value().as_ref().clone(), output.sst_info.sst_info)
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
Expand All @@ -246,6 +246,25 @@ pub async fn gen_test_sstable<B: AsRef<[u8]>>(
CachePolicy::NotFill,
)
.await
.0
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
pub async fn gen_test_sstable_and_info<B: AsRef<[u8]>>(
opts: SstableBuilderOptions,
object_id: HummockSstableObjectId,
kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
sstable_store: SstableStoreRef,
) -> (Sstable, SstableInfo) {
gen_test_sstable_inner(
opts,
object_id,
kv_iter,
vec![],
sstable_store,
CachePolicy::NotFill,
)
.await
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
Expand All @@ -265,6 +284,7 @@ pub async fn gen_test_sstable_with_range_tombstone(
CachePolicy::NotFill,
)
.await
.0
}

/// Generates a user key with table id 0 and the given `table_key`
Expand Down