Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): change the prefix_hint to dist_key_hint for bloom_filter #6575

Merged
merged 59 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
e6af44d
distribution_key_start_index_in_pk
wcy-fdu Nov 24, 2022
241723b
refactor judgement of prefix_hint
wcy-fdu Nov 24, 2022
759df20
rename prefix_hint to dist_key_hint
wcy-fdu Nov 24, 2022
4c6c942
retry
wcy-fdu Nov 24, 2022
02f48da
retry
wcy-fdu Nov 24, 2022
75a3a9b
add start_index in FilterKeyExtractor
wcy-fdu Nov 28, 2022
7ce696c
fix MultiFilterKeyExtractor
wcy-fdu Nov 28, 2022
b2209cb
todo: rename all pk_prefix to dist_key
wcy-fdu Nov 28, 2022
49381a9
rollback and fix
wcy-fdu Nov 28, 2022
21d7c68
detect correctness, fix
wcy-fdu Nov 28, 2022
b6d767d
hope to pass CI
wcy-fdu Nov 28, 2022
35faa1f
some rename
wcy-fdu Nov 29, 2022
4d7f27e
outer join fails
wcy-fdu Nov 29, 2022
85bc650
remove table_id and vnode in bloom_filter_key
wcy-fdu Nov 29, 2022
fd9b63e
ignore some check
wcy-fdu Nov 30, 2022
d0868c1
remove assertion, hope to pass CI
wcy-fdu Nov 30, 2022
9c37a58
fix ut: test_compaction_with_filter_key_extractor
wcy-fdu Nov 30, 2022
cb8e352
remove distribution_key_start_index_in_pk in catalog
wcy-fdu Nov 30, 2022
746e6a0
resolve conflict
wcy-fdu Nov 30, 2022
7216db8
fix typo
wcy-fdu Nov 30, 2022
88aab17
fix dist_key shuffle
wcy-fdu Dec 1, 2022
62e985d
add more check when point get
wcy-fdu Dec 1, 2022
7d3073d
fix
wcy-fdu Dec 1, 2022
09267dd
fix
wcy-fdu Dec 1, 2022
b5a91aa
resolve some comments
wcy-fdu Dec 2, 2022
9a8ff30
fix again
wcy-fdu Dec 2, 2022
6220865
try again
wcy-fdu Dec 2, 2022
0bdbdae
try again
wcy-fdu Dec 2, 2022
ba5752f
clean up code, fix point-get check bloom filter
wcy-fdu Dec 5, 2022
b11a81b
ignore discontinuous cases
wcy-fdu Dec 5, 2022
16a0a1d
ignore shuffle dist key
wcy-fdu Dec 5, 2022
fe060c3
add discontinuous dist key
wcy-fdu Dec 5, 2022
8052d10
handle shuffle dist key
wcy-fdu Dec 5, 2022
ab72252
we should ignore shuffle dist key
wcy-fdu Dec 5, 2022
9b7be4a
need to find bug
wcy-fdu Dec 5, 2022
09ee7c2
minor fix
wcy-fdu Dec 5, 2022
88c1d87
resolve some comments
wcy-fdu Dec 5, 2022
aa34b37
typo
wcy-fdu Dec 5, 2022
8c42dfa
sort before judge continuous
wcy-fdu Dec 6, 2022
c120fcd
debug
wcy-fdu Dec 6, 2022
44c8295
find bug and fix
wcy-fdu Dec 6, 2022
ba22bbc
Merge branch 'main' into wcy/prefix-bloom-filter
wcy-fdu Dec 6, 2022
059a6cc
use xxhash
wcy-fdu Dec 6, 2022
80dbeb9
cargo toml fmt
wcy-fdu Dec 6, 2022
d661984
cargo toml fmt
wcy-fdu Dec 6, 2022
de3d6a3
resolve comments
wcy-fdu Dec 7, 2022
8085a59
resolve some comments
wcy-fdu Dec 7, 2022
77403a1
retry
wcy-fdu Dec 7, 2022
2b0cc7d
add check when point get
wcy-fdu Dec 7, 2022
e15153d
fix ut
wcy-fdu Dec 8, 2022
e440703
keep xxh
wcy-fdu Dec 8, 2022
6573752
fix
wcy-fdu Dec 8, 2022
a65a211
resolve conflict
wcy-fdu Dec 8, 2022
b75ef11
resolve conflict
wcy-fdu Dec 8, 2022
ded6f46
ut
wcy-fdu Dec 8, 2022
4991892
retry
wcy-fdu Dec 8, 2022
84831a3
resolve comments
wcy-fdu Dec 8, 2022
62a68a5
resolve conflict
wcy-fdu Dec 8, 2022
e524946
Merge branch 'main' into wcy/prefix-bloom-filter
mergify[bot] Dec 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ mod tests {
epoch,
None,
ReadOptions {
prefix_hint: None,
dist_key_hint: None,
check_bloom_filter: false,
ignore_range_tombstone: false,
table_id: Default::default(),
Expand Down
33 changes: 33 additions & 0 deletions src/common/src/catalog/internal_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::LazyLock;

use itertools::Itertools;
use regex::Regex;

pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
Expand All @@ -38,3 +39,35 @@ pub fn valid_table_name(table_name: &str) -> bool {
LazyLock::new(|| Regex::new(r"__internal_.*_\d+").unwrap());
!INTERNAL_TABLE_NAME.is_match(table_name)
}

pub fn get_dist_key_in_pk_indices(dist_key_indices: &[usize], pk_indices: &[usize]) -> Vec<usize> {
let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution key {:?} must be a subset of primary key {:?}",
dist_key_indices, pk_indices
)
})
})
.collect_vec();
dist_key_in_pk_indices
}

