Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): add InternalStateTable Catalog #3139

Merged
merged 19 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package stream_plan;

import "catalog.proto";
import "common.proto";
import "data.proto";
import "expr.proto";
Expand Down Expand Up @@ -56,19 +57,21 @@ message MaterializeNode {
message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
repeated uint32 distribution_keys = 2;
repeated uint32 table_ids = 3;
// Catalogs of the internal state tables; the frontend emits one catalog per agg call.
repeated catalog.Table internal_tables = 3;
// Maps an input column index to the column id assigned to it in the internal table.
map<uint32, int32> column_mapping = 4;
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 4;
bool is_append_only = 5;
}

message HashAggNode {
repeated uint32 distribution_keys = 1;
repeated expr.AggCall agg_calls = 2;
repeated uint32 table_ids = 3;
// Catalogs of the internal state tables; the frontend emits one catalog per agg call.
repeated catalog.Table internal_tables = 3;
// Maps an input column index to the column id assigned to it in the internal table.
map<uint32, int32> column_mapping = 4;
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 4;
bool is_append_only = 5;
}

message TopNNode {
Expand All @@ -90,10 +93,10 @@ message HashJoinNode {
// on-the-fly within the plan.
// TODO: remove this in the future when we have a separate DeltaHashJoin node.
bool is_delta_join = 5;
// Used for internal table states. Id of the left table.
uint32 left_table_id = 6;
// Used for internal table states. Id of the right table.
uint32 right_table_id = 7;
// Used for internal table states.
catalog.Table left_table = 6;
// Used for internal table states.
catalog.Table right_table = 7;
repeated uint32 dist_key_l = 8;
repeated uint32 dist_key_r = 9;
Comment on lines +97 to 101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can merge these fields, but delaying that until after the executor actually uses the catalog in the proto is OK.

// It is true when the input is append-only
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl DatabaseId {
pub fn new(database_id: i32) -> Self {
DatabaseId { database_id }
}

/// Returns the raw sentinel id `i32::MAX - 1`, for use where no real database id
/// has been assigned.
pub fn placeholder() -> i32 {
    const PLACEHOLDER_ID: i32 = i32::MAX - 1;
    PLACEHOLDER_ID
}
}

#[derive(Clone, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
Expand All @@ -60,6 +64,10 @@ impl SchemaId {
schema_id,
}
}

/// Returns the raw sentinel id `i32::MAX - 1`, for use where no real schema id
/// has been assigned.
pub fn placeholder() -> i32 {
    const PLACEHOLDER_ID: i32 = i32::MAX - 1;
    PLACEHOLDER_ID
}
}

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
Expand Down
77 changes: 76 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, OrderedColumnDesc, Schema, TableId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::expr::AggKind;
use risingwave_pb::expr::AggCall as ProstAggCall;

use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary,
PredicatePushdown, StreamHashAgg, StreamSimpleAgg, ToBatch, ToStream,
};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::expr::{AggCall, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{gen_filter_and_pushdown, LogicalProject};
use crate::optimizer::property::RequiredDist;
Expand Down Expand Up @@ -120,6 +123,78 @@ pub struct LogicalAgg {
input: PlanRef,
}

impl LogicalAgg {
/// Infers the catalogs of the internal state tables used by this aggregation.
///
/// One `TableCatalog` is produced per entry in `self.agg_calls`. Every table starts with the
/// group-key columns, which also form its primary key and its ascending order key, followed by
/// the columns needed by that agg call.
///
/// Returns the catalogs together with a map from input column index to the column id that
/// column received in an internal table.
///
/// NOTE(review): `column_mapping` is a single map shared by all tables, so an input index
/// inserted for a later table overwrites the entry written for an earlier one — confirm this
/// is intended.
pub fn infer_internal_table_catalog(&self) -> (Vec<TableCatalog>, HashMap<usize, i32>) {
let mut table_catalogs = vec![];
let mut column_mapping = HashMap::new();
// Column types and names are taken from the schema of the input plan.
let base = self.input.plan_base();
let schema = &base.schema;
let fields = schema.fields();
for agg_call in &self.agg_calls {
// State for the table of this agg call; column ids are handed out sequentially
// from `columns.len()`.
let mut internal_pk_indices = vec![];
let mut columns = vec![];
let mut order_desc = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need add the group by keys as the prefix of the table's pk

// Group keys come first: each one becomes a column, a pk entry and an ascending order key.
for &idx in &self.group_keys {
let column_id = columns.len() as i32;
internal_pk_indices.push(column_id as usize); // Currently our column index is same as column id
column_mapping.insert(idx, column_id);
let column_desc = ColumnDesc::from_field_with_column_id(&fields[idx], column_id);
columns.push(ColumnCatalog {
column_desc: column_desc.clone(),
is_hidden: false,
});
order_desc.push(OrderedColumnDesc {
column_desc,
order: OrderType::Ascending,
})
}
match agg_call.agg_kind {
// These kinds get one column per agg input, described by the matching input field.
AggKind::Min | AggKind::Max | AggKind::StringAgg => {
for input in &agg_call.inputs {
let column_id = columns.len() as i32;
column_mapping.insert(input.index, column_id);
columns.push(ColumnCatalog {
column_desc: ColumnDesc::from_field_with_column_id(
&fields[input.index],
column_id,
),
is_hidden: false,
});
}
}
// These kinds get a single unnamed column typed as the agg call's return type;
// nothing is recorded in `column_mapping` for it.
AggKind::Sum
| AggKind::Count
| AggKind::RowCount
| AggKind::Avg
| AggKind::SingleValue
| AggKind::ApproxCountDistinct => {
columns.push(ColumnCatalog {
column_desc: ColumnDesc::unnamed(
ColumnId::new(columns.len() as i32),
agg_call.return_type.clone(),
),
is_hidden: false,
});
}
}
// The table has a placeholder id and an empty name here.
// NOTE(review): `distribution_keys` is copied from the input plan's distribution; it
// presumably holds input column indices rather than internal-table positions — confirm.
table_catalogs.push(TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
name: String::new(),
columns,
order_desc,
pks: internal_pk_indices,
is_index_on: None,
distribution_keys: base.dist.dist_column_indices().to_vec(),
appendonly: false,
owner: risingwave_common::catalog::DEFAULT_SUPPER_USER.to_string(),
vnode_mapping: None,
});
}
(table_catalogs, column_mapping)
}
}

