Skip to content

Commit

Permalink
perf(compaction): avoid duplicate data in LSM (close #8488)
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Zhao committed Mar 12, 2023
1 parent b7c46d4 commit 70b97b4
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SstableStreamIterator {
/// Counts the time used for IO.
stats_ptr: Arc<AtomicU64>,

// For debugging
/// Used for key sanity checks on divided SSTs and for debugging.
sstable_info: SstableInfo,
}

Expand Down Expand Up @@ -77,6 +77,22 @@ impl SstableStreamIterator {
}
}

/// Skips forward over blocks until the block iterator points at a key whose table id is listed
/// in `sstable_info`'s table ids, or until there is no block iterator left.
///
/// Only the key the block iterator currently points at is examined. When its table id is found
/// in the list the iterator is left where it is; otherwise `next_block` is called and the check
/// is repeated on whatever iterator that leaves behind.
///
/// # Errors
///
/// Propagates any error returned by `next_block`.
///
/// NOTE(review): the membership test is a `binary_search`, which presumes the table id list is
/// sorted — confirm against whatever builds `SstableInfo`.
/// NOTE(review): `key()` is called without an explicit validity check here; presumably a
/// freshly loaded block iterator is always positioned on a valid entry — confirm.
async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
    loop {
        // Compute the membership flag in its own scope so that no borrow of `self` is still
        // alive when `next_block` needs `&mut self` below.
        let table_is_listed = match self.block_iter.as_mut() {
            // No iterator: nothing left to prune.
            None => return Ok(()),
            Some(current) => {
                let table_ids = self.sstable_info.get_table_ids();
                let full_key = FullKey::decode(current.key());
                table_ids
                    .binary_search(&full_key.user_key.table_id.table_id)
                    .is_ok()
            }
        };

        if table_is_listed {
            return Ok(());
        }

        // The current key belongs to a table this SST does not list; move to the next block.
        self.next_block().await?;
    }
}

/// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
Expand All @@ -97,12 +113,7 @@ impl SstableStreamIterator {
}
}

if self.block_iter.is_none() {
// End of stream.
self.remaining_blocks = 0;
}

Ok(())
self.prune_from_valid_block_iter().await
}

/// Loads a new block, creates a new iterator for it, and stores that iterator in
Expand Down Expand Up @@ -151,6 +162,7 @@ impl SstableStreamIterator {
block_iter.next();
if !block_iter.is_valid() {
self.next_block().await?;
self.prune_from_valid_block_iter().await?;
}

Ok(())
Expand Down Expand Up @@ -225,9 +237,9 @@ impl ConcatSstableIterator {
}

/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
async fn seek_idx(&mut self, idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> {
async fn seek_idx(&mut self, mut idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> {
self.sstable_iter.take();
let seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) {
let mut seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) {
(Some(seek_key), false) => {
match KeyComparator::compare_encoded_full_key(seek_key, &self.key_range.left) {
Ordering::Less | Ordering::Equal => Some(&self.key_range.left),
Expand All @@ -239,7 +251,7 @@ impl ConcatSstableIterator {
(None, false) => Some(&self.key_range.left),
};

if idx < self.tables.len() {
while idx < self.tables.len() {
let table_info = &self.tables[idx];
let table = self
.sstable_store
Expand Down Expand Up @@ -291,9 +303,15 @@ impl ConcatSstableIterator {
&self.stats,
);
sstable_iter.seek(seek_key).await?;

self.sstable_iter = Some(sstable_iter);
self.cur_idx = idx;

if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
return Ok(());
} else {
idx += 1;
seek_key = None;
}
}
Ok(())
}
Expand Down

0 comments on commit 70b97b4

Please sign in to comment.