Skip to content

Commit

Permalink
feat(storage): change the prefix_hint to dist_key_hint for bloom_filter (#6575)

Browse files Browse the repository at this point in the history

* distribution_key_start_index_in_pk

* refactor judgement of prefix_hint

* rename prefix_hint to dist_key_hint

* retry

* retry

* add start_index in FilterKeyExtractor

* fix MultiFilterKeyExtractor

* todo: rename all pk_prefix to dist_key

* rollback and fix

* detect correctness, fix

* hope to pass CI

* some rename

* outer join fails

* remove table_id and vnode in bloom_filter_key

* ignore some check

* remove assertion, hope to pass CI

* fix ut: test_compaction_with_filter_key_extractor

* remove distribution_key_start_index_in_pk in catalog

* fix typo

* fix dist_key shuffle

* add more check when point get

* fix

* fix

* resolve some comments

* fix again

* try again

* try again

* clean up code, fix point-get check bloom filter

* ignore discontinuous cases

* ignore shuffle dist key

* add discontinuous dist key

* handle shuffle dist key

* need to find bug

* minor fix

* resolve some comments

* typo

* sort before judge continuous

* debug

* find bug and fix

* use xxhash

* cargo toml fmt

* cargo toml fmt

* resolve comments

* resolve some comments

* retry

* add check when point get

* fix ut

* keep xxh

* fix

* resolve conflict

* ut

* retry

* resolve comments

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
wcy-fdu and mergify[bot] authored Dec 8, 2022
1 parent 0439b74 commit 62e49bf
Show file tree
Hide file tree
Showing 27 changed files with 490 additions and 435 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ mod tests {
epoch,
None,
ReadOptions {
prefix_hint: None,
dist_key_hint: None,
check_bloom_filter: false,
ignore_range_tombstone: false,
table_id: Default::default(),
Expand Down
39 changes: 39 additions & 0 deletions src/common/src/catalog/internal_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::LazyLock;

use itertools::Itertools;
use regex::Regex;

pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
Expand All @@ -38,3 +39,41 @@ pub fn valid_table_name(table_name: &str) -> bool {
LazyLock::new(|| Regex::new(r"__internal_.*_\d+").unwrap());
!INTERNAL_TABLE_NAME.is_match(table_name)
}

/// Maps each distribution key column index to its position within the primary key.
///
/// For every column index in `dist_key_indices`, returns the position at which that
/// column appears in `pk_indices`. The distribution key must be a subset of the
/// primary key.
///
/// # Panics
///
/// Panics if any index in `dist_key_indices` does not occur in `pk_indices`.
pub fn get_dist_key_in_pk_indices(dist_key_indices: &[usize], pk_indices: &[usize]) -> Vec<usize> {
    dist_key_indices
        .iter()
        .map(|&di| {
            pk_indices
                .iter()
                .position(|&pi| di == pi)
                .unwrap_or_else(|| {
                    panic!(
                        "distribution key {:?} must be a subset of primary key {:?}",
                        dist_key_indices, pk_indices
                    )
                })
        })
        // Plain `collect()` — no need for itertools' `collect_vec` or the
        // redundant intermediate binding the original had.
        .collect()
}

/// Get the distribution key's start index in the pk. Returns `None` if
/// `dist_key_in_pk_indices` is empty or the indices are not continuous.
/// (The previous doc comment stated the condition inverted.)
///
/// Note that `dist_key_in_pk_indices` may be shuffled, so the indices are sorted
/// first and the start index is the minimum value.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    // Sort a copy: the dist key columns may appear in any order within the pk.
    let mut sorted = dist_key_in_pk_indices.to_vec();
    sorted.sort_unstable();
    // Empty dist key => no start index.
    let &start = sorted.first()?;
    // The positions are usable only when they form a gap-free run after
    // sorting, e.g. [2, 3, 4]; something like [2, 4] has a hole => None.
    sorted
        .windows(2)
        .all(|pair| pair[1] == pair[0] + 1)
        .then_some(start)
}
92 changes: 92 additions & 0 deletions src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,34 @@ impl OrderedRowSerde {

Ok(len)
}

/// Returns `(start, len)`: the byte offset inside the serialized `key` at which
/// the distribution key begins, and the byte length of the distribution key.
///
/// `dist_key_indices_pair` is the half-open column range
/// `(dist_key_start_index, dist_key_end_index)` of the distribution key within
/// this serde's schema.
pub fn deserialize_dist_key_position_with_column_indices(
    &self,
    key: &[u8],
    dist_key_indices_pair: (usize, usize),
) -> memcomparable::Result<(usize, usize)> {
    use crate::types::ScalarImpl;
    let (dist_key_start_index, dist_key_end_index) = dist_key_indices_pair;
    // Total bytes consumed over all columns scanned so far.
    let mut consumed: usize = 0;
    // Bytes occupied by the columns *before* the distribution key.
    let mut start_position: usize = 0;
    for column_index in 0..dist_key_end_index {
        // Measure the encoded size of this column without materializing it.
        let mut deserializer = memcomparable::Deserializer::new(&key[consumed..]);
        deserializer.set_reverse(self.order_types[column_index] == OrderType::Descending);
        let field_len =
            ScalarImpl::encoding_data_size(&self.schema[column_index], &mut deserializer)?;
        if column_index < dist_key_start_index {
            start_position += field_len;
        }
        consumed += field_len;
    }
    Ok((start_position, consumed - start_position))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -237,6 +265,70 @@ mod tests {
}
}

#[test]
fn test_deserialize_dist_key_position_with_column_indices() {
    let order_types = vec![
        OrderType::Descending,
        OrderType::Ascending,
        OrderType::Descending,
        OrderType::Ascending,
    ];

    let schema = vec![
        DataType::Varchar,
        DataType::Int16,
        DataType::Varchar,
        DataType::Varchar,
    ];
    let serde = OrderedRowSerde::new(schema, order_types);
    let row1 = Row::new(vec![
        Some(Utf8("aaa".to_string().into())),
        Some(Int16(5)),
        Some(Utf8("bbb".to_string().into())),
        Some(Utf8("ccc".to_string().into())),
    ]);
    let rows = vec![row1];
    let mut array = vec![];
    for row in &rows {
        let mut row_bytes = vec![];
        serde.serialize(row, &mut row_bytes);
        array.push(row_bytes);
    }

    {
        // Columns 1..=2 (Int16, Varchar) form the distribution key.
        let dist_key_indices = [1, 2];
        let dist_key_start_index = 1;
        let (dist_key_start_position, dist_key_len) = serde
            .deserialize_dist_key_position_with_column_indices(
                &array[0],
                (
                    dist_key_start_index,
                    dist_key_start_index + dist_key_indices.len(),
                ),
            )
            .unwrap();

        // The bytes before the dist key must decode back to the first column.
        let schema = vec![DataType::Varchar];
        let order_types = vec![OrderType::Descending];
        let deserde = OrderedRowSerde::new(schema, order_types);
        let prefix_slice = &array[0][0..dist_key_start_position];
        assert_eq!(
            deserde.deserialize(prefix_slice).unwrap(),
            Row::new(vec![Some(Utf8("aaa".to_string().into()))])
        );

        // Fixed: use the actual enum variants `Int16` / `Varchar` (the
        // previous `DataType::INT16` / `DataType::VARCHAR` do not exist and
        // were inconsistent with the spellings used above).
        let schema = vec![DataType::Int16, DataType::Varchar];
        let order_types = vec![OrderType::Ascending, OrderType::Descending];
        let deserde = OrderedRowSerde::new(schema, order_types);
        let dist_key_slice =
            &array[0][dist_key_start_position..dist_key_start_position + dist_key_len];
        assert_eq!(
            deserde.deserialize(dist_key_slice).unwrap(),
            Row::new(vec![Some(Int16(5)), Some(Utf8("bbb".to_string().into()))])
        );
    }
}

#[test]
fn test_encoding_data_size() {
use std::mem::size_of;
Expand Down
17 changes: 2 additions & 15 deletions src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::ops::{Bound, RangeBounds};

use itertools::Itertools;
use paste::paste;
use risingwave_pb::batch_plan::scan_range::Bound as BoundProst;
use risingwave_pb::batch_plan::ScanRange as ScanRangeProst;

use super::value_encoding::serialize_datum_to_bytes;
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VirtualNode;
use crate::row::{Row2, RowExt};
use crate::types::{Datum, ScalarImpl};
Expand Down Expand Up @@ -84,20 +84,7 @@ impl ScanRange {
return None;
}

let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution keys {:?} must be a subset of primary keys {:?}",
dist_key_indices, pk_indices
)
})
})
.collect_vec();
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(dist_key_indices, pk_indices);
let pk_prefix_len = self.eq_conds.len();
if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) {
return None;
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
None,
ReadOptions {
ignore_range_tombstone: false,
prefix_hint: None,
dist_key_hint: None,
table_id: TableId { table_id },
retention_seconds: None,
check_bloom_filter: false,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false
handle_pk_conflict: false,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl StreamMaterialize {
}

let ctx = input.ctx();
let distribution_key = base.dist.dist_column_indices().to_vec();
let properties = ctx.with_options().internal_table_subset();
let table = TableCatalog {
id: TableId::placeholder(),
Expand All @@ -158,7 +159,7 @@ impl StreamMaterialize {
columns,
pk: pk_list,
stream_key: pk_indices.clone(),
distribution_key: base.dist.dist_column_indices().to_vec(),
distribution_key,
is_index,
appendonly: input.append_only(),
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
Expand Down
1 change: 1 addition & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ tokio-stream = "0.1"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
twox-hash = "1"
xxhash-rust = { version = "0.8.5", features = ["xxh32"] }
zstd = "0.11.2"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
Loading

0 comments on commit 62e49bf

Please sign in to comment.