Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(streaming): add the missed read prefix hint on state table #8545

Merged
merged 4 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ message Table {
repeated int32 value_indices = 20;
string definition = 21;
HandleConflictBehavior handle_pk_conflict_behavior = 22;
// Anticipated read prefix pattern (number of fields) for the table, which can be utilized
// for implementing the table's bloom filter or other storage optimization techniques.
uint32 read_prefix_len_hint = 23;
repeated int32 watermark_indices = 24;
repeated int32 dist_key_in_pk = 25;
Expand Down
80 changes: 51 additions & 29 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,33 +196,56 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let mut included_upstream_indices = vec![]; // all upstream indices that are included in the state table
let mut column_mapping = BTreeMap::new(); // key: upstream col idx, value: table col idx
let mut table_value_indices = BTreeSet::new(); // table column indices of value columns
let mut add_column = |upstream_idx, order_type, is_value| {
column_mapping.entry(upstream_idx).or_insert_with(|| {
let table_col_idx =
internal_table_catalog_builder.add_column(&in_fields[upstream_idx]);
if let Some(order_type) = order_type {
internal_table_catalog_builder.add_order_column(table_col_idx, order_type);
let mut add_column =
|upstream_idx,
order_type,
is_value,
internal_table_catalog_builder: &mut TableCatalogBuilder| {
column_mapping.entry(upstream_idx).or_insert_with(|| {
let table_col_idx =
internal_table_catalog_builder.add_column(&in_fields[upstream_idx]);
if let Some(order_type) = order_type {
internal_table_catalog_builder
.add_order_column(table_col_idx, order_type);
}
included_upstream_indices.push(upstream_idx);
table_col_idx
});
if is_value {
// note that some indices may be added before as group keys which are not
// value
table_value_indices.insert(column_mapping[&upstream_idx]);
}
included_upstream_indices.push(upstream_idx);
table_col_idx
});
if is_value {
// note that some indices may be added before as group keys which are not value
table_value_indices.insert(column_mapping[&upstream_idx]);
}
};
};

for &idx in &self.group_key {
add_column(idx, Some(OrderType::ascending()), false);
add_column(
idx,
Some(OrderType::ascending()),
false,
&mut internal_table_catalog_builder,
);
}
let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the reason we cannot directly capture the builder in the closure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes 🥵 I was just tired and not sure whether there was a better way.


for (order_type, idx) in sort_keys {
add_column(idx, Some(order_type), true);
add_column(
idx,
Some(order_type),
true,
&mut internal_table_catalog_builder,
);
}
for &idx in &in_pks {
add_column(idx, Some(OrderType::ascending()), true);
add_column(
idx,
Some(OrderType::ascending()),
true,
&mut internal_table_catalog_builder,
);
}
for idx in include_keys {
add_column(idx, None, true);
add_column(idx, None, true, &mut internal_table_catalog_builder);
}

let mapping =
Expand All @@ -232,16 +255,13 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
internal_table_catalog_builder.set_vnode_col_idx(tb_vnode_idx);
}

// prefix_len_hint should be the length of deduplicated group key because pk is
// deduplicated.
let prefix_len = self.group_key.iter().unique().count();
internal_table_catalog_builder.set_read_prefix_len_hint(prefix_len);
// set value indices to reduce ser/de overhead
let table_value_indices = table_value_indices.into_iter().collect_vec();
internal_table_catalog_builder.set_value_indices(table_value_indices.clone());

MaterializedInputState {
table: internal_table_catalog_builder.build(tb_dist.unwrap_or_default()),
table: internal_table_catalog_builder
.build(tb_dist.unwrap_or_default(), read_prefix_len_hint),
included_upstream_indices,
table_value_indices,
}
Expand All @@ -258,8 +278,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
.add_order_column(tb_column_idx, OrderType::ascending());
included_upstream_indices.push(idx);
}

internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len());
let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();

match agg_kind {
AggKind::ApproxCountDistinct => {
Expand Down Expand Up @@ -288,7 +307,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
internal_table_catalog_builder.set_vnode_col_idx(tb_vnode_idx);
}
TableState {
table: internal_table_catalog_builder.build(tb_dist.unwrap_or_default()),
table: internal_table_catalog_builder
.build(tb_dist.unwrap_or_default(), read_prefix_len_hint),
}
};

Expand Down Expand Up @@ -369,7 +389,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
.add_order_column(tb_column_idx, OrderType::ascending());
}
}
internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len());
let read_prefix_len_hint = self.group_key.len();

let mapping = self.i2o_col_mapping();
let tb_dist = mapping.rewrite_dist_key(&in_dist_key).unwrap_or_default();
if let Some(tb_vnode_idx) = vnode_col_idx.and_then(|idx| mapping.try_map(idx)) {
Expand All @@ -380,7 +401,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
// of this table should skip group_key.len().
internal_table_catalog_builder
.set_value_indices((self.group_key.len()..out_fields.len()).collect());
internal_table_catalog_builder.build(tb_dist)
internal_table_catalog_builder.build(tb_dist, read_prefix_len_hint)
}

