Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): flexible KeyPrefix encoding for Block #8379

Merged
merged 14 commits into from
Mar 14, 2023
Merged
52 changes: 47 additions & 5 deletions src/storage/benches/bench_block_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(once_cell)]
use std::sync::LazyLock;

use bytes::{BufMut, Bytes, BytesMut};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use risingwave_storage::hummock::{
Expand All @@ -22,6 +25,7 @@ const TABLES_PER_SSTABLE: u32 = 10;
const KEYS_PER_TABLE: u64 = 100;
const RESTART_INTERVAL: usize = 16;
const BLOCK_CAPACITY: usize = TABLES_PER_SSTABLE as usize * KEYS_PER_TABLE as usize * 64;
const EXCHANGE_INTERVAL: usize = RESTART_INTERVAL / 2;

fn block_iter_next(block: BlockHolder) {
let mut iter = BlockIterator::new(block);
Expand Down Expand Up @@ -85,11 +89,22 @@ fn bench_block_iter(c: &mut Criterion) {
let l = data.len();
let block = BlockHolder::from_owned_block(Box::new(Block::decode(data, l).unwrap()));
let mut iter = BlockIterator::new(block);
let mut item_count = 0;
let mut ext_index = 0;
let (mut k_ext, mut v_ext) = (&DATA_LEN_SET[ext_index].0, &DATA_LEN_SET[ext_index].1);

iter.seek_to_first();
for t in 1..=TABLES_PER_SSTABLE {
for i in 1..=KEYS_PER_TABLE {
assert_eq!(iter.key(), key(t, i).to_vec());
assert_eq!(iter.value(), value(i).to_vec());
item_count += 1;

if item_count % EXCHANGE_INTERVAL == 0 {
ext_index = (ext_index + 1) % DATA_LEN_SET.len();
(k_ext, v_ext) = (&DATA_LEN_SET[ext_index].0, &DATA_LEN_SET[ext_index].1);
}

assert_eq!(iter.key(), key(t, i, k_ext).to_vec());
assert_eq!(iter.value(), value(i, v_ext).to_vec());
iter.next();
}
}
Expand All @@ -99,30 +114,57 @@ fn bench_block_iter(c: &mut Criterion) {
criterion_group!(benches, bench_block_iter);
criterion_main!(benches);

static DATA_LEN_SET: LazyLock<Vec<(Vec<u8>, Vec<u8>)>> = LazyLock::new(|| {
vec![
(vec![b'a'; 10], vec![b'a'; 10]), // U8U8
(vec![b'a'; 10], vec![b'a'; 300]), // U8U16
(vec![b'a'; 100], vec![b'a'; 65550]), // U8U32
(vec![b'a'; 300], vec![b'a'; 100]), // U16U8
(vec![b'a'; 300], vec![b'a'; 300]), // U16U16
(vec![b'a'; 300], vec![b'a'; 65550]), // U16U32
(vec![b'a'; 65550], vec![b'a'; 100]), // U32U8
(vec![b'a'; 65550], vec![b'a'; 300]), // U32U16
(vec![b'a'; 65550], vec![b'a'; 65550]), // U32U32
]
});

fn build_block_data(t: u32, i: u64) -> Bytes {
let options = BlockBuilderOptions {
capacity: BLOCK_CAPACITY,
compression_algorithm: CompressionAlgorithm::None,
restart_interval: RESTART_INTERVAL,
};
let mut builder = BlockBuilder::new(options);
let mut item_count = 0;
let mut ext_index = 0;
let (mut k_ext, mut v_ext) = (&DATA_LEN_SET[ext_index].0, &DATA_LEN_SET[ext_index].1);

for tt in 1..=t {
for ii in 1..=i {
builder.add(&key(tt, ii), &value(ii));
item_count += 1;

if item_count % EXCHANGE_INTERVAL == 0 {
ext_index = (ext_index + 1) % DATA_LEN_SET.len();
(k_ext, v_ext) = (&DATA_LEN_SET[ext_index].0, &DATA_LEN_SET[ext_index].1);
}

builder.add(&key(tt, ii, k_ext), &value(ii, v_ext));
}
}
Bytes::from(builder.build().to_vec())
}

fn key(t: u32, i: u64) -> Bytes {
fn key(t: u32, i: u64, ext: &[u8]) -> Bytes {
let mut buf = BytesMut::new();
buf.put_slice(ext);
buf.put_u32(t);
buf.put_u64(i);
buf.freeze()
}

fn value(i: u64) -> Bytes {
fn value(i: u64, ext: &[u8]) -> Bytes {
let mut buf = BytesMut::new();
buf.put_u64(i);
buf.put(ext);
buf.freeze()
}
Loading