Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): do not compress table_id #8512

Merged
merged 23 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ impl<T: AsRef<[u8]>> UserKey<T> {
buf.put_slice(self.table_key.as_ref());
}

/// Appends only the raw table key bytes to `buf`; the table id is not written.
pub fn encode_table_key_into(&self, buf: &mut impl BufMut) {
    let table_key_bytes: &[u8] = self.table_key.as_ref();
    buf.put_slice(table_key_bytes);
}

/// Encode into a buffer.
pub fn encode_length_prefixed(&self, buf: &mut impl BufMut) {
buf.put_u32(self.table_id.table_id());
Expand Down Expand Up @@ -583,6 +587,12 @@ impl<T: AsRef<[u8]>> FullKey<T> {
buf
}

// Encode in to a buffer.
pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
self.user_key.encode_table_key_into(buf);
buf.put_u64(self.epoch);
}

pub fn encode_reverse_epoch(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
Expand Down Expand Up @@ -614,6 +624,20 @@ impl<'a> FullKey<&'a [u8]> {
}
}

/// Construct a [`FullKey`] from a byte slice without `table_id` encoded.
pub fn from_slice_without_table_id(
table_id: TableId,
slice_without_table_id: &'a [u8],
) -> Self {
let epoch_pos = slice_without_table_id.len() - EPOCH_LEN;
let epoch = (&slice_without_table_id[epoch_pos..]).get_u64();

Self {
user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
epoch,
}
}

