refactor(storage): bloom filter use pk_prefix instead of distribution_key (#6871)

Previously, we used `distribution_key` as the bloom filter key, since `distribution_key` was always a prefix of the pk. With the watermark design (#6288) we can no longer guarantee that the distribution key is a prefix of the pk, so #6575 changed the bloom filter's `prefix_hint` to `dist_key_hint`.
However, using the distribution key as the bloom filter key makes things more complex: we have to handle corner cases such as shuffled or discontinuous distribution keys, and we have to extract the distribution key from the pk and add extra checks in `StateTable`/`StorageTable`/`FilterKeyExtract`.

After some discussion, we decided to decouple the bloom filter key from the distribution key and simply use the pk prefix as the bloom filter key. This makes things easier and lets the bloom filter be used in more places.
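
To illustrate the direction of the change, here is a minimal sketch (hypothetical helper, not the actual `StateTable`/`StorageTable` code): the bloom filter key is now simply the memcomparable encoding of the first `read_prefix_len_hint` pk columns, so the read path no longer needs any distribution-key bookkeeping.

```rust
/// Minimal sketch of the new rule. `encoded_pk_columns` stands for the
/// per-column memcomparable encoding of the primary key; the function name
/// and shape are illustrative only.
fn bloom_filter_key(
    encoded_pk_columns: &[Vec<u8>],
    read_prefix_len_hint: usize,
) -> Option<Vec<u8>> {
    if read_prefix_len_hint == 0 || read_prefix_len_hint > encoded_pk_columns.len() {
        // No usable hint: read without consulting the bloom filter.
        return None;
    }
    // The first `read_prefix_len_hint` pk columns, concatenated in encoding
    // order, are what gets hashed into the bloom filter on write and passed
    // as `prefix_hint` on read.
    Some(encoded_pk_columns[..read_prefix_len_hint].concat())
}
```

The old path instead had to locate the (possibly shuffled or non-contiguous) distribution-key columns inside the encoded pk before it could build the same hint.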


Approved-By: hzxa21

Co-Authored-By: congyi <[email protected]>
wcy-fdu authored Dec 15, 2022
1 parent b45f477 commit ab014ba
Showing 36 changed files with 262 additions and 427 deletions.
10 changes: 5 additions & 5 deletions dashboard/proto/gen/catalog.ts

Generated file; diff not rendered by default.

14 changes: 13 additions & 1 deletion dashboard/proto/gen/plan_common.ts

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion proto/catalog.proto
@@ -123,7 +123,7 @@ message Table {
repeated int32 value_indices = 20;
string definition = 21;
bool handle_pk_conflict = 22;
uint32 pk_prefix_len_hint = 23;
uint32 read_prefix_len_hint = 23;
}

message View {
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -39,6 +39,7 @@ message StorageTableDesc {
repeated uint32 dist_key_indices = 4;
uint32 retention_seconds = 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
}

enum JoinType {
2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
@@ -307,7 +307,7 @@ mod tests {
epoch,
None,
ReadOptions {
dist_key_hint: None,
prefix_hint: None,
check_bloom_filter: false,
ignore_range_tombstone: false,
table_id: Default::default(),
3 changes: 2 additions & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
@@ -213,7 +213,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.iter()
.map(|&k| k as usize)
.collect_vec();

let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(
state_store,
@@ -225,6 +225,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
distribution,
table_option,
value_indices,
prefix_hint_len,
);

let inner_side_builder = InnerSideExecutorBuilder::new(
3 changes: 2 additions & 1 deletion src/batch/src/executor/row_seq_scan.rs
@@ -223,7 +223,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.iter()
.map(|&k| k as usize)
.collect_vec();

let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let scan_ranges = seq_scan_node.scan_ranges.clone();
let scan_ranges = {
if scan_ranges.is_empty() {
@@ -251,6 +251,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
distribution,
table_option,
value_indices,
prefix_hint_len,
);

Ok(Box::new(RowSeqScanExecutor::new(
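
For context, here is a rough sketch of how the `prefix_hint_len` threaded into `StorageTable::new_partial` above could gate bloom filter use on reads; the struct and function below are illustrative assumptions, not the actual `StorageTable` internals.

```rust
/// Illustrative only: a scan can supply a bloom filter hint when it pins at
/// least the first `prefix_hint_len` pk columns to exact values.
struct ScanPrefix {
    /// How many leading pk columns the scan fixes to constants.
    fixed_columns: usize,
    /// Memcomparable encoding of exactly the hinted pk prefix (assumed to be
    /// pre-truncated by the caller).
    encoded_prefix: Vec<u8>,
}

fn prefix_hint(scan: &ScanPrefix, prefix_hint_len: usize) -> Option<Vec<u8>> {
    if prefix_hint_len > 0 && scan.fixed_columns >= prefix_hint_len {
        // Enough of the pk prefix is fixed: hand the encoded prefix to the
        // store so SSTs whose bloom filters rule it out can be skipped.
        Some(scan.encoded_prefix.clone())
    } else {
        // Otherwise fall back to a plain read with no `prefix_hint`.
        None
    }
}
```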
4 changes: 4 additions & 0 deletions src/common/src/catalog/physical_table.rs
@@ -43,6 +43,9 @@ pub struct TableDesc {
pub retention_seconds: u32,

pub value_indices: Vec<usize>,

/// The prefix len of pk, used in bloom filter.
pub read_prefix_len_hint: usize,
}

impl TableDesc {
@@ -70,6 +73,7 @@ impl TableDesc {
dist_key_indices: self.distribution_key.iter().map(|&k| k as u32).collect(),
retention_seconds: self.retention_seconds,
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
read_prefix_len_hint: self.read_prefix_len_hint as u32,
}
}

106 changes: 5 additions & 101 deletions src/common/src/util/ordered/serde.rs
@@ -91,14 +91,14 @@ impl OrderedRowSerde {
&self.schema
}

pub fn deserialize_prefix_len_with_column_indices(
pub fn deserialize_prefix_len(
&self,
key: &[u8],
column_indices: impl Iterator<Item = usize>,
prefix_len: usize,
) -> memcomparable::Result<usize> {
use crate::types::ScalarImpl;
let mut len: usize = 0;
for index in column_indices {
for index in 0..prefix_len {
let data_type = &self.schema[index];
let order_type = &self.order_types[index];
let data = &key[len..];
@@ -110,34 +110,6 @@ impl OrderedRowSerde {

Ok(len)
}

/// return the distribution key start position in serialized key and the distribution key
/// length.
pub fn deserialize_dist_key_position_with_column_indices(
&self,
key: &[u8],
dist_key_indices_pair: (usize, usize),
) -> memcomparable::Result<(usize, usize)> {
let (dist_key_start_index, dist_key_end_index) = dist_key_indices_pair;
use crate::types::ScalarImpl;
let mut dist_key_start_position: usize = 0;
let mut len: usize = 0;
for index in 0..dist_key_end_index {
let data_type = &self.schema[index];
let order_type = &self.order_types[index];
let data = &key[len..];
let mut deserializer = memcomparable::Deserializer::new(data);
deserializer.set_reverse(*order_type == OrderType::Descending);

let field_length = ScalarImpl::encoding_data_size(data_type, &mut deserializer)?;
len += field_length;
if index < dist_key_start_index {
dist_key_start_position += field_length;
}
}

Ok((dist_key_start_position, (len - dist_key_start_position)))
}
}

#[cfg(test)]
@@ -244,9 +216,7 @@ mod tests {
}

{
let row_0_idx_0_len = serde
.deserialize_prefix_len_with_column_indices(&array[0], 0..=0)
.unwrap();
let row_0_idx_0_len = serde.deserialize_prefix_len(&array[0], 1).unwrap();

let schema = vec![DataType::Varchar];
let order_types = vec![OrderType::Descending];
@@ -259,9 +229,7 @@ mod tests {
}

{
let row_0_idx_1_len = serde
.deserialize_prefix_len_with_column_indices(&array[0], 0..=1)
.unwrap();
let row_0_idx_1_len = serde.deserialize_prefix_len(&array[0], 2).unwrap();

let order_types = vec![OrderType::Descending, OrderType::Ascending];
let schema = vec![DataType::Varchar, DataType::Int16];
@@ -271,70 +239,6 @@ mod tests {
}
}

#[test]
fn test_deserialize_dist_key_position_with_column_indices() {
let order_types = vec![
OrderType::Descending,
OrderType::Ascending,
OrderType::Descending,
OrderType::Ascending,
];

let schema = vec![
DataType::Varchar,
DataType::Int16,
DataType::Varchar,
DataType::Varchar,
];
let serde = OrderedRowSerde::new(schema, order_types);
let row1 = Row::new(vec![
Some(Utf8("aaa".to_string().into())),
Some(Int16(5)),
Some(Utf8("bbb".to_string().into())),
Some(Utf8("ccc".to_string().into())),
]);
let rows = vec![row1];
let mut array = vec![];
for row in &rows {
let mut row_bytes = vec![];
serde.serialize(row, &mut row_bytes);
array.push(row_bytes);
}

{
let dist_key_indices = [1, 2];
let dist_key_start_index = 1;
let (dist_key_start_position, dist_key_len) = serde
.deserialize_dist_key_position_with_column_indices(
&array[0],
(
dist_key_start_index,
dist_key_start_index + dist_key_indices.len(),
),
)
.unwrap();

let schema = vec![DataType::Varchar];
let order_types = vec![OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let prefix_slice = &array[0][0..dist_key_start_position];
assert_eq!(
deserde.deserialize(prefix_slice).unwrap(),
Row::new(vec![Some(Utf8("aaa".to_string().into()))])
);

let schema = vec![DataType::Int16, DataType::Varchar];
let order_types = vec![OrderType::Ascending, OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let dist_key_slice =
&array[0][dist_key_start_position..dist_key_start_position + dist_key_len];
assert_eq!(
deserde.deserialize(dist_key_slice).unwrap(),
Row::new(vec![Some(Int16(5)), Some(Utf8("bbb".to_string().into()))])
);
}
}

#[test]
fn test_encoding_data_size() {
use std::mem::size_of;
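
As a usage sketch building on the test above: `deserialize_prefix_len` reports how many bytes of an encoded key the first `n` pk columns occupy, which is exactly what is needed to slice out the pk-prefix bloom filter key. The import paths below are assumptions and may not match the actual crate layout; the data mirrors the test in this file.

```rust
use risingwave_common::array::Row; // assumed import paths
use risingwave_common::types::DataType;
use risingwave_common::types::ScalarImpl::{Int16, Utf8};
use risingwave_common::util::ordered::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;

fn main() {
    // pk = (Varchar DESC, Int16 ASC), matching the test data above.
    let serde = OrderedRowSerde::new(
        vec![DataType::Varchar, DataType::Int16],
        vec![OrderType::Descending, OrderType::Ascending],
    );

    let mut key = vec![];
    serde.serialize(
        &Row::new(vec![Some(Utf8("aaa".to_string().into())), Some(Int16(5))]),
        &mut key,
    );

    // With read_prefix_len_hint = 1 the bloom filter key is the encoding of
    // the first pk column only; deserialize_prefix_len tells us where it ends.
    let prefix_len = serde.deserialize_prefix_len(&key, 1).unwrap();
    let bloom_filter_key = &key[..prefix_len];
    assert!(bloom_filter_key.len() < key.len());
}
```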
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
@@ -34,7 +34,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
None,
ReadOptions {
ignore_range_tombstone: false,
dist_key_hint: None,
prefix_hint: None,
table_id: TableId { table_id },
retention_seconds: None,
check_bloom_filter: false,
1 change: 1 addition & 0 deletions src/ctl/src/cmd_impl/table/scan.rs
@@ -89,6 +89,7 @@ pub fn make_storage_table<S: StateStore>(hummock: S, table: &TableCatalog) -> St
Distribution::all_vnodes(table.distribution_key().to_vec()),
TableOption::build_table_option(&HashMap::new()),
(0..table.columns().len()).collect(),
table.read_prefix_len_hint,
)
}

11 changes: 6 additions & 5 deletions src/frontend/src/catalog/table_catalog.rs
@@ -111,7 +111,7 @@ pub struct TableCatalog {

pub handle_pk_conflict: bool,

pub pk_prefix_len_hint: usize,
pub read_prefix_len_hint: usize,
}

#[derive(Copy, Clone, Debug, PartialEq)]
@@ -217,6 +217,7 @@ impl TableCatalog {
.retention_seconds
.unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
value_indices: self.value_indices.clone(),
read_prefix_len_hint: self.read_prefix_len_hint,
}
}

@@ -269,7 +270,7 @@ impl TableCatalog {
value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
definition: self.definition.clone(),
handle_pk_conflict: self.handle_pk_conflict,
pk_prefix_len_hint: self.pk_prefix_len_hint as u32,
read_prefix_len_hint: self.read_prefix_len_hint as u32,
}
}
}
@@ -319,7 +320,7 @@ impl From<ProstTable> for TableCatalog {
value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
definition: tb.definition.clone(),
handle_pk_conflict: tb.handle_pk_conflict,
pk_prefix_len_hint: tb.pk_prefix_len_hint as usize,
read_prefix_len_hint: tb.read_prefix_len_hint as usize,
}
}
}
@@ -404,7 +405,7 @@ mod tests {
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false,
pk_prefix_len_hint: 0,
read_prefix_len_hint: 0,
vnode_col_index: None,
row_id_index: None,
}
@@ -466,7 +467,7 @@ mod tests {
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false,
pk_prefix_len_hint: 0,
read_prefix_len_hint: 0,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));