Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): sstable and block level interfaces use fullkey struct #8419

Merged
merged 7 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ fn print_kv_pairs(

while block_iter.is_valid() {
let raw_full_key = block_iter.key();
let full_key = FullKey::decode(block_iter.key());
let full_key = block_iter.key();
let raw_user_key = full_key.user_key.encode();

let full_val = block_iter.value();
Expand All @@ -245,7 +245,7 @@ fn print_kv_pairs(
println!(
"\t\t full key: {:02x?}, len={}",
raw_full_key,
raw_full_key.len()
raw_full_key.encoded_len()
);
println!("\t\tfull value: {:02x?}, len={}", full_val, full_val.len());
println!("\t\t user key: {:02x?}", raw_user_key);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/benches/bench_block_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use bytes::{BufMut, Bytes, BytesMut};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use risingwave_hummock_sdk::key::FullKey;
use risingwave_storage::hummock::{
Block, BlockBuilder, BlockBuilderOptions, BlockHolder, BlockIterator, CompressionAlgorithm,
};
Expand Down Expand Up @@ -88,7 +89,7 @@ fn bench_block_iter(c: &mut Criterion) {
iter.seek_to_first();
for t in 1..=TABLES_PER_SSTABLE {
for i in 1..=KEYS_PER_TABLE {
assert_eq!(iter.key(), key(t, i).to_vec());
assert_eq!(iter.key(), FullKey::decode(&key(t, i)));
assert_eq!(iter.value(), value(i).to_vec());
iter.next();
}
Expand All @@ -108,7 +109,7 @@ fn build_block_data(t: u32, i: u64) -> Bytes {
let mut builder = BlockBuilder::new(options);
for tt in 1..=t {
for ii in 1..=i {
builder.add(&key(tt, ii), &value(ii));
builder.add(FullKey::decode(&key(tt, ii)), &value(ii));
}
}
Bytes::from(builder.build().to_vec())
Expand Down
6 changes: 5 additions & 1 deletion src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ async fn build_table(
let end = start + 8;
full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes());
builder
.add(&full_key, HummockValue::put(&value[start..end]), true)
.add(
full_key.to_ref(),
HummockValue::put(&value[start..end]),
true,
)
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn build_tables<F: SstableWriterFactory>(
for i in RANGE {
builder
.add_full_key(
&FullKey::from_user_key(test_user_key_of(i).as_ref(), 1),
FullKey::from_user_key(test_user_key_of(i).as_ref(), 1),
HummockValue::put(VALUE),
true,
)
Expand Down
51 changes: 29 additions & 22 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl SstableStreamIterator {
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
/// the very first KV-pair of the stream's first block.
pub async fn seek(&mut self, seek_key: Option<&[u8]>) -> HummockResult<()> {
pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
// Load first block.
self.next_block().await?;

Expand Down Expand Up @@ -156,7 +156,7 @@ impl SstableStreamIterator {
Ok(())
}

fn key(&self) -> &[u8] {
fn key(&self) -> FullKey<&[u8]> {
self.block_iter
.as_ref()
.unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
Expand Down Expand Up @@ -225,18 +225,20 @@ impl ConcatSstableIterator {
}

/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
async fn seek_idx(&mut self, idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> {
async fn seek_idx(
&mut self,
idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
self.sstable_iter.take();
let seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) {
(Some(seek_key), false) => {
match KeyComparator::compare_encoded_full_key(seek_key, &self.key_range.left) {
Ordering::Less | Ordering::Equal => Some(&self.key_range.left),
Ordering::Greater => Some(seek_key),
}
}
let seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty()) {
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
Ordering::Greater => Some(seek_key),
},
(Some(seek_key), true) => Some(seek_key),
(None, true) => None,
(None, false) => Some(&self.key_range.left),
(None, false) => Some(FullKey::decode(&self.key_range.left)),
};

if idx < self.tables.len() {
Expand All @@ -252,8 +254,7 @@ impl ConcatSstableIterator {
// start_index points to the greatest block whose smallest_key <= seek_key.
block_metas
.partition_point(|block| {
KeyComparator::compare_encoded_full_key(&block.smallest_key, seek_key)
!= Ordering::Greater
seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less
})
.saturating_sub(1)
}
Expand Down Expand Up @@ -323,7 +324,7 @@ impl HummockIterator for ConcatSstableIterator {
}

fn key(&self) -> FullKey<&[u8]> {
FullKey::decode(self.sstable_iter.as_ref().expect("no table iter").key())
self.sstable_iter.as_ref().expect("no table iter").key()
}

fn value(&self) -> HummockValue<&[u8]> {
Expand Down Expand Up @@ -362,7 +363,7 @@ impl HummockIterator for ConcatSstableIterator {
KeyComparator::compare_encoded_full_key(max_sst_key, seek_key) == Ordering::Less
});

self.seek_idx(table_idx, Some(key_slice)).await
self.seek_idx(table_idx, Some(key)).await
}
}

Expand Down Expand Up @@ -521,20 +522,22 @@ mod tests {
let block_2_smallest_key = block_metas[2].smallest_key.clone();
// Use block_1_smallest_key as seek key and result in the first KV of block 1.
let seek_key = block_1_smallest_key.clone();
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
// Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
// 1.
let seek_key = prev_full_key(block_1_smallest_key.as_slice());
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
iter.next().await.unwrap();
let block_1_second_key = iter.key().to_vec();
// Use a big enough seek key and result in invalid iterator.
let seek_key = test_key_of(30001);
iter.seek_idx(0, Some(seek_key.encode().as_slice()))
.await
.unwrap();
iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap();
assert!(!iter.is_valid());

// Test seek_idx. Result is dominated by key range rather than given seek key.
Expand All @@ -547,11 +550,15 @@ mod tests {
// Use block_2_smallest_key as seek key and result in invalid iterator.
let seek_key = block_2_smallest_key.clone();
assert!(KeyComparator::compare_encoded_full_key(&seek_key, &kr.right) == Ordering::Greater);
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(!iter.is_valid());
// Use a small enough seek key and result in the second KV of block 1.
let seek_key = test_key_of(0).encode();
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid());
assert_eq!(iter.key(), block_1_second_key.to_ref());
// Use None seek key and result in the second KV of block 1.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl Compactor {

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(&iter_key, value, is_new_user_key)
.add_full_key(iter_key, value, is_new_user_key)
.await?;

iter.next().await?;
Expand Down
10 changes: 7 additions & 3 deletions src/storage/src/hummock/sstable/backward_sstable_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ impl BackwardSstableIterator {
}

/// Seeks to a block, and then seeks to the key if `seek_key` is given.
async fn seek_idx(&mut self, idx: isize, seek_key: Option<&[u8]>) -> HummockResult<()> {
async fn seek_idx(
&mut self,
idx: isize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
if idx >= self.sst.value().block_count() as isize || idx < 0 {
self.block_iter = None;
} else {
Expand Down Expand Up @@ -104,7 +108,7 @@ impl HummockIterator for BackwardSstableIterator {
}

fn key(&self) -> FullKey<&[u8]> {
FullKey::decode(self.block_iter.as_ref().expect("no block iter").key())
self.block_iter.as_ref().expect("no block iter").key()
}

fn value(&self) -> HummockValue<&[u8]> {
Expand Down Expand Up @@ -148,7 +152,7 @@ impl HummockIterator for BackwardSstableIterator {
.saturating_sub(1); // considering the boundary of 0
let block_idx = block_idx as isize;

self.seek_idx(block_idx, Some(encoded_key_slice)).await?;
self.seek_idx(block_idx, Some(key)).await?;
if !self.is_valid() {
// Seek to prev block
self.seek_idx(block_idx - 1, None).await?;
Expand Down
Loading