/// Infer dedup tables for distinct agg calls, partitioned by distinct columns.
Expand Down Expand Up @@ -417,6 +438,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let table_col_idx = table_builder.add_column(&in_fields[idx]);
table_builder.add_order_column(table_col_idx, OrderType::ascending());
}
let read_prefix_len_hint = table_builder.get_current_pk_len();

// Agg calls with same distinct column share the same dedup table, but they may have
// different filter conditions, so the count of occurrence of one distinct key may
Expand All @@ -437,7 +459,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
table_builder.set_vnode_col_idx(idx);
}
let dist_key = mapping.rewrite_dist_key(&in_dist_key).unwrap_or_default();
let table = table_builder.build(dist_key);
let table = table_builder.build(dist_key, read_prefix_len_hint);
(distinct_col, table)
})
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub fn infer_left_internal_table_catalog(

// The pk of dynamic filter internal table should be left_key + input_pk.
let mut pk_indices = vec![left_key_index];
let read_prefix_len_hint = pk_indices.len();

// TODO(yuhao): dedup the dist key and pk.
pk_indices.extend(me.logical_pk());

Expand All @@ -80,7 +82,7 @@ pub fn infer_left_internal_table_catalog(
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
});

internal_table_catalog_builder.build(dist_keys)
internal_table_catalog_builder.build(dist_keys, read_prefix_len_hint)
}

pub fn infer_right_internal_table_catalog(input: &impl stream::StreamPlanRef) -> TableCatalog {
Expand All @@ -100,5 +102,5 @@ pub fn infer_right_internal_table_catalog(input: &impl stream::StreamPlanRef) ->
});

// No distribution keys
internal_table_catalog_builder.build(vec![])
internal_table_catalog_builder.build(vec![], 0)
}
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ impl Source {
builder.add_column(&value);
builder.add_order_column(ordered_col_idx, OrderType::ascending());

builder.build(vec![])
builder.build(vec![], 1)
}
}
8 changes: 5 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
order_cols.insert(idx);
});

let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
column_orders.iter().for_each(|order| {
if !order_cols.contains(&order.column_index) {
internal_table_catalog_builder
Expand All @@ -80,9 +81,10 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx);
}

internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len());
internal_table_catalog_builder
.build(self.input.distribution().dist_column_indices().to_vec())
internal_table_catalog_builder.build(
self.input.distribution().dist_column_indices().to_vec(),
read_prefix_len_hint,
)
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,12 @@ impl HashJoin {
degree_table_catalog_builder
.set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);

internal_table_catalog_builder.set_read_prefix_len_hint(join_key_len);
degree_table_catalog_builder.set_read_prefix_len_hint(join_key_len);

internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);

(
internal_table_catalog_builder.build(internal_table_dist_keys),
degree_table_catalog_builder.build(degree_table_dist_keys),
internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
deduped_input_pk_indices,
)
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl StreamNode for StreamNow {
});

let table_catalog = internal_table_catalog_builder
.build(dist_keys)
.build(dist_keys, 0)
.with_id(state.gen_table_id_wrapped());
NodeBody::Now(NowNode {
state_table: Some(table_catalog.to_internal_table_prost()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub fn infer_internal_table_catalog(watermark_type: DataType) -> TableCatalog {
builder.set_vnode_col_idx(0);
builder.set_value_indices(vec![1]);

builder.build(vec![0])
builder.build(vec![0], 1)
}

impl StreamNode for StreamWatermarkFilter {
Expand Down
13 changes: 8 additions & 5 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ impl TableCatalogBuilder {
self.pk.push(ColumnOrder::new(column_index, order_type));
}

pub fn set_read_prefix_len_hint(&mut self, read_prefix_len_hint: usize) {
self.read_prefix_len_hint = read_prefix_len_hint;
/// Returns the number of columns currently registered in the primary key.
///
/// This is simply the length of `self.pk` at the time of the call, so the
/// value grows as more order columns are added. Callers can snapshot it
/// right after adding the columns that form the expected read prefix and
/// pass the result to `build` as `read_prefix_len_hint`.
pub fn get_current_pk_len(&self) -> usize {
self.pk.len()
}

pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
Expand Down Expand Up @@ -114,8 +115,10 @@ impl TableCatalogBuilder {
self.column_names.insert(column_desc.name.clone(), 0);
}

/// Consume builder and create `TableCatalog` (for proto).
pub fn build(self, distribution_key: Vec<usize>) -> TableCatalog {
/// Consume builder and create `TableCatalog` (for proto). The `read_prefix_len_hint` is the
/// anticipated read prefix pattern (number of fields) for the table, which can be utilized for
/// implementing the table's bloom filter or other storage optimization techniques.
pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
assert!(self.read_prefix_len_hint <= self.pk.len());
let watermark_columns = match self.watermark_columns {
Some(w) => w,
Expand Down Expand Up @@ -144,7 +147,7 @@ impl TableCatalogBuilder {
.unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
definition: "".into(),
conflict_behavior_type: 0,
read_prefix_len_hint: self.read_prefix_len_hint,
read_prefix_len_hint,
version: None, // the internal table is not versioned and can't be schema changed
watermark_columns,
dist_key_in_pk: self.dist_key_in_pk.unwrap_or(vec![]),
Expand Down