Skip to content

Commit

Permalink
refactor(fragmenter): assign table_id while translating stream nodes …
Browse files Browse the repository at this point in the history
…to prost (#5086)

* refactor(plan): to_prost now generates table_id

* refactor(plan): to_prost now generates table_id, remove `assign_local_table_id_to_stream_node`

* refactor(plan): publicize

* refactor(*): cargo fix

* fmt: run cargo fmt

* refactor: rename `ToStreamProst` to `StreamNode`

* test: assign `state_table_id`
  • Loading branch information
ice1000 authored Sep 5, 2022
1 parent 14de3bf commit fa5206c
Show file tree
Hide file tree
Showing 23 changed files with 143 additions and 411 deletions.
8 changes: 6 additions & 2 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableDesc;
use risingwave_common::catalog::{TableDesc, TableId};
use risingwave_common::config::constant::hummock::TABLE_OPTION_DUMMY_RETAINTION_SECOND;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{ColumnIndex as ProstColumnIndex, Table as ProstTable};

use super::column_catalog::ColumnCatalog;
use super::{DatabaseId, FragmentId, SchemaId};
use crate::catalog::TableId;
use crate::optimizer::property::FieldOrder;

/// Includes full information about a table.
Expand Down Expand Up @@ -96,6 +95,11 @@ impl TableCatalog {
self.id
}

pub fn with_id(mut self, id: TableId) -> Self {
self.id = id;
self
}

/// Get the table catalog's associated source id.
#[must_use]
pub fn associated_source_id(&self) -> Option<TableId> {
Expand Down
9 changes: 5 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,26 @@ impl dyn PlanNode {
///
/// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a
/// hook inside to do some ad-hoc thing for [`StreamTableScan`].
pub fn to_stream_prost(&self) -> StreamPlanProst {
pub fn to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> StreamPlanProst {
if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost();
}
if let Some(stream_index_scan) = self.as_stream_index_scan() {
return stream_index_scan.adhoc_to_stream_prost();
}

let node = Some(self.to_stream_prost_body());
let node = Some(self.to_stream_prost_body(state));
let input = self
.inputs()
.into_iter()
.map(|plan| plan.to_stream_prost())
.map(|plan| plan.to_stream_prost(state))
.collect();
// TODO: support pk_indices and operator_id
StreamPlanProst {
input,
identity: format!("{}", self),
node_body: node,
operator_id: self.id().0 as u64,
operator_id: self.id().0 as _,
stream_key: self.logical_pk().iter().map(|x| *x as u32).collect(),
fields: self.schema().to_prost(),
append_only: self.append_only(),
Expand Down Expand Up @@ -325,6 +325,7 @@ pub use stream_table_scan::StreamTableScan;
pub use stream_topn::StreamTopN;

use crate::session::OptimizerContextRef;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `for_all_plan_nodes` includes all plan nodes. If you added a new plan node
/// inside the project, be sure to add here and in its conventions like `for_logical_plan_nodes`
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};

use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamHashJoin, ToStreamProst};
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamHashJoin, StreamNode};
use crate::expr::Expr;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// [`StreamDeltaJoin`] implements [`super::LogicalJoin`] with delta join. It requires its two
/// inputs to be indexes.
Expand Down Expand Up @@ -144,8 +145,8 @@ impl PlanTreeNodeBinary for StreamDeltaJoin {

impl_plan_tree_node_for_binary! { StreamDeltaJoin }

impl ToStreamProst for StreamDeltaJoin {
fn to_stream_prost_body(&self) -> NodeBody {
impl StreamNode for StreamDeltaJoin {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
let left = self.left();
let right = self.right();
let left_table = left.as_stream_index_scan().unwrap();
Expand Down
29 changes: 15 additions & 14 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use risingwave_pb::stream_plan::DynamicFilterNode;
use super::utils::TableCatalogBuilder;
use crate::catalog::TableCatalog;
use crate::expr::Expr;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, ToStreamProst};
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
use crate::optimizer::PlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{Condition, ConditionDisplay};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -92,21 +93,21 @@ impl PlanTreeNodeBinary for StreamDynamicFilter {

impl_plan_tree_node_for_binary! { StreamDynamicFilter }

impl ToStreamProst for StreamDynamicFilter {
fn to_stream_prost_body(&self) -> NodeBody {
impl StreamNode for StreamDynamicFilter {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
let condition = self
.predicate
.as_expr_unless_true()
.map(|x| x.to_expr_proto());
let left_table = infer_left_internal_table_catalog(self.clone().into(), self.left_index)
.with_id(state.gen_table_id_wrapped());
let right_table = infer_right_internal_table_catalog(self.right.clone())
.with_id(state.gen_table_id_wrapped());
NodeBody::DynamicFilter(DynamicFilterNode {
left_key: self.left_index as u32,
condition: self
.predicate
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
left_table: Some(
infer_left_internal_table_catalog(self.clone().into(), self.left_index)
.to_state_table_prost(),
),
right_table: Some(
infer_right_internal_table_catalog(self.right.clone()).to_state_table_prost(),
),
condition,
left_table: Some(left_table.to_state_table_prost()),
right_table: Some(right_table.to_state_table_prost()),
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst};
use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamExchange` imposes a particular distribution on its input
/// without changing its content.
Expand Down Expand Up @@ -74,8 +75,8 @@ impl PlanTreeNodeUnary for StreamExchange {
}
impl_plan_tree_node_for_unary! {StreamExchange}

impl ToStreamProst for StreamExchange {
fn to_stream_prost_body(&self) -> NodeBody {
impl StreamNode for StreamExchange {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
NodeBody::Exchange(ExchangeNode {
strategy: Some(DispatchStrategy {
r#type: match &self.base.dist {
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

use std::fmt;

use itertools::Itertools;
use risingwave_pb::stream_plan::expand_node::Subset;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::ExpandNode;

use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst};
use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone)]
pub struct StreamExpand {
Expand Down Expand Up @@ -72,19 +72,19 @@ impl PlanTreeNodeUnary for StreamExpand {

impl_plan_tree_node_for_unary! { StreamExpand }

impl ToStreamProst for StreamExpand {
fn to_stream_prost_body(&self) -> ProstStreamNode {
impl StreamNode for StreamExpand {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
ProstStreamNode::Expand(ExpandNode {
column_subsets: self
.column_subsets()
.iter()
.map(|subset| subset_to_protobuf(subset))
.collect_vec(),
.collect(),
})
}
}

fn subset_to_protobuf(subset: &[usize]) -> Subset {
let column_indices = subset.iter().map(|key| *key as u32).collect_vec();
let column_indices = subset.iter().map(|key| *key as u32).collect();
Subset { column_indices }
}
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use std::fmt;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::FilterNode;

use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToStreamProst};
use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::plan_node::PlanBase;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::Condition;

/// `StreamFilter` implements [`super::LogicalFilter`]
Expand Down Expand Up @@ -70,8 +71,8 @@ impl PlanTreeNodeUnary for StreamFilter {

impl_plan_tree_node_for_unary! { StreamFilter }

impl ToStreamProst for StreamFilter {
fn to_stream_prost_body(&self) -> ProstStreamNode {
impl StreamNode for StreamFilter {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
ProstStreamNode::Filter(FilterNode {
search_condition: Some(ExprImpl::from(self.predicate().clone()).to_expr_proto()),
})
Expand Down
18 changes: 11 additions & 7 deletions src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

use std::fmt;

use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst};
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::PlanAggCallDisplay;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone)]
pub struct StreamGlobalSimpleAgg {
Expand Down Expand Up @@ -82,8 +82,8 @@ impl PlanTreeNodeUnary for StreamGlobalSimpleAgg {
}
impl_plan_tree_node_for_unary! { StreamGlobalSimpleAgg }

impl ToStreamProst for StreamGlobalSimpleAgg {
fn to_stream_prost_body(&self) -> ProstStreamNode {
impl StreamNode for StreamGlobalSimpleAgg {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;
let (internal_tables, column_mappings) = self.logical.infer_internal_table_catalog(None);
ProstStreamNode::GlobalSimpleAgg(SimpleAggNode {
Expand All @@ -98,11 +98,15 @@ impl ToStreamProst for StreamGlobalSimpleAgg {
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
.collect_vec(),
.collect(),
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| table_catalog.to_state_table_prost())
.collect_vec(),
.map(|table| {
table
.with_id(state.gen_table_id_wrapped())
.to_state_table_prost()
})
.collect(),
column_mappings: column_mappings
.into_iter()
.map(|v| ColumnMapping {
Expand Down
17 changes: 9 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::fmt;

use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, ToStreamProst};
use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, OrderDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -53,23 +54,23 @@ impl StreamGroupTopN {
}
}

impl ToStreamProst for StreamGroupTopN {
fn to_stream_prost_body(&self) -> ProstStreamNode {
impl StreamNode for StreamGroupTopN {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;
let group_key = self.group_key.iter().map(|idx| *idx as u32).collect();

if self.logical.limit() == 0 {
panic!("topN's limit shouldn't be 0.");
}
let table = self
.logical
.infer_internal_table_catalog(Some(&self.group_key))
.with_id(state.gen_table_id_wrapped());
let group_topn_node = GroupTopNNode {
limit: self.logical.limit() as u64,
offset: self.logical.offset() as u64,
group_key,
table: Some(
self.logical
.infer_internal_table_catalog(Some(&self.group_key))
.to_state_table_prost(),
),
table: Some(table.to_state_table_prost()),
};

ProstStreamNode::GroupTopN(group_topn_node)
Expand Down
20 changes: 12 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::fmt;

use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst};
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone)]
pub struct StreamHashAgg {
Expand Down Expand Up @@ -88,23 +88,27 @@ impl PlanTreeNodeUnary for StreamHashAgg {
}
impl_plan_tree_node_for_unary! { StreamHashAgg }

impl ToStreamProst for StreamHashAgg {
fn to_stream_prost_body(&self) -> ProstStreamNode {
impl StreamNode for StreamHashAgg {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;
let (internal_tables, column_mappings) = self
.logical
.infer_internal_table_catalog(self.vnode_col_idx);
ProstStreamNode::HashAgg(HashAggNode {
group_key: self.group_key().iter().map(|idx| *idx as u32).collect_vec(),
group_key: self.group_key().iter().map(|idx| *idx as u32).collect(),
agg_calls: self
.agg_calls()
.iter()
.map(PlanAggCall::to_protobuf)
.collect_vec(),
.collect(),
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| table_catalog.to_state_table_prost())
.collect_vec(),
.map(|table| {
table
.with_id(state.gen_table_id_wrapped())
.to_state_table_prost()
})
.collect(),
column_mappings: column_mappings
.into_iter()
.map(|v| ColumnMapping {
Expand Down
12 changes: 8 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::HashJoinNode;

use super::utils::TableCatalogBuilder;
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, ToStreamProst};
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode};
use crate::catalog::table_catalog::TableCatalog;
use crate::expr::Expr;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMapping;

/// [`StreamHashJoin`] implements [`super::LogicalJoin`] with hash table. It builds a hash table
Expand Down Expand Up @@ -192,8 +193,8 @@ impl PlanTreeNodeBinary for StreamHashJoin {

impl_plan_tree_node_for_binary! { StreamHashJoin }

impl ToStreamProst for StreamHashJoin {
fn to_stream_prost_body(&self) -> NodeBody {
impl StreamNode for StreamHashJoin {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
let left_key_indices = self.eq_join_predicate.left_eq_indexes();
let right_key_indices = self.eq_join_predicate.right_eq_indexes();
let left_key_indices_prost = left_key_indices.iter().map(|idx| *idx as i32).collect_vec();
Expand All @@ -213,10 +214,13 @@ impl ToStreamProst for StreamHashJoin {
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
left_table: Some(
infer_internal_table_catalog(self.left(), left_key_indices).to_state_table_prost(),
infer_internal_table_catalog(self.left(), left_key_indices)
.with_id(state.gen_table_id_wrapped())
.to_state_table_prost(),
),
right_table: Some(
infer_internal_table_catalog(self.right(), right_key_indices)
.with_id(state.gen_table_id_wrapped())
.to_state_table_prost(),
),
output_indices: self
Expand Down
Loading

0 comments on commit fa5206c

Please sign in to comment.