From f44434e26ce929a6f52db693e996e3323d457e1e Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Sat, 11 Mar 2023 17:11:20 +0800 Subject: [PATCH 1/6] perf(compaction): avoid duplicate data in LSM (close #8488) --- src/storage/src/hummock/compactor/iterator.rs | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 028d27c40e438..30aa61a6afb19 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -43,7 +43,7 @@ struct SstableStreamIterator { /// Counts the time used for IO. stats_ptr: Arc, - // For debugging + /// For key sanity check of divided SST and debugging sstable_info: SstableInfo, } @@ -77,6 +77,22 @@ impl SstableStreamIterator { } } + async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> { + while let Some(block_iter) = self.block_iter.as_mut() { + if self + .sstable_info + .get_table_ids() + .binary_search(&FullKey::decode(block_iter.key()).user_key.table_id.table_id) + .is_ok() + { + return Ok(()); + } else { + self.next_block().await?; + } + } + Ok(()) + } + /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to @@ -98,7 +114,7 @@ impl SstableStreamIterator { } } - Ok(()) + self.prune_from_valid_block_iter().await } /// Loads a new block, creates a new iterator for it, and stores that iterator in @@ -147,6 +163,7 @@ impl SstableStreamIterator { block_iter.next(); if !block_iter.is_valid() { self.next_block().await?; + self.prune_from_valid_block_iter().await?; } Ok(()) @@ -226,7 +243,7 @@ impl ConcatSstableIterator { /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given. async fn seek_idx( &mut self, - idx: usize, + mut idx: usize, seek_key: Option>, ) -> HummockResult<()> { self.sstable_iter.take(); @@ -240,7 +257,7 @@ impl ConcatSstableIterator { (None, false) => Some(FullKey::decode(&self.key_range.left)), }; - if idx < self.tables.len() { + while idx < self.tables.len() { let table_info = &self.tables[idx]; let table = self .sstable_store @@ -291,9 +308,15 @@ impl ConcatSstableIterator { &self.stats, ); sstable_iter.seek(seek_key).await?; - - self.sstable_iter = Some(sstable_iter); self.cur_idx = idx; + + if sstable_iter.is_valid() { + self.sstable_iter = Some(sstable_iter); + return Ok(()); + } else { + idx += 1; + seek_key = None; + } } Ok(()) } From 5e45edb8762f87dc351dc8261724979fcce6dee4 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Sat, 11 Mar 2023 17:41:29 +0800 Subject: [PATCH 2/6] unit test --- src/storage/src/hummock/compactor/iterator.rs | 13 +++++----- src/storage/src/hummock/test_utils.rs | 24 +++++++++++++++++-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 30aa61a6afb19..1b7ee162eaead 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -406,7 +406,8 @@ mod tests { use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::iterator::HummockIterator; use crate::hummock::test_utils::{ - default_builder_opt_for_test, gen_test_sstable, test_key_of, test_value_of, TEST_KEYS_COUNT, + default_builder_opt_for_test, gen_test_sstable_and_info, test_key_of, test_value_of, + TEST_KEYS_COUNT, }; use crate::hummock::value::HummockValue; @@ -417,7 +418,7 @@ mod tests { for object_id in 0..3 { let start_index = object_id * TEST_KEYS_COUNT; let end_index = (object_id + 1) * TEST_KEYS_COUNT; - let table = gen_test_sstable( + let (_table, table_info) = gen_test_sstable_and_info( default_builder_opt_for_test(), object_id as u64, (start_index..end_index) @@ -425,7 +426,7 @@ mod tests { sstable_store.clone(), ) .await; - table_infos.push(table.get_sstable_info()); + table_infos.push(table_info); } let start_index = 5000; let end_index = 25000; @@ -517,7 +518,7 @@ mod tests { for object_id in 0..3 { let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2; let end_index = (object_id + 1) * TEST_KEYS_COUNT; - let table = gen_test_sstable( + let (_table, table_info) = gen_test_sstable_and_info( default_builder_opt_for_test(), object_id as u64, (start_index..end_index) @@ -525,7 +526,7 @@ mod tests { sstable_store.clone(), ) .await; - table_infos.push(table.get_sstable_info()); + table_infos.push(table_info); } // Test seek_idx. Result is dominated by given seek key rather than key range. @@ -559,7 +560,7 @@ mod tests { let block_1_second_key = iter.key().to_vec(); // Use a big enough seek key and result in invalid iterator. let seek_key = test_key_of(30001); - iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap(); + iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref())).await.unwrap(); assert!(!iter.is_valid()); // Test seek_idx. Result is dominated by key range rather than given seek key. diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 6ad0a331c8ca7..5d760026f0b33 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -204,7 +204,7 @@ pub async fn gen_test_sstable_inner>( range_tombstones: Vec, sstable_store: SstableStoreRef, policy: CachePolicy, -) -> Sstable { +) -> (Sstable, SstableInfo) { let writer_opts = SstableWriterOptions { capacity_hint: None, tracker: None, @@ -227,7 +227,7 @@ pub async fn gen_test_sstable_inner>( ) .await .unwrap(); - table.value().as_ref().clone() + (table.value().as_ref().clone(), output.sst_info.sst_info) } /// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store` @@ -246,6 +246,25 @@ pub async fn gen_test_sstable>( CachePolicy::NotFill, ) .await + .0 +} + +/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store` +pub async fn gen_test_sstable_and_info>( + opts: SstableBuilderOptions, + sst_id: HummockSstableId, + kv_iter: impl Iterator, HummockValue)>, + sstable_store: SstableStoreRef, +) -> (Sstable, SstableInfo) { + gen_test_sstable_inner( + opts, + sst_id, + kv_iter, + vec![], + sstable_store, + CachePolicy::NotFill, + ) + .await } /// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store` @@ -265,6 +284,7 @@ pub async fn gen_test_sstable_with_range_tombstone( CachePolicy::NotFill, ) .await + .0 } /// Generates a user key with table id 0 and the given `table_key` From 93f58964c5ce4858d40e23077ad8989959106116 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Tue, 14 Mar 2023 21:17:12 +0800 Subject: [PATCH 3/6] rebase --- src/storage/src/hummock/compactor/iterator.rs | 9 ++++++--- src/storage/src/hummock/test_utils.rs | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 1b7ee162eaead..d94b304650496 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -82,7 +82,7 @@ impl SstableStreamIterator { if self .sstable_info .get_table_ids() - .binary_search(&FullKey::decode(block_iter.key()).user_key.table_id.table_id) + .binary_search(&block_iter.key().user_key.table_id.table_id) .is_ok() { return Ok(()); @@ -247,7 +247,8 @@ impl ConcatSstableIterator { seek_key: Option>, ) -> HummockResult<()> { self.sstable_iter.take(); - let seek_key: Option> = match (seek_key, self.key_range.left.is_empty()) { + let mut seek_key: Option> = match (seek_key, self.key_range.left.is_empty()) + { (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) { Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)), Ordering::Greater => Some(seek_key), @@ -560,7 +561,9 @@ mod tests { let block_1_second_key = iter.key().to_vec(); // Use a big enough seek key and result in invalid iterator. let seek_key = test_key_of(30001); - iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref())).await.unwrap(); + iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref())) + .await + .unwrap(); assert!(!iter.is_valid()); // Test seek_idx. Result is dominated by key range rather than given seek key. diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 5d760026f0b33..af149f1ac728b 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -252,13 +252,13 @@ pub async fn gen_test_sstable>( /// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store` pub async fn gen_test_sstable_and_info>( opts: SstableBuilderOptions, - sst_id: HummockSstableId, + object_id: HummockSstableObjectId, kv_iter: impl Iterator, HummockValue)>, sstable_store: SstableStoreRef, ) -> (Sstable, SstableInfo) { gen_test_sstable_inner( opts, - sst_id, + object_id, kv_iter, vec![], sstable_store, From 180e0c95fd5d70c7f29c4f9bcc95ee4fc79259c9 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 16 Mar 2023 16:22:27 +0800 Subject: [PATCH 4/6] use new table id interface --- src/storage/src/hummock/compactor/iterator.rs | 2 +- src/storage/src/hummock/sstable/block_iterator.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index d94b304650496..125a563985fee 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -82,7 +82,7 @@ impl SstableStreamIterator { if self .sstable_info .get_table_ids() - .binary_search(&block_iter.key().user_key.table_id.table_id) + .binary_search(&block_iter.table_id().table_id) .is_ok() { return Ok(()); diff --git a/src/storage/src/hummock/sstable/block_iterator.rs b/src/storage/src/hummock/sstable/block_iterator.rs index 14b31e9406dea..25395d515a9d6 100644 --- a/src/storage/src/hummock/sstable/block_iterator.rs +++ b/src/storage/src/hummock/sstable/block_iterator.rs @@ -16,6 +16,7 @@ use std::cmp::Ordering; use std::ops::Range; use bytes::BytesMut; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::FullKey; use super::{KeyPrefix, LenType, RestartPoint}; @@ -74,10 +75,14 @@ impl BlockIterator { self.try_prev_inner() } + pub fn table_id(&self) -> TableId { + self.block.table_id() + } + pub fn key(&self) -> FullKey<&[u8]> { assert!(self.is_valid()); - FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..]) + FullKey::from_slice_without_table_id(self.table_id(), &self.key[..]) } pub fn value(&self) -> &[u8] { From bf8ee5b70c0cf859600f9b53fd66abf0a776d837 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 16 Mar 2023 20:37:29 +0800 Subject: [PATCH 5/6] address comments --- src/storage/src/hummock/compactor/iterator.rs | 74 +++++++++++++------ 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 125a563985fee..83571cc7bae3a 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -265,7 +265,7 @@ impl ConcatSstableIterator { .sstable(table_info, &mut self.stats) .await?; let block_metas = &table.value().meta.block_metas; - let start_index = match seek_key { + let mut start_index = match seek_key { None => 0, Some(seek_key) => { // start_index points to the greatest block whose smallest_key <= seek_key. @@ -286,33 +286,59 @@ impl ConcatSstableIterator { ) != Ordering::Greater }) }; - if end_index <= start_index { - return Ok(()); - } - let stats_ptr = self.stats.remote_io_time.clone(); - let now = Instant::now(); - - let block_stream = self - .sstable_store - .get_stream(table.value(), Some(start_index)) - .await?; - - // Determine time needed to open stream. - let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); - stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed); + while start_index < end_index { + let start_block_table_id = FullKey::decode(&block_metas[start_index].smallest_key) + .user_key + .table_id; + if table_info + .get_table_ids() + .binary_search(&start_block_table_id.table_id) + .is_ok() + { + break; + } else { + start_index += + &block_metas[(start_index + 1)..].partition_point(|block_meta| { + FullKey::decode(&block_meta.smallest_key).user_key.table_id + == start_block_table_id + }) + 1; + } + } - let mut sstable_iter = SstableStreamIterator::new( - table_info, - block_stream, - end_index - start_index, - &self.stats, - ); - sstable_iter.seek(seek_key).await?; + let found = if end_index <= start_index { + false + } else { + let stats_ptr = self.stats.remote_io_time.clone(); + let now = Instant::now(); + + let block_stream = self + .sstable_store + .get_stream(table.value(), Some(start_index)) + .await?; + + // Determine time needed to open stream. + let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); + stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed); + + let mut sstable_iter = SstableStreamIterator::new( + table_info, + block_stream, + end_index - start_index, + &self.stats, + ); + sstable_iter.seek(seek_key).await?; + + if sstable_iter.is_valid() { + self.sstable_iter = Some(sstable_iter); + true + } else { + false + } + }; self.cur_idx = idx; - if sstable_iter.is_valid() { - self.sstable_iter = Some(sstable_iter); + if found { return Ok(()); } else { idx += 1; From 1a3b1598b1263cd2b978d690f6649c83b4fd4001 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 16 Mar 2023 21:55:04 +0800 Subject: [PATCH 6/6] address comments --- src/storage/src/hummock/compactor/iterator.rs | 7 ++----- src/storage/src/hummock/sstable/mod.rs | 6 +++++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 83571cc7bae3a..5929440960e5e 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -288,9 +288,7 @@ impl ConcatSstableIterator { }; while start_index < end_index { - let start_block_table_id = FullKey::decode(&block_metas[start_index].smallest_key) - .user_key - .table_id; + let start_block_table_id = block_metas[start_index].table_id(); if table_info .get_table_ids() .binary_search(&start_block_table_id.table_id) @@ -300,8 +298,7 @@ impl ConcatSstableIterator { } else { start_index += &block_metas[(start_index + 1)..].partition_point(|block_meta| { - FullKey::decode(&block_meta.smallest_key).user_key.table_id - == start_block_table_id + block_meta.table_id() == start_block_table_id }) + 1; } } diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index 68437ac0dfa0a..b0ff85c521d9a 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -40,7 +40,7 @@ use bytes::{Buf, BufMut}; pub use forward_sstable_iterator::*; mod backward_sstable_iterator; pub use backward_sstable_iterator::*; -use risingwave_hummock_sdk::key::{KeyPayloadType, TableKey, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey}; use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId}; #[cfg(test)] use risingwave_pb::hummock::{KeyRange, SstableInfo}; @@ -253,6 +253,10 @@ impl BlockMeta { pub fn encoded_size(&self) -> usize { 16 /* offset + len + key len + uncompressed size */ + self.smallest_key.len() } + + pub fn table_id(&self) -> TableId { + FullKey::decode(&self.smallest_key).user_key.table_id + } } #[derive(Clone, PartialEq, Eq, Debug)]