From fa5206cfa526828afbe42d89e5d10a04983fe03e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tesla=20I=2E=20Zhang=E2=80=AE?= Date: Mon, 5 Sep 2022 15:11:51 -0400 Subject: [PATCH] refactor(fragmenter): assign table_id while translating stream nodes to prost (#5086) * refactor(plan): to_prost now generates table_id * refactor(plan): to_prost now generates table_id, remove `assign_local_table_id_to_stream_node` * refactor(plan): publicize * refactor(*): cargo fix * fmt: run cargo fmt * refactor: rename `ToStreamProst` to `StreamNode` * test: assign `state_table_id` --- src/frontend/src/catalog/table_catalog.rs | 8 +- src/frontend/src/optimizer/plan_node/mod.rs | 9 +- .../optimizer/plan_node/stream_delta_join.rs | 7 +- .../plan_node/stream_dynamic_filter.rs | 29 +- .../optimizer/plan_node/stream_exchange.rs | 7 +- .../src/optimizer/plan_node/stream_expand.rs | 12 +- .../src/optimizer/plan_node/stream_filter.rs | 7 +- .../plan_node/stream_global_simple_agg.rs | 18 +- .../optimizer/plan_node/stream_group_topn.rs | 17 +- .../optimizer/plan_node/stream_hash_agg.rs | 20 +- .../optimizer/plan_node/stream_hash_join.rs | 12 +- .../optimizer/plan_node/stream_hop_window.rs | 7 +- .../optimizer/plan_node/stream_index_scan.rs | 7 +- .../plan_node/stream_local_simple_agg.rs | 7 +- .../optimizer/plan_node/stream_materialize.rs | 8 +- .../src/optimizer/plan_node/stream_project.rs | 7 +- .../optimizer/plan_node/stream_project_set.rs | 7 +- .../src/optimizer/plan_node/stream_sink.rs | 7 +- .../src/optimizer/plan_node/stream_source.rs | 10 +- .../optimizer/plan_node/stream_table_scan.rs | 7 +- .../src/optimizer/plan_node/stream_topn.rs | 8 +- .../src/optimizer/plan_node/to_prost.rs | 13 +- src/frontend/src/stream_fragmenter/mod.rs | 320 +----------------- 23 files changed, 143 insertions(+), 411 deletions(-) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index f390906db7357..e5bb6c6ec1df0 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -15,14 +15,13 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; -use risingwave_common::catalog::TableDesc; +use risingwave_common::catalog::{TableDesc, TableId}; use risingwave_common::config::constant::hummock::TABLE_OPTION_DUMMY_RETAINTION_SECOND; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ColumnIndex as ProstColumnIndex, Table as ProstTable}; use super::column_catalog::ColumnCatalog; use super::{DatabaseId, FragmentId, SchemaId}; -use crate::catalog::TableId; use crate::optimizer::property::FieldOrder; /// Includes full information about a table. @@ -96,6 +95,11 @@ impl TableCatalog { self.id } + pub fn with_id(mut self, id: TableId) -> Self { + self.id = id; + self + } + /// Get the table catalog's associated source id. #[must_use] pub fn associated_source_id(&self) -> Option { diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index e841f3d7fea53..b5761acc0bee3 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -132,7 +132,7 @@ impl dyn PlanNode { /// /// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a /// hook inside to do some ad-hoc thing for [`StreamTableScan`]. - pub fn to_stream_prost(&self) -> StreamPlanProst { + pub fn to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> StreamPlanProst { if let Some(stream_table_scan) = self.as_stream_table_scan() { return stream_table_scan.adhoc_to_stream_prost(); } @@ -140,18 +140,18 @@ impl dyn PlanNode { return stream_index_scan.adhoc_to_stream_prost(); } - let node = Some(self.to_stream_prost_body()); + let node = Some(self.to_stream_prost_body(state)); let input = self .inputs() .into_iter() - .map(|plan| plan.to_stream_prost()) + .map(|plan| plan.to_stream_prost(state)) .collect(); // TODO: support pk_indices and operator_id StreamPlanProst { input, identity: format!("{}", self), node_body: node, - operator_id: self.id().0 as u64, + operator_id: self.id().0 as _, stream_key: self.logical_pk().iter().map(|x| *x as u32).collect(), fields: self.schema().to_prost(), append_only: self.append_only(), @@ -325,6 +325,7 @@ pub use stream_table_scan::StreamTableScan; pub use stream_topn::StreamTopN; use crate::session::OptimizerContextRef; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `for_all_plan_nodes` includes all plan nodes. If you added a new plan node /// inside the project, be sure to add here and in its conventions like `for_logical_plan_nodes` diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 2dbdec68ac6d6..ad5445c29efb5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -19,10 +19,11 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode}; -use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamHashJoin, ToStreamProst}; +use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamHashJoin, StreamNode}; use crate::expr::Expr; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamDeltaJoin`] implements [`super::LogicalJoin`] with delta join. It requires its two /// inputs to be indexes. @@ -144,8 +145,8 @@ impl PlanTreeNodeBinary for StreamDeltaJoin { impl_plan_tree_node_for_binary! { StreamDeltaJoin } -impl ToStreamProst for StreamDeltaJoin { - fn to_stream_prost_body(&self) -> NodeBody { +impl StreamNode for StreamDeltaJoin { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { let left = self.left(); let right = self.right(); let left_table = left.as_stream_index_scan().unwrap(); diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 96094bba4780f..9fb56266c8338 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -23,8 +23,9 @@ use risingwave_pb::stream_plan::DynamicFilterNode; use super::utils::TableCatalogBuilder; use crate::catalog::TableCatalog; use crate::expr::Expr; -use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, ToStreamProst}; +use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode}; use crate::optimizer::PlanRef; +use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{Condition, ConditionDisplay}; #[derive(Clone, Debug)] @@ -92,21 +93,21 @@ impl PlanTreeNodeBinary for StreamDynamicFilter { impl_plan_tree_node_for_binary! { StreamDynamicFilter } -impl ToStreamProst for StreamDynamicFilter { - fn to_stream_prost_body(&self) -> NodeBody { +impl StreamNode for StreamDynamicFilter { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { + let condition = self + .predicate + .as_expr_unless_true() + .map(|x| x.to_expr_proto()); + let left_table = infer_left_internal_table_catalog(self.clone().into(), self.left_index) + .with_id(state.gen_table_id_wrapped()); + let right_table = infer_right_internal_table_catalog(self.right.clone()) + .with_id(state.gen_table_id_wrapped()); NodeBody::DynamicFilter(DynamicFilterNode { left_key: self.left_index as u32, - condition: self - .predicate - .as_expr_unless_true() - .map(|x| x.to_expr_proto()), - left_table: Some( - infer_left_internal_table_catalog(self.clone().into(), self.left_index) - .to_state_table_prost(), - ), - right_table: Some( - infer_right_internal_table_catalog(self.right.clone()).to_state_table_prost(), - ), + condition, + left_table: Some(left_table.to_state_table_prost()), + right_table: Some(right_table.to_state_table_prost()), }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 5867028bd04b5..f30d0e1bff4c3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -17,8 +17,9 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamExchange` imposes a particular distribution on its input /// without changing its content. @@ -74,8 +75,8 @@ impl PlanTreeNodeUnary for StreamExchange { } impl_plan_tree_node_for_unary! {StreamExchange} -impl ToStreamProst for StreamExchange { - fn to_stream_prost_body(&self) -> NodeBody { +impl StreamNode for StreamExchange { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { NodeBody::Exchange(ExchangeNode { strategy: Some(DispatchStrategy { r#type: match &self.base.dist { diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 8b8fec28f42a5..d80e44aad17c8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -14,13 +14,13 @@ use std::fmt; -use itertools::Itertools; use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ExpandNode; -use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone)] pub struct StreamExpand { @@ -72,19 +72,19 @@ impl PlanTreeNodeUnary for StreamExpand { impl_plan_tree_node_for_unary! { StreamExpand } -impl ToStreamProst for StreamExpand { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamExpand { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::Expand(ExpandNode { column_subsets: self .column_subsets() .iter() .map(|subset| subset_to_protobuf(subset)) - .collect_vec(), + .collect(), }) } } fn subset_to_protobuf(subset: &[usize]) -> Subset { - let column_indices = subset.iter().map(|key| *key as u32).collect_vec(); + let column_indices = subset.iter().map(|key| *key as u32).collect(); Subset { column_indices } } diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 5129505182ae7..70766cc66aaf6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -17,9 +17,10 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::FilterNode; -use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl}; use crate::optimizer::plan_node::PlanBase; +use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::Condition; /// `StreamFilter` implements [`super::LogicalFilter`] @@ -70,8 +71,8 @@ impl PlanTreeNodeUnary for StreamFilter { impl_plan_tree_node_for_unary! { StreamFilter } -impl ToStreamProst for StreamFilter { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamFilter { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::Filter(FilterNode { search_condition: Some(ExprImpl::from(self.predicate().clone()).to_expr_proto()), }) diff --git a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs index 8a4fa5aeffe9e..5c9fc9eeb7e1e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs @@ -14,13 +14,13 @@ use std::fmt; -use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::logical_agg::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::PlanAggCallDisplay; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone)] pub struct StreamGlobalSimpleAgg { @@ -82,8 +82,8 @@ impl PlanTreeNodeUnary for StreamGlobalSimpleAgg { } impl_plan_tree_node_for_unary! { StreamGlobalSimpleAgg } -impl ToStreamProst for StreamGlobalSimpleAgg { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamGlobalSimpleAgg { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; let (internal_tables, column_mappings) = self.logical.infer_internal_table_catalog(None); ProstStreamNode::GlobalSimpleAgg(SimpleAggNode { @@ -98,11 +98,15 @@ impl ToStreamProst for StreamGlobalSimpleAgg { .dist_column_indices() .iter() .map(|idx| *idx as u32) - .collect_vec(), + .collect(), internal_tables: internal_tables .into_iter() - .map(|table_catalog| table_catalog.to_state_table_prost()) - .collect_vec(), + .map(|table| { + table + .with_id(state.gen_table_id_wrapped()) + .to_state_table_prost() + }) + .collect(), column_mappings: column_mappings .into_iter() .map(|v| ColumnMapping { diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index a70d983df9408..096d256ad81e0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -16,8 +16,9 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalTopN, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, OrderDisplay}; +use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; #[derive(Debug, Clone)] @@ -53,23 +54,23 @@ impl StreamGroupTopN { } } -impl ToStreamProst for StreamGroupTopN { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamGroupTopN { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; let group_key = self.group_key.iter().map(|idx| *idx as u32).collect(); if self.logical.limit() == 0 { panic!("topN's limit shouldn't be 0."); } + let table = self + .logical + .infer_internal_table_catalog(Some(&self.group_key)) + .with_id(state.gen_table_id_wrapped()); let group_topn_node = GroupTopNNode { limit: self.logical.limit() as u64, offset: self.logical.offset() as u64, group_key, - table: Some( - self.logical - .infer_internal_table_catalog(Some(&self.group_key)) - .to_state_table_prost(), - ), + table: Some(table.to_state_table_prost()), }; ProstStreamNode::GroupTopN(group_topn_node) diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 4da4ce7276ac7..8f81c03738f38 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -14,12 +14,12 @@ use std::fmt; -use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::logical_agg::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone)] pub struct StreamHashAgg { @@ -88,23 +88,27 @@ impl PlanTreeNodeUnary for StreamHashAgg { } impl_plan_tree_node_for_unary! { StreamHashAgg } -impl ToStreamProst for StreamHashAgg { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamHashAgg { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; let (internal_tables, column_mappings) = self .logical .infer_internal_table_catalog(self.vnode_col_idx); ProstStreamNode::HashAgg(HashAggNode { - group_key: self.group_key().iter().map(|idx| *idx as u32).collect_vec(), + group_key: self.group_key().iter().map(|idx| *idx as u32).collect(), agg_calls: self .agg_calls() .iter() .map(PlanAggCall::to_protobuf) - .collect_vec(), + .collect(), internal_tables: internal_tables .into_iter() - .map(|table_catalog| table_catalog.to_state_table_prost()) - .collect_vec(), + .map(|table| { + table + .with_id(state.gen_table_id_wrapped()) + .to_state_table_prost() + }) + .collect(), column_mappings: column_mappings .into_iter() .map(|v| ColumnMapping { diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 0ae913f5edd66..4185863c3105b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -25,12 +25,13 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::HashJoinNode; use super::utils::TableCatalogBuilder; -use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, ToStreamProst}; +use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode}; use crate::catalog::table_catalog::TableCatalog; use crate::expr::Expr; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMapping; /// [`StreamHashJoin`] implements [`super::LogicalJoin`] with hash table. It builds a hash table @@ -192,8 +193,8 @@ impl PlanTreeNodeBinary for StreamHashJoin { impl_plan_tree_node_for_binary! { StreamHashJoin } -impl ToStreamProst for StreamHashJoin { - fn to_stream_prost_body(&self) -> NodeBody { +impl StreamNode for StreamHashJoin { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { let left_key_indices = self.eq_join_predicate.left_eq_indexes(); let right_key_indices = self.eq_join_predicate.right_eq_indexes(); let left_key_indices_prost = left_key_indices.iter().map(|idx| *idx as i32).collect_vec(); @@ -213,10 +214,13 @@ impl ToStreamProst for StreamHashJoin { .as_expr_unless_true() .map(|x| x.to_expr_proto()), left_table: Some( - infer_internal_table_catalog(self.left(), left_key_indices).to_state_table_prost(), + infer_internal_table_catalog(self.left(), left_key_indices) + .with_id(state.gen_table_id_wrapped()) + .to_state_table_prost(), ), right_table: Some( infer_internal_table_catalog(self.right(), right_key_indices) + .with_id(state.gen_table_id_wrapped()) .to_state_table_prost(), ), output_indices: self diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 19c7d4bc20650..a2352b2d184b9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -17,7 +17,8 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::HopWindowNode; -use super::{LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamHopWindow`] represents a hop window table function. #[derive(Debug, Clone)] @@ -65,8 +66,8 @@ impl PlanTreeNodeUnary for StreamHopWindow { impl_plan_tree_node_for_unary! {StreamHopWindow} -impl ToStreamProst for StreamHopWindow { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamHopWindow { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::HopWindow(HopWindowNode { time_col: Some(self.logical.time_col.to_proto()), window_slide: Some(self.logical.window_slide.into()), diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs index e1578cb7fca92..81dd555d212b3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs @@ -18,10 +18,11 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::StreamNode as ProstStreamPlan; -use super::{LogicalScan, PlanBase, PlanNodeId, ToStreamProst}; +use super::{LogicalScan, PlanBase, PlanNodeId, StreamNode}; use crate::catalog::ColumnId; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamIndexScan` is a virtual plan node to represent a stream table scan. It will be converted /// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` @@ -106,8 +107,8 @@ impl fmt::Display for StreamIndexScan { } } -impl ToStreamProst for StreamIndexScan { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamIndexScan { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { unreachable!("stream index scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.") } } diff --git a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs index 29de389b7bd59..6ad9f39885dc6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs @@ -18,8 +18,9 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::logical_agg::PlanAggCall; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::RequiredDist; +use crate::stream_fragmenter::BuildFragmentGraphState; /// Streaming local simple agg. /// Should only be used for stateless agg, including sum, count and append-only min/max. @@ -73,8 +74,8 @@ impl PlanTreeNodeUnary for StreamLocalSimpleAgg { } impl_plan_tree_node_for_unary! { StreamLocalSimpleAgg } -impl ToStreamProst for StreamLocalSimpleAgg { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamLocalSimpleAgg { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; ProstStreamNode::LocalSimpleAgg(SimpleAggNode { agg_calls: self diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 0dcf996cab6c1..ad9e5420690b5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -23,12 +23,13 @@ use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::Result; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::TableCatalog; use crate::catalog::FragmentId; use crate::optimizer::plan_node::{PlanBase, PlanNode}; use crate::optimizer::property::{Direction, Distribution, FieldOrder, Order, RequiredDist}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// The first column id to allocate for a new materialized view. /// @@ -247,8 +248,8 @@ impl PlanTreeNodeUnary for StreamMaterialize { impl_plan_tree_node_for_unary! { StreamMaterialize } -impl ToStreamProst for StreamMaterialize { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamMaterialize { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; ProstStreamNode::Materialize(MaterializeNode { @@ -261,6 +262,7 @@ impl ToStreamProst for StreamMaterialize { .iter() .map(FieldOrder::to_protobuf) .collect(), + // TODO(tesla): use id generated by state? table: Some(self.table().to_state_table_prost()), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index 7e828f45d46b0..6a51a3c980247 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -17,8 +17,9 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ProjectNode; -use super::{LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::Expr; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamProject` implements [`super::LogicalProject`] to evaluate specified expressions on input /// rows. @@ -71,8 +72,8 @@ impl PlanTreeNodeUnary for StreamProject { } impl_plan_tree_node_for_unary! {StreamProject} -impl ToStreamProst for StreamProject { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamProject { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::Project(ProjectNode { select_list: self .logical diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index d916bc7f0a359..72ceaff903426 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -18,7 +18,8 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::ProjectSetNode; -use super::{LogicalProjectSet, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalProjectSet, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone)] pub struct StreamProjectSet { @@ -66,8 +67,8 @@ impl PlanTreeNodeUnary for StreamProjectSet { impl_plan_tree_node_for_unary! { StreamProjectSet } -impl ToStreamProst for StreamProjectSet { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamProjectSet { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::ProjectSet(ProjectSetNode { select_list: self .logical diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 963d0c3dbcf6a..31c2652f89e21 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -18,8 +18,9 @@ use std::fmt; use risingwave_common::error::Result; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{PlanBase, PlanRef, ToStreamProst}; +use super::{PlanBase, PlanRef, StreamNode}; use crate::optimizer::plan_node::PlanTreeNodeUnary; +use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamSink`] represents a table/connector sink at the very end of the graph. #[derive(Debug, Clone)] @@ -77,8 +78,8 @@ impl fmt::Display for StreamSink { } } -impl ToStreamProst for StreamSink { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamSink { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; let input = self.input.clone(); diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 305598ca99313..8a3b1914692ae 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -17,9 +17,9 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::SourceNode; -use super::{LogicalSource, PlanBase, ToStreamProst}; -use crate::catalog::TableId; +use super::{LogicalSource, PlanBase, StreamNode}; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamSource`] represents a table/connector source at the very beginning of the graph. #[derive(Debug, Clone)] @@ -65,8 +65,8 @@ impl fmt::Display for StreamSource { } } -impl ToStreamProst for StreamSource { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamSource { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode { ProstStreamNode::Source(SourceNode { source_id: self.logical.source_catalog.id, column_ids: self @@ -77,7 +77,7 @@ impl ToStreamProst for StreamSource { .map(|c| c.column_id().into()) .collect(), source_type: self.logical.source_catalog.source_type as i32, - state_table_id: TableId::placeholder().table_id, + state_table_id: state.gen_table_id(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 3922b77c3d4eb..b0ca4ed7bdec5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -21,10 +21,11 @@ use risingwave_common::catalog::TableDesc; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::StreamNode as ProstStreamPlan; -use super::{LogicalScan, PlanBase, PlanNodeId, StreamIndexScan, ToStreamProst}; +use super::{LogicalScan, PlanBase, PlanNodeId, StreamIndexScan, StreamNode}; use crate::catalog::ColumnId; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted /// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` @@ -131,8 +132,8 @@ impl fmt::Display for StreamTableScan { } } -impl ToStreamProst for StreamTableScan { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamTableScan { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { unreachable!("stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.") } } diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 1388c8318340b..84ceb8d61923d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -16,8 +16,9 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToStreamProst}; +use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap #[derive(Debug, Clone)] @@ -68,8 +69,8 @@ impl PlanTreeNodeUnary for StreamTopN { impl_plan_tree_node_for_unary! { StreamTopN } -impl ToStreamProst for StreamTopN { - fn to_stream_prost_body(&self) -> ProstStreamNode { +impl StreamNode for StreamTopN { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> ProstStreamNode { use risingwave_pb::stream_plan::*; let topn_node = TopNNode { limit: self.logical.limit() as u64, @@ -77,6 +78,7 @@ impl ToStreamProst for StreamTopN { table: Some( self.logical .infer_internal_table_catalog(None) + .with_id(state.gen_table_id_wrapped()) .to_state_table_prost(), ), }; diff --git a/src/frontend/src/optimizer/plan_node/to_prost.rs b/src/frontend/src/optimizer/plan_node/to_prost.rs index 0c856c9b48496..577e5922fea85 100644 --- a/src/frontend/src/optimizer/plan_node/to_prost.rs +++ b/src/frontend/src/optimizer/plan_node/to_prost.rs @@ -21,7 +21,7 @@ use crate::{ for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, }; -pub trait ToProst: ToBatchProst + ToStreamProst {} +pub trait ToProst: ToBatchProst + StreamNode {} pub trait ToBatchProst { fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody { @@ -29,8 +29,11 @@ pub trait ToBatchProst { } } -pub trait ToStreamProst { - fn to_stream_prost_body(&self) -> pb_stream_node::NodeBody { +pub trait StreamNode { + fn to_stream_prost_body( + &self, + _state: &mut BuildFragmentGraphState, + ) -> pb_stream_node::NodeBody { unimplemented!() } } @@ -62,8 +65,8 @@ for_stream_plan_nodes! { ban_to_batch_prost } macro_rules! ban_to_stream_prost { ([], $( { $convention:ident, $name:ident }),*) => { paste!{ - $(impl ToStreamProst for [<$convention $name>] { - fn to_stream_prost_body(&self) -> pb_stream_node::NodeBody { + $(impl StreamNode for [<$convention $name>] { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> pb_stream_node::NodeBody { panic!("convert into distributed is only allowed on stream plan") } })* diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index c4645dfdb1c76..a82ac61ba02a1 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -34,7 +34,7 @@ use crate::optimizer::PlanRef; /// The mutable state when building fragment graph. #[derive(Derivative)] #[derivative(Default)] -pub(crate) struct BuildFragmentGraphState { +pub struct BuildFragmentGraphState { /// fragment graph field, transformed from input streaming plan. fragment_graph: StreamFragmentGraph, /// local fragment id @@ -67,16 +67,21 @@ impl BuildFragmentGraphState { } /// Generate an table id - fn gen_table_id(&mut self) -> u32 { + pub fn gen_table_id(&mut self) -> u32 { let ret = self.next_table_id; self.next_table_id += 1; ret } + + /// Generate an table id + pub fn gen_table_id_wrapped(&mut self) -> TableId { + TableId::new(self.gen_table_id()) + } } pub fn build_graph(plan_node: PlanRef) -> StreamFragmentGraphProto { let mut state = BuildFragmentGraphState::default(); - let stream_node = plan_node.to_stream_prost(); + let stream_node = plan_node.to_stream_prost(&mut state); generate_fragment_graph(&mut state, stream_node).unwrap(); let mut fragment_graph = state.fragment_graph.to_protobuf(); fragment_graph.dependent_table_ids = state @@ -207,8 +212,6 @@ fn build_fragment( _ => {} }; - assign_local_table_id_to_stream_node(state, &mut stream_node); - // handle join logic if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap() { if delta_index_join.get_join_type()? == JoinType::Inner @@ -261,310 +264,3 @@ fn build_fragment( .collect::>()?; Ok(stream_node) } - -/// This function assigns the `table_id` based on the type of `StreamNode` -/// Be careful it has side effects and will change the `StreamNode` -fn assign_local_table_id_to_stream_node( - state: &mut BuildFragmentGraphState, - stream_node: &mut StreamNode, -) { - match stream_node.node_body.as_mut().unwrap() { - // For HashJoin nodes, attempting to rewrite to delta joins only on inner join - // with only equal conditions - NodeBody::HashJoin(hash_join_node) => { - // Allocate local table id. It will be rewrite to global table id after get table id - // offset from id generator. - if let Some(left_table) = &mut hash_join_node.left_table { - left_table.id = state.gen_table_id(); - } - if let Some(right_table) = &mut hash_join_node.right_table { - right_table.id = state.gen_table_id(); - } - } - - NodeBody::Source(node) => { - node.state_table_id = state.gen_table_id(); - } - - NodeBody::GlobalSimpleAgg(node) => { - for table in &mut node.internal_tables { - table.id = state.gen_table_id(); - } - } - - // Rewrite hash agg. One agg call -> one table id. - NodeBody::HashAgg(hash_agg_node) => { - for table in &mut hash_agg_node.internal_tables { - table.id = state.gen_table_id(); - } - } - - NodeBody::AppendOnlyTopN(append_only_top_n_node) => { - if let Some(table) = &mut append_only_top_n_node.table { - table.id = state.gen_table_id(); - } else { - panic!("Append only TopN node's table shouldn't be None"); - } - } - NodeBody::TopN(top_n_node) => { - if let Some(table) = &mut top_n_node.table { - table.id = state.gen_table_id(); - } else { - panic!("TopNNode's table shouldn't be None"); - } - } - - NodeBody::GroupTopN(group_top_n_node) => { - if let Some(table) = &mut group_top_n_node.table { - table.id = state.gen_table_id(); - } else { - panic!("GroupTopNNode's table shouldn't be None"); - } - } - - NodeBody::DynamicFilter(dynamic_filter_node) => { - if let Some(left_table) = &mut dynamic_filter_node.left_table { - left_table.id = state.gen_table_id(); - } - if let Some(right_table) = &mut dynamic_filter_node.right_table { - right_table.id = state.gen_table_id(); - } - } - - _ => {} - } -} - -#[cfg(test)] -mod tests { - use risingwave_pb::catalog::{Table, Table as ProstTable}; - use risingwave_pb::data::data_type::TypeName; - use risingwave_pb::data::DataType; - use risingwave_pb::expr::agg_call::{Arg, Type}; - use risingwave_pb::expr::{AggCall, InputRefExpr}; - use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc, ColumnOrder}; - use risingwave_pb::stream_plan::*; - - use super::*; - - fn make_sum_aggcall(idx: i32) -> AggCall { - AggCall { - r#type: Type::Sum as i32, - args: vec![Arg { - input: Some(InputRefExpr { column_idx: idx }), - r#type: Some(DataType { - type_name: TypeName::Int64 as i32, - ..Default::default() - }), - }], - return_type: Some(DataType { - type_name: TypeName::Int64 as i32, - ..Default::default() - }), - distinct: false, - order_by_fields: vec![], - filter: None, - } - } - - fn make_column(column_type: TypeName, column_id: i32) -> ColumnCatalog { - ColumnCatalog { - column_desc: Some(ColumnDesc { - column_type: Some(DataType { - type_name: column_type as i32, - ..Default::default() - }), - column_id, - ..Default::default() - }), - is_hidden: false, - } - } - - fn make_internal_table(is_agg_value: bool) -> ProstTable { - let mut columns = vec![make_column(TypeName::Int64, 0)]; - if !is_agg_value { - columns.push(make_column(TypeName::Int32, 1)); - } - ProstTable { - id: TableId::placeholder().table_id, - name: String::new(), - columns, - order_key: vec![ColumnOrder { - index: 0, - order_type: 2, - }], - stream_key: vec![2], - ..Default::default() - } - } - - #[test] - fn test_assign_local_table_id_to_stream_node() { - // let fragmenter = StreamFragmenter {}; - let mut state = BuildFragmentGraphState::default(); - let mut expect_table_id = 0; - state.gen_table_id(); // to consume one table_id - - { - // test HashJoin Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::HashJoin(HashJoinNode { - left_table: Some(Table { - id: 0, - ..Default::default() - }), - right_table: Some(Table { - id: 0, - ..Default::default() - }), - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - - if let NodeBody::HashJoin(hash_join_node) = stream_node.node_body.as_ref().unwrap() { - expect_table_id += 1; - assert_eq!( - expect_table_id, - hash_join_node.left_table.as_ref().unwrap().id - ); - expect_table_id += 1; - assert_eq!( - expect_table_id, - hash_join_node.right_table.as_ref().unwrap().id - ); - } - } - - { - // test SimpleAgg Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::GlobalSimpleAgg(SimpleAggNode { - agg_calls: vec![ - make_sum_aggcall(0), - make_sum_aggcall(1), - make_sum_aggcall(2), - ], - internal_tables: vec![ - make_internal_table(true), - make_internal_table(false), - make_internal_table(false), - ], - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - - if let NodeBody::GlobalSimpleAgg(global_simple_agg_node) = - stream_node.node_body.as_ref().unwrap() - { - assert_eq!( - global_simple_agg_node.agg_calls.len(), - global_simple_agg_node.internal_tables.len() - ); - for table in &global_simple_agg_node.internal_tables { - expect_table_id += 1; - assert_eq!(expect_table_id, table.id); - } - } - } - - { - // test HashAgg Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::HashAgg(HashAggNode { - agg_calls: vec![ - make_sum_aggcall(0), - make_sum_aggcall(1), - make_sum_aggcall(2), - make_sum_aggcall(3), - ], - internal_tables: vec![ - make_internal_table(true), - make_internal_table(false), - make_internal_table(false), - make_internal_table(false), - ], - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - - if let NodeBody::HashAgg(hash_agg_node) = stream_node.node_body.as_ref().unwrap() { - assert_eq!( - hash_agg_node.agg_calls.len(), - hash_agg_node.internal_tables.len() - ); - for table in &hash_agg_node.internal_tables { - expect_table_id += 1; - assert_eq!(expect_table_id, table.id); - } - } - } - - { - // test TopN Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::TopN(TopNNode { - table: Some(Table { - id: 0, - ..Default::default() - }), - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - if let NodeBody::TopN(top_n_node) = stream_node.node_body.as_ref().unwrap() { - expect_table_id += 1; - assert_eq!(expect_table_id, top_n_node.table.as_ref().unwrap().id); - } - } - { - // test Group TopN Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::GroupTopN(GroupTopNNode { - table: Some(Table { - id: 0, - ..Default::default() - }), - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - if let NodeBody::GroupTopN(node) = stream_node.node_body.as_ref().unwrap() { - expect_table_id += 1; - assert_eq!(expect_table_id, node.table.as_ref().unwrap().id); - } - } - - { - // test AppendOnlyTopN Type - let mut stream_node = StreamNode { - node_body: Some(NodeBody::AppendOnlyTopN(TopNNode { - table: Some(Table { - id: 0, - ..Default::default() - }), - ..Default::default() - })), - ..Default::default() - }; - assign_local_table_id_to_stream_node(&mut state, &mut stream_node); - if let NodeBody::AppendOnlyTopN(append_only_top_n_node) = - stream_node.node_body.as_ref().unwrap() - { - expect_table_id += 1; - assert_eq!( - expect_table_id, - append_only_top_n_node.table.as_ref().unwrap().id - ); - } - } - } -}