/// `ExprHandler` extracts agg calls and references to group columns from select list, in
/// preparation for generating a plan like `LogicalProject - LogicalAgg - LogicalProject`.
struct ExprHandler {
Expand Down
17 changes: 15 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
Expand Down Expand Up @@ -91,7 +92,7 @@ impl_plan_tree_node_for_unary! { StreamHashAgg }
impl ToStreamProst for StreamHashAgg {
fn to_stream_prost_body(&self) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;

let (internal_tables, column_mapping) = self.logical.infer_internal_table_catalog();
ProstStreamNode::HashAgg(HashAggNode {
distribution_keys: self
.distribution_keys()
Expand All @@ -103,7 +104,19 @@ impl ToStreamProst for StreamHashAgg {
.iter()
.map(PlanAggCall::to_protobuf)
.collect_vec(),
table_ids: vec![],
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| {
table_catalog.to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)
})
.collect_vec(),
column_mapping: column_mapping
.into_iter()
.map(|(k, v)| (k as u32, v))
.collect(),
is_append_only: self.input().append_only(),
})
}
Expand Down
47 changes: 46 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, DatabaseId, OrderedColumnDesc, SchemaId, TableId};
use risingwave_common::session_config::DELTA_JOIN;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::HashJoinNode;

use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, ToStreamProst};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::expr::Expr;
use crate::optimizer::plan_node::EqJoinPredicate;
use crate::optimizer::property::Distribution;
Expand Down Expand Up @@ -214,14 +218,55 @@ impl ToStreamProst for StreamHashJoin {
.map(|idx| *idx as u32)
.collect_vec(),
is_delta_join: self.is_delta,
left_table: Some(infer_internal_table_catalog(self.left()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
right_table: Some(infer_internal_table_catalog(self.right()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
output_indices: self
.logical
.output_indices()
.iter()
.map(|&x| x as u32)
.collect(),
is_append_only: self.is_append_only,
..Default::default()
})
}
}

/// Infers the catalog of the internal state table that backs one side of a hash join.
///
/// The table mirrors `input`: it has one visible column per field of the input schema, uses
/// the input's pk indices as its primary key (all ordered ascending), and copies the input's
/// distribution columns and append-only flag. The table id is a placeholder and the name is
/// empty.
///
/// # Panics
///
/// Panics if a pk index of `input` is not a valid index into its schema fields.
fn infer_internal_table_catalog(input: PlanRef) -> TableCatalog {
    let base = input.plan_base();
    let schema = &base.schema;
    let pk_indices = &base.pk_indices;

    // Give every column an id equal to its position so that ids are distinct within the
    // table, matching the "column index is same as column id" convention used for the
    // aggregation state tables.
    // NOTE(review): this replaces `ColumnDesc::from_field_without_column_id`, which
    // presumably left every column with the same default id — confirm.
    let columns = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| ColumnCatalog {
            column_desc: ColumnDesc::from_field_with_column_id(field, idx as i32),
            is_hidden: false,
        })
        .collect_vec();

    // The order key is the primary key, every column ascending.
    let order_desc = pk_indices
        .iter()
        .map(|&idx| OrderedColumnDesc {
            column_desc: columns[idx].column_desc.clone(),
            order: OrderType::Ascending,
        })
        .collect_vec();

    TableCatalog {
        id: TableId::placeholder(),
        associated_source_id: None,
        name: String::new(),
        columns,
        order_desc,
        pks: pk_indices.clone(),
        distribution_keys: base.dist.dist_column_indices().to_vec(),
        is_index_on: None,
        appendonly: input.append_only(),
        owner: risingwave_common::catalog::DEFAULT_SUPPER_USER.to_string(),
        vnode_mapping: None,
    }
}
17 changes: 15 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
Expand Down Expand Up @@ -74,7 +75,7 @@ impl_plan_tree_node_for_unary! { StreamSimpleAgg }
impl ToStreamProst for StreamSimpleAgg {
fn to_stream_prost_body(&self) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;

let (internal_tables, column_mapping) = self.logical.infer_internal_table_catalog();
// TODO: local or global simple agg?
ProstStreamNode::GlobalSimpleAgg(SimpleAggNode {
agg_calls: self
Expand All @@ -89,7 +90,19 @@ impl ToStreamProst for StreamSimpleAgg {
.iter()
.map(|idx| *idx as u32)
.collect_vec(),
table_ids: vec![],
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| {
table_catalog.to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)
})
.collect_vec(),
column_mapping: column_mapping
.into_iter()
.map(|(k, v)| (k as u32, v))
.collect(),
is_append_only: self.input().append_only(),
})
}
Expand Down
Loading