Skip to content

Commit

Permalink
feat(stream,agg): add distinct deduplicater (#7797)
Browse files Browse the repository at this point in the history
This PR adds a `DistinctDeduplicater` in streaming backend, to support distinct agg in HashAgg and GlobalSimpleAgg. It depends on the state tables inferred in frontend, with one state table for each distinct column. The dedup table schema is like:

```
group key | distinct key | count for agg call 1 | count for agg call 2 | ...
```

Let me explain by an example:

```sql
select
count(*), -- count star, no need for a dedup table
count(distinct a), -- agg call `W`, share a dedup table for distinct column `a`
count(distinct a) filter (where c > 1000), -- agg call `X`, share a dedup table for distinct column `a`
count(distinct b), -- agg call `Y`, share a dedup table for distinct column `b`
count(distinct b) filter (where c > 1000) -- agg call `Z`, share a dedup table for distinct column `b`
from t group by d;
```

There'll be two dedup tables:

- Dedup table for column `a`:
```
d | a | count_for_W | count_for_X
```
- Dedup table for column `b`:
```
d | b | count_for_Y | count_for_Z
```

Each aggregation group has a `DistinctDeduplicater`, which counts the occurrence of each distinct key for different agg calls according to the `visibility` (with agg filter and group filter already applied). For every duplicate item/row, `DistinctDeduplicater` hides it in the returned `visibility`.

---

Dedup state table cache is not supported yet due to possible concerns about memory consumption; it may be introduced in a later PR.

The distinct agg support is not enabled yet (`DistinctAggRule` is still rewriting distinct agg calls to 2-phase agg), may enable in later PR.

Approved-By: soundOfDestiny
Approved-By: st1page
Approved-By: kwannoel
  • Loading branch information
stdrc authored Feb 14, 2023
1 parent 0de2cca commit 4424382
Show file tree
Hide file tree
Showing 23 changed files with 1,038 additions and 46 deletions.
130 changes: 128 additions & 2 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ message SimpleAggNode {
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
}

message HashAggNode {
Expand All @@ -224,6 +225,7 @@ message HashAggNode {
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
}

message TopNNode {
Expand Down
62 changes: 61 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;

use itertools::Itertools;
Expand Down Expand Up @@ -361,6 +361,66 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
internal_table_catalog_builder.build(tb_dist)
}

/// Infer dedup tables for distinct agg calls, partitioned by distinct columns.
/// Since distinct agg calls only dedup on the first argument, the key of the result map is
/// `usize`, i.e. the distinct column index.
///
/// Dedup table schema:
/// group key | distinct key | count for AGG1(distinct x) | count for AGG2(distinct x) | ...
pub fn infer_distinct_dedup_tables(
&self,
me: &impl GenericPlanRef,
vnode_col_idx: Option<usize>,
) -> HashMap<usize, TableCatalog> {
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();
let in_fields = self.input.schema().fields();

self.agg_calls
.iter()
.enumerate()
.filter(|(_, call)| call.distinct) // only distinct agg calls need dedup table
.into_group_map_by(|(_, call)| call.inputs[0].index) // one table per distinct column
.into_iter()
.map(|(distinct_col, indices_and_calls)| {
let mut table_builder =
TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset());

let key_cols = self
.group_key
.iter()
.copied()
.chain(std::iter::once(distinct_col))
.collect_vec();
for &idx in &key_cols {
let table_col_idx = table_builder.add_column(&in_fields[idx]);
table_builder.add_order_column(table_col_idx, OrderType::Ascending);
}

// Agg calls with same distinct column share the same dedup table, but they may have
// different filter conditions, so the count of occurrence of one distinct key may
// differ among different calls. We add one column for each call in the dedup table.
for (call_index, _) in indices_and_calls {
table_builder.add_column(&Field {
data_type: DataType::Int64,
name: format!("count_for_agg_call_{}", call_index),
sub_fields: vec![],
type_name: String::default(),
});
}
table_builder
.set_value_indices((key_cols.len()..table_builder.columns().len()).collect());

let mapping = ColIndexMapping::with_included_columns(&key_cols, in_fields.len());
if let Some(idx) = vnode_col_idx.and_then(|idx| mapping.try_map(idx)) {
table_builder.set_vnode_col_idx(idx);
}
let dist_key = mapping.rewrite_dist_key(&in_dist_key).unwrap_or_default();
let table = table_builder.build(dist_key);
(distinct_col, table)
})
.collect()
}

pub fn decompose(self) -> (Vec<PlanAggCall>, Vec<usize>, PlanRef) {
(self.agg_calls, self.group_key, self.input)
}
Expand Down
26 changes: 17 additions & 9 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::{fmt, iter};

use fixedbitset::FixedBitSet;
Expand Down Expand Up @@ -69,6 +70,15 @@ impl LogicalAgg {
self.core.infer_stream_agg_state(&self.base, vnode_col_idx)
}

/// Infer dedup tables for distinct agg calls.
pub fn infer_distinct_dedup_tables(
&self,
vnode_col_idx: Option<usize>,
) -> HashMap<usize, TableCatalog> {
self.core
.infer_distinct_dedup_tables(&self.base, vnode_col_idx)
}

/// Generate plan for stateless 2-phase streaming agg.
/// Should only be used iff input is distributed. Input must be converted to stream form.
fn gen_stateless_two_phase_streaming_agg_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
Expand Down Expand Up @@ -469,16 +479,14 @@ impl LogicalAggBuilder {
agg_call: AggCall,
) -> std::result::Result<ExprImpl, ErrorCode> {
let return_type = agg_call.return_type();
let (agg_kind, inputs, distinct, mut order_by, filter) = agg_call.decompose();
let (agg_kind, inputs, mut distinct, mut order_by, filter) = agg_call.decompose();
match &agg_kind {
AggKind::Min
| AggKind::Max
| AggKind::Sum
| AggKind::Count
| AggKind::Avg
| AggKind::ApproxCountDistinct => {
// this order by is unnecessary.
order_by = OrderBy::new(vec![]);
AggKind::Min | AggKind::Max => {
distinct = false;
order_by = OrderBy::any();
}
AggKind::Sum | AggKind::Count | AggKind::Avg | AggKind::ApproxCountDistinct => {
order_by = OrderBy::any();
}
_ => {
// To be conservative, we just treat newly added AggKind in the future as not
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ pub fn to_stream_prost_body(
let me = &me.core;
let result_table = me.infer_result_table(base, None);
let agg_states = me.infer_stream_agg_state(base, None);
let distinct_dedup_tables = me.infer_distinct_dedup_tables(base, None);

ProstNode::GlobalSimpleAgg(SimpleAggNode {
agg_calls: me
Expand All @@ -561,6 +562,10 @@ pub fn to_stream_prost_body(
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
distinct_dedup_tables: distinct_dedup_tables
.into_iter()
.map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost()))
.collect(),
})
}
Node::GroupTopN(me) => {
Expand All @@ -582,6 +587,7 @@ pub fn to_stream_prost_body(
Node::HashAgg(me) => {
let result_table = me.core.infer_result_table(base, me.vnode_col_idx);
let agg_states = me.core.infer_stream_agg_state(base, me.vnode_col_idx);
let distinct_dedup_tables = me.core.infer_distinct_dedup_tables(base, me.vnode_col_idx);

ProstNode::HashAgg(HashAggNode {
group_key: me.core.group_key.iter().map(|&idx| idx as u32).collect(),
Expand All @@ -602,6 +608,10 @@ pub fn to_stream_prost_body(
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
distinct_dedup_tables: distinct_dedup_tables
.into_iter()
.map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost()))
.collect(),
})
}
Node::HashJoin(me) => {
Expand Down Expand Up @@ -688,6 +698,7 @@ pub fn to_stream_prost_body(
agg_call_states: vec![],
result_table: None,
is_append_only: me.input.0.append_only,
distinct_dedup_tables: Default::default(),
})
}
Node::Materialize(me) => {
Expand Down
Loading

0 comments on commit 4424382

Please sign in to comment.