/// Construct a [`FullKey`] from a byte slice.
pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self {
let epoch_pos = slice.len() - EPOCH_LEN;
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl SstableStreamIterator {

if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
block_iter.seek(seek_key);

if !block_iter.is_valid() {
// `seek_key` is larger than everything in the first block.
self.next_block().await?;
Expand Down
41 changes: 34 additions & 7 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::mem::size_of;
use std::ops::Range;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::KeyComparator;
use {lz4, zstd};
Expand Down Expand Up @@ -142,13 +143,18 @@ pub struct Block {
pub data: Bytes,
/// Uncompressed entries data length.
data_len: usize,

/// Table id of this block.
table_id: TableId,

/// Restart points.
restart_points: Vec<RestartPoint>,
}

impl Block {
pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
// Verify checksum.

let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

Expand Down Expand Up @@ -184,11 +190,12 @@ impl Block {
}

pub fn decode_from_raw(buf: Bytes) -> Self {
let table_id = (&buf[buf.len() - 4..]).get_u32_le();
// decode restart_points_type_index
let n_index = ((&buf[buf.len() - 4..]).get_u32_le()) as usize;
let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
let data_len = buf.len() - index_data_len;
let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 4];
let data_len = buf.len() - 4 - index_data_len;
let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

let mut index_key_vec = Vec::with_capacity(n_index);
for _ in 0..n_index {
Expand All @@ -213,6 +220,7 @@ impl Block {
let mut restart_points_buf = &buf[data_len..restarts_end];

let mut type_index: usize = 0;

for _ in 0..n_restarts {
let offset = restart_points_buf.get_u32_le();
if type_index < index_key_vec.len() - 1
Expand All @@ -232,6 +240,7 @@ impl Block {
data: buf,
data_len,
restart_points,
table_id: TableId::new(table_id),
}
}

Expand All @@ -243,7 +252,13 @@ impl Block {
}

pub fn capacity(&self) -> usize {
self.data.len() + self.restart_points.capacity() * std::mem::size_of::<u32>()
self.data.len()
+ self.restart_points.capacity() * std::mem::size_of::<u32>()
+ std::mem::size_of::<u32>()
}

/// Returns the table id stored in this block.
pub fn table_id(&self) -> TableId {
self.table_id
}

/// Gets restart point by index.
Expand Down Expand Up @@ -385,6 +400,7 @@ pub struct BlockBuilder {
/// Compression algorithm.
compression_algorithm: CompressionAlgorithm,

table_id: Option<u32>,
// restart_points_type_index stores only the restart_point corresponding to each type change,
// as an index, in order to reduce space usage
restart_points_type_index: Vec<RestartPoint>,
Expand All @@ -402,6 +418,7 @@ impl BlockBuilder {
last_key: vec![],
entry_count: 0,
compression_algorithm: options.compression_algorithm,
table_id: None,
restart_points_type_index: Vec::default(),
}
}
Expand All @@ -420,15 +437,20 @@ impl BlockBuilder {
///
/// Panic if key is not added in ASCEND order.
pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
let input_table_id = full_key.user_key.table_id.table_id();
match self.table_id {
Some(current_table_id) => debug_assert_eq!(current_table_id, input_table_id),
None => self.table_id = Some(input_table_id),
}
#[cfg(debug_assertions)]
self.debug_valid();

let mut key: BytesMut = Default::default();
full_key.encode_into(&mut key);
full_key.encode_into_without_table_id(&mut key);
if self.entry_count > 0 {
debug_assert!(!key.is_empty());
debug_assert_eq!(
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key),
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]),
Ordering::Less
);
}
Expand Down Expand Up @@ -462,7 +484,7 @@ impl BlockBuilder {

key.as_ref()
} else {
bytes_diff_below_max_key_length(&self.last_key, &key)
bytes_diff_below_max_key_length(&self.last_key, &key[..])
};

let prefix = KeyPrefix::new_without_len(
Expand Down Expand Up @@ -492,6 +514,7 @@ impl BlockBuilder {
pub fn clear(&mut self) {
self.buf.clear();
self.restart_points.clear();
self.table_id = None;
self.restart_points_type_index.clear();
self.last_key.clear();
self.entry_count = 0;
Expand All @@ -504,6 +527,7 @@ impl BlockBuilder {
+ (RestartPoint::size_of()) // (offset + len_type(u8)) * len
* self.restart_points_type_index.len()
+ std::mem::size_of::<u32>() // restart_points_type_index len
+ std::mem::size_of::<u32>() // table_id len
}

/// Finishes building block.
Expand Down Expand Up @@ -545,6 +569,7 @@ impl BlockBuilder {
self.buf
.put_u32_le(self.restart_points_type_index.len() as u32);

self.buf.put_u32_le(self.table_id.unwrap());
match self.compression_algorithm {
CompressionAlgorithm::None => (),
CompressionAlgorithm::Lz4 => {
Expand Down Expand Up @@ -581,6 +606,7 @@ impl BlockBuilder {
self.compression_algorithm.encode(&mut self.buf);
let checksum = xxhash64_checksum(&self.buf);
self.buf.put_u64_le(checksum);

self.buf.as_ref()
}

Expand All @@ -595,6 +621,7 @@ impl BlockBuilder {
+ std::mem::size_of::<u32>() // restart_points_type_indics.len
+ std::mem::size_of::<CompressionAlgorithm>() // compression_algorithm
+ std::mem::size_of::<u64>() // checksum
+ std::mem::size_of::<u32>() // table_id
}

pub fn debug_valid(&self) {
Expand Down
38 changes: 18 additions & 20 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Range;

use bytes::BytesMut;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::KeyComparator;

use super::{KeyPrefix, LenType, RestartPoint};
use crate::hummock::BlockHolder;
Expand Down Expand Up @@ -77,7 +76,8 @@ impl BlockIterator {

pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
FullKey::decode(&self.key)

FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..])
}

pub fn value(&self) -> &[u8] {
Expand All @@ -99,19 +99,19 @@ impl BlockIterator {
}

pub fn seek(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
self.seek_restart_point_by_key(key);

self.next_until_key(key);
}

pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
self.seek_restart_point_by_key(key);

self.next_until_key(key);
if !self.is_valid() {
self.seek_to_last();
}
self.prev_until_key(&full_key_encoded);
self.prev_until_key(key);
}
}

Expand Down Expand Up @@ -171,19 +171,15 @@ impl BlockIterator {
}

/// Moves forward until reaching the first key that is equal to or larger than the given `key`.
fn next_until_key(&mut self, key: &[u8]) {
while self.is_valid()
&& KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Less
{
fn next_until_key(&mut self, key: FullKey<&[u8]>) {
while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
self.next_inner();
}
}

/// Moves backward until reaching the first key that is equal to or smaller than the given `key`.
fn prev_until_key(&mut self, key: &[u8]) {
while self.is_valid()
&& KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Greater
{
fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
self.prev_inner();
}
}
Expand Down Expand Up @@ -240,7 +236,7 @@ impl BlockIterator {
}

/// Searches the restart point index that the given `key` belongs to.
fn search_restart_point_index_by_key(&self, key: &[u8]) -> usize {
fn search_restart_point_index_by_key(&self, key: FullKey<&[u8]>) -> usize {
// Find the largest restart point whose restart key is less than or equal to the given key.
self.block
.search_restart_partition_point(
Expand All @@ -252,7 +248,9 @@ impl BlockIterator {
let prefix =
self.decode_prefix_at(probe as usize, key_len_type, value_len_type);
let probe_key = &self.block.data()[prefix.diff_key_range()];
match KeyComparator::compare_encoded_full_key(probe_key, key) {
let full_probe_key =
FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
match full_probe_key.cmp(&key) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
Expand All @@ -262,7 +260,7 @@ impl BlockIterator {
}

/// Seeks to the restart point that the given `key` belongs to.
fn seek_restart_point_by_key(&mut self, key: &[u8]) {
fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
let index = self.search_restart_point_index_by_key(key);
self.seek_restart_point_by_index(index)
}
Expand Down