diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 79976dc420a67..d621a72f0613b 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -43,7 +43,7 @@ struct SstableStreamIterator { /// Counts the time used for IO. stats_ptr: Arc, - // For debugging + /// For key sanity check of divided SST and debugging sstable_info: SstableInfo, } @@ -77,6 +77,22 @@ impl SstableStreamIterator { } } + async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> { + while let Some(block_iter) = self.block_iter.as_mut() { + if self + .sstable_info + .get_table_ids() + .binary_search(&FullKey::decode(block_iter.key()).user_key.table_id.table_id) + .is_ok() + { + return Ok(()); + } else { + self.next_block().await?; + } + } + Ok(()) + } + /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to @@ -97,12 +113,7 @@ impl SstableStreamIterator { } } - if self.block_iter.is_none() { - // End of stream. - self.remaining_blocks = 0; - } - - Ok(()) + self.prune_from_valid_block_iter().await } /// Loads a new block, creates a new iterator for it, and stores that iterator in @@ -151,6 +162,7 @@ impl SstableStreamIterator { block_iter.next(); if !block_iter.is_valid() { self.next_block().await?; + self.prune_from_valid_block_iter().await?; } Ok(()) @@ -225,9 +237,9 @@ impl ConcatSstableIterator { } /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given. - async fn seek_idx(&mut self, idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> { + async fn seek_idx(&mut self, mut idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> { self.sstable_iter.take(); - let seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) { + let mut seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) { (Some(seek_key), false) => { match KeyComparator::compare_encoded_full_key(seek_key, &self.key_range.left) { Ordering::Less | Ordering::Equal => Some(&self.key_range.left), @@ -239,7 +251,7 @@ impl ConcatSstableIterator { (None, false) => Some(&self.key_range.left), }; - if idx < self.tables.len() { + while idx < self.tables.len() { let table_info = &self.tables[idx]; let table = self .sstable_store @@ -291,9 +303,15 @@ impl ConcatSstableIterator { &self.stats, ); sstable_iter.seek(seek_key).await?; - - self.sstable_iter = Some(sstable_iter); self.cur_idx = idx; + + if sstable_iter.is_valid() { + self.sstable_iter = Some(sstable_iter); + return Ok(()); + } else { + idx += 1; + seek_key = None; + } } Ok(()) }