/// Get distribution key start index in pk, and return None if `dist_key_in_pk_indices` is not empty
/// or continuous.
/// Note that `dist_key_in_pk_indices` may be shuffled, the start index should be the
/// minimum value.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_min_dist_key_start_index_if_continuous?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start_index is always min

match !dist_key_in_pk_indices.is_empty()
&& *dist_key_in_pk_indices.iter().min().unwrap() + dist_key_in_pk_indices.len() - 1
== *dist_key_in_pk_indices.iter().max().unwrap()
{
true => Some(*dist_key_in_pk_indices.iter().min().unwrap()),
false => None,
}
}
92 changes: 92 additions & 0 deletions src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,34 @@ impl OrderedRowSerde {

Ok(len)
}

/// return the distribution key start position in serialized key and the distribution key
/// length.
pub fn deserialize_dist_key_position_with_column_indices(
&self,
key: &[u8],
dist_key_indices_pair: (usize, usize),
) -> memcomparable::Result<(usize, usize)> {
let (dist_key_start_index, dist_key_end_index) = dist_key_indices_pair;
use crate::types::ScalarImpl;
let mut dist_key_start_position: usize = 0;
let mut len: usize = 0;
for index in 0..dist_key_end_index {
let data_type = &self.schema[index];
let order_type = &self.order_types[index];
let data = &key[len..];
let mut deserializer = memcomparable::Deserializer::new(data);
deserializer.set_reverse(*order_type == OrderType::Descending);

let field_length = ScalarImpl::encoding_data_size(data_type, &mut deserializer)?;
len += field_length;
if index < dist_key_start_index {
dist_key_start_position += field_length;
}
}

Ok((dist_key_start_position, (len - dist_key_start_position)))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -237,6 +265,70 @@ mod tests {
}
}

#[test]
fn test_deserialize_dist_key_position_with_column_indices() {
let order_types = vec![
OrderType::Descending,
OrderType::Ascending,
OrderType::Descending,
OrderType::Ascending,
];

let schema = vec![
DataType::Varchar,
DataType::Int16,
DataType::Varchar,
DataType::Varchar,
];
let serde = OrderedRowSerde::new(schema, order_types);
let row1 = Row::new(vec![
Some(Utf8("aaa".to_string().into())),
Some(Int16(5)),
Some(Utf8("bbb".to_string().into())),
Some(Utf8("ccc".to_string().into())),
]);
let rows = vec![row1];
let mut array = vec![];
for row in &rows {
let mut row_bytes = vec![];
serde.serialize(row, &mut row_bytes);
array.push(row_bytes);
}

{
let dist_key_indices = [1, 2];
let dist_key_start_index = 1;
let (dist_key_start_position, dist_key_len) = serde
.deserialize_dist_key_position_with_column_indices(
&array[0],
(
dist_key_start_index,
dist_key_start_index + dist_key_indices.len(),
),
)
.unwrap();

let schema = vec![DataType::Varchar];
let order_types = vec![OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let prefix_slice = &array[0][0..dist_key_start_position];
assert_eq!(
deserde.deserialize(prefix_slice).unwrap(),
Row::new(vec![Some(Utf8("aaa".to_string().into()))])
);

let schema = vec![DataType::INT16, DataType::VARCHAR];
let order_types = vec![OrderType::Ascending, OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let dist_key_slice =
&array[0][dist_key_start_position..dist_key_start_position + dist_key_len];
assert_eq!(
deserde.deserialize(dist_key_slice).unwrap(),
Row::new(vec![Some(Int16(5)), Some(Utf8("bbb".to_string().into()))])
);
}
}

#[test]
fn test_encoding_data_size() {
use std::mem::size_of;
Expand Down
17 changes: 2 additions & 15 deletions src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::ops::{Bound, RangeBounds};

use itertools::Itertools;
use paste::paste;
use risingwave_pb::batch_plan::scan_range::Bound as BoundProst;
use risingwave_pb::batch_plan::ScanRange as ScanRangeProst;

use super::value_encoding::serialize_datum_to_bytes;
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VirtualNode;
use crate::row::{Row2, RowExt};
use crate::types::{Datum, ScalarImpl};
Expand Down Expand Up @@ -84,20 +84,7 @@ impl ScanRange {
return None;
}

let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution keys {:?} must be a subset of primary keys {:?}",
dist_key_indices, pk_indices
)
})
})
.collect_vec();
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(dist_key_indices, pk_indices);
let pk_prefix_len = self.eq_conds.len();
if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) {
return None;
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
None,
ReadOptions {
ignore_range_tombstone: false,
prefix_hint: None,
dist_key_hint: None,
table_id: TableId { table_id },
retention_seconds: None,
check_bloom_filter: false,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false
handle_pk_conflict: false,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,16 @@ impl StreamMaterialize {

let ctx = input.ctx();
let properties = ctx.inner().with_options.internal_table_subset();
let distribution_key = base.dist.dist_column_indices().to_vec();

let table = TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
name: mv_name,
columns,
pk: pk_list,
stream_key: pk_indices.clone(),
distribution_key: base.dist.dist_column_indices().to_vec(),
distribution_key,
is_index,
appendonly: input.append_only(),
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
Expand Down
1 change: 1 addition & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ tokio-stream = "0.1"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
twox-hash = "1"
xxhash-rust = { version = "0.8.5", features = ["xxh32"] }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

twox-hash and xxhash-rust are both used to calculate xxhash. Let's pick one and only use one lib for xxhash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's replace twox-hash with xxhash-rust it in next PR.

zstd = "0.11.2"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
Loading