Skip to content

Commit

Permalink
refactor(storage): change FullKey from concatenated &[u8] to struct (
Browse files Browse the repository at this point in the history
…#6130)

* tmp

* tmp commit

* refactor most of the iterators

* refactor iterators and sst builders

* refactor compaction

* shared buffer left

* shared buffer batch iter

* shared buffer compact left

* pass cargo check

* refactor sst dump

* fix ut

* fix key

* fixes

* remove prefixed_key

* fix backward range

* add streaming table println

* pass ut

* remove println

* fix bug in staging prune

* fix e2e bug

* fix memory state store and refine ut

* include key_cmp.rs

* fix shared buffer filter

* refine parameter name of StateStore impl and comment

* fix ut

* fix clippy error

* fix bench compactor

* add table id filter test

* rename table_key param back to key because table_key should be a storage internal concept

* restore risedev.yml

* misc fix

* change should_delete param to UserKey

* optimize sst builder

* fix ut

* derive copy for FullKey and UserKey

* clippy

* fix

* typo

* change the type of StateStoreIter key to FullKey<Vec<u8>>

* fix list kv

* wrap table key with struct

* fix

* restore risedev.yml

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
Gun9niR and mergify[bot] authored Nov 11, 2022
1 parent 78fd8a0 commit bdc3ade
Show file tree
Hide file tree
Showing 68 changed files with 2,278 additions and 1,584 deletions.
2 changes: 1 addition & 1 deletion docs/state-store-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ After compaction (w/ min watermark = 0), there will eventually be an SST with th
(b, 1) => 2
```

The final written key (aka. full key) is encoded by appending the 8-byte epoch after the user key. When doing full key comparison in Hummock, we should always compare full keys using the `VersionedComparator` to get the correct result.
The final written key (aka. full key) is encoded by appending the 8-byte epoch after the user key. When doing full key comparison in Hummock, we should always compare full keys using the `KeyComparator` to get the correct result.

### Write Path

Expand Down
14 changes: 3 additions & 11 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use core::ops::Bound::Unbounded;

use bytes::{Buf, BufMut, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::next_key;
use risingwave_storage::store::{ReadOptions, StateStoreReadExt};

use crate::common::HummockServiceOpts;
Expand All @@ -28,12 +26,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
tracing::info!("using u64::MAX as epoch");
}
let scan_result = {
let mut buf = BytesMut::with_capacity(5);
buf.put_u32(table_id);
let range = (
Bound::Included(buf.to_vec()),
Bound::Excluded(next_key(buf.to_vec().as_slice())),
);
let range = (Unbounded, Unbounded);
hummock
.scan(
range,
Expand All @@ -49,8 +42,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
.await?
};
for (k, v) in scan_result {
let mut buf = &k[..];
let print_string = format!("[t{}]", buf.get_u32());
let print_string = format!("[t{}]", k.user_key.table_id.table_id());
println!("{} {:?} => {:?}", print_string, k, v)
}
hummock_opts.shutdown().await;
Expand Down
17 changes: 9 additions & 8 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::types::to_datum_ref;
use risingwave_common::types::to_text::ToText;
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt;
use risingwave_hummock_sdk::key::{get_epoch, get_table_id, user_key};
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_object_store::object::BlockLocation;
use risingwave_rpc_client::MetaClient;
Expand Down Expand Up @@ -149,8 +149,9 @@ fn print_kv_pairs(
block_iter.seek_to_first();

while block_iter.is_valid() {
let full_key = block_iter.key();
let user_key = user_key(full_key);
let raw_full_key = block_iter.key();
let full_key = FullKey::decode(block_iter.key());
let raw_user_key = full_key.user_key.encode();

let full_val = block_iter.value();
let humm_val = HummockValue::from_slice(block_iter.value())?;
Expand All @@ -159,11 +160,11 @@ fn print_kv_pairs(
HummockValue::Delete => (false, &[] as &[u8]),
};

let epoch = get_epoch(full_key);
let epoch = full_key.epoch;

println!("\t\t full key: {:02x?}", full_key);
println!("\t\t full key: {:02x?}", raw_full_key);
println!("\t\tfull value: {:02x?}", full_val);
println!("\t\t user key: {:02x?}", user_key);
println!("\t\t user key: {:02x?}", raw_user_key);
println!("\t\tuser value: {:02x?}", user_val);
println!("\t\t epoch: {}", epoch);
println!("\t\t type: {}", if is_put { "Put" } else { "Delete" });
Expand All @@ -180,12 +181,12 @@ fn print_kv_pairs(

/// If possible, prints information about the table, column, and stored value.
fn print_table_column(
full_key: &[u8],
full_key: FullKey<&[u8]>,
user_val: &[u8],
table_data: &TableData,
is_put: bool,
) -> anyhow::Result<()> {
let table_id = get_table_id(full_key);
let table_id = full_key.user_key.table_id.table_id();

print!("\t\t table: {} - ", table_id);
let table_catalog = match table_data.get(&table_id) {
Expand Down
13 changes: 8 additions & 5 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;

use itertools::Itertools;
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::VersionedComparator;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo {
Expand Down Expand Up @@ -76,19 +78,20 @@ impl OverlapInfo for RangeOverlapInfo {
Some(key_range) => {
let mut tables = vec![];
let overlap_begin = others.partition_point(|table_status| {
VersionedComparator::less_than(
KeyComparator::compare_encoded_full_key(
&table_status.key_range.as_ref().unwrap().right,
&key_range.left,
)
) == Ordering::Less
});
if overlap_begin >= others.len() {
return vec![];
}
for table in &others[overlap_begin..] {
if VersionedComparator::less_than(
if KeyComparator::compare_encoded_full_key(
&key_range.right,
&table.key_range.as_ref().unwrap().left,
) {
) == Ordering::Less
{
break;
}
tables.push(table.clone());
Expand Down
17 changes: 17 additions & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use std::time::Duration;

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::filter_key_extractor::{
FilterKeyExtractorImpl, FilterKeyExtractorManagerRef, FullKeyFilterKeyExtractor,
};
use risingwave_hummock_sdk::key::key_with_epoch;
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableId, LocalSstableInfo,
Expand Down Expand Up @@ -201,6 +204,20 @@ pub async fn unregister_table_ids_from_compaction_group<S>(
.unwrap();
}

pub fn update_filter_key_extractor_for_table_ids(
filter_key_extractor_manager_ref: FilterKeyExtractorManagerRef,
table_ids: &[u32],
) {
for table_id in table_ids {
filter_key_extractor_manager_ref.update(
*table_id,
Arc::new(FilterKeyExtractorImpl::FullKey(
FullKeyFilterKeyExtractor::default(),
)),
)
}
}

/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
pub fn iterator_test_key_of_epoch(
table: HummockSstableId,
Expand Down
2 changes: 1 addition & 1 deletion src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {
let pk_column_ids = vec![1];
let info = TableSourceInfo {};

let _keyspace = Keyspace::table_root(MemoryStateStore::new(), &table_id);
let _keyspace = Keyspace::table_root(MemoryStateStore::new(), table_id);

let mem_source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());
let mut source_builder = SourceDescBuilder::new(
Expand Down
2 changes: 1 addition & 1 deletion src/source/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ mod tests {

fn new_source() -> TableSource {
let store = MemoryStateStore::new();
let _keyspace = Keyspace::table_root(store, &Default::default());
let _keyspace = Keyspace::table_root(store, Default::default());

TableSource::new(vec![ColumnDesc::unnamed(
ColumnId::from(0),
Expand Down
19 changes: 10 additions & 9 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::ops::Range;
use std::sync::Arc;

use bytes::BufMut;
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_hummock_sdk::key::key_with_epoch;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
Expand Down Expand Up @@ -63,11 +63,12 @@ pub fn default_writer_opts() -> SstableWriterOptions {
}
}

pub fn test_key_of(idx: usize, epoch: u64) -> Vec<u8> {
let mut user_key = Vec::new();
user_key.put_u32(0);
user_key.put_slice(format!("key_test_{:08}", idx * 2).as_bytes());
key_with_epoch(user_key, epoch)
pub fn test_key_of(idx: usize, epoch: u64) -> FullKey<Vec<u8>> {
FullKey::for_test(
TableId::default(),
format!("key_test_{:08}", idx * 2).as_bytes().to_vec(),
epoch,
)
}

const MAX_KEY_COUNT: usize = 128 * 1024;
Expand Down Expand Up @@ -96,11 +97,11 @@ async fn build_table(
let mut builder = SstableBuilder::for_test(sstable_id, writer, opt);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch);
let user_len = full_key.len() - 8;
let table_key_len = full_key.user_key.table_key.len();
for i in range {
let start = (i % 8) as usize;
let end = start + 8;
full_key[(user_len - 8)..user_len].copy_from_slice(&i.to_be_bytes());
full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes());
builder
.add(&full_key, HummockValue::put(&value[start..end]), true)
.await
Expand Down
10 changes: 4 additions & 6 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;

use bytes::BufMut;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::UserKey;
use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -90,11 +91,8 @@ fn get_builder_options(capacity_mb: usize) -> SstableBuilderOptions {
}
}

fn test_user_key_of(idx: u64) -> Vec<u8> {
let mut user_key = Vec::new();
user_key.put_u32(0);
user_key.put_u64(idx);
user_key
fn test_user_key_of(idx: u64) -> UserKey<Vec<u8>> {
UserKey::for_test(TableId::default(), idx.to_be_bytes().to_vec())
}

async fn build_tables<F: SstableWriterFactory>(
Expand Down
Loading

0 comments on commit bdc3ade

Please sign in to comment.