diff --git a/src/frontend/src/optimizer/plan_node/batch.rs b/src/frontend/src/optimizer/plan_node/batch.rs new file mode 100644 index 0000000000000..2ac1e278f7d8b --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch.rs @@ -0,0 +1,20 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::generic::GenericPlanRef; +use crate::optimizer::property::Order; + +pub trait BatchPlanRef: GenericPlanRef { + fn order(&self) -> &Order; +} diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index ccfa664f9bc40..334a0843a5564 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HopWindowNode; use super::{ - ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, - ToDistributedBatch, + generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; @@ -32,24 +31,25 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchHopWindow { pub base: PlanBase, - logical: LogicalHopWindow, + logical: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, } impl BatchHopWindow { pub fn new( - logical: LogicalHopWindow, + logical: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, ) -> Self { - let ctx = logical.base.ctx.clone(); + let base = PlanBase::new_logical_with_core(&logical); + let ctx = base.ctx; let distribution = logical .i2o_col_mapping() - .rewrite_provided_distribution(logical.input().distribution()); + .rewrite_provided_distribution(logical.input.distribution()); let base = PlanBase::new_batch( ctx, - logical.schema().clone(), + base.schema, distribution, logical.get_out_column_index_order(), ); @@ -70,12 +70,14 @@ impl fmt::Display for BatchHopWindow { impl PlanTreeNodeUnary for BatchHopWindow { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { + let mut logical = self.logical.clone(); + logical.input = input; Self::new( - self.logical.clone_with_input(input), + logical, self.window_start_exprs.clone(), self.window_end_exprs.clone(), ) @@ -105,7 +107,8 @@ impl ToDistributedBatch for BatchHopWindow { let new_input = self .input() .to_distributed_with_required(required_order, &input_required)?; - let new_logical = self.logical.clone_with_input(new_input); + let mut new_logical = self.logical.clone(); + new_logical.input = new_input; let batch_plan = BatchHopWindow::new( new_logical, self.window_start_exprs.clone(), @@ -119,12 +122,11 @@ impl ToDistributedBatch for BatchHopWindow { impl ToBatchPb for BatchHopWindow { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::HopWindow(HopWindowNode { - time_col: self.logical.core.time_col.index() as _, - window_slide: Some(self.logical.core.window_slide.into()), - window_size: Some(self.logical.core.window_size.into()), + time_col: self.logical.time_col.index() as _, + window_slide: Some(self.logical.window_slide.into()), + window_size: Some(self.logical.window_size.into()), output_indices: self .logical - .core .output_indices .iter() .map(|&x| x as u32) diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index 85848c3323a94..9bb9597d1650c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -26,7 +26,8 @@ use super::super::utils::IndicesDisplay; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::FunctionalDependencySet; +use crate::optimizer::plan_node::batch::BatchPlanRef; +use crate::optimizer::property::{FunctionalDependencySet, Order}; use crate::utils::ColIndexMappingRewriteExt; /// [`HopWindow`] implements Hop Table Function. @@ -118,7 +119,24 @@ impl GenericPlanNode for HopWindow { } } +impl HopWindow { + pub fn get_out_column_index_order(&self) -> Order { + self.i2o_col_mapping() + .rewrite_provided_order(self.input.order()) + } +} + impl HopWindow { + pub fn output_window_start_col_idx(&self) -> Option { + self.internal2output_col_mapping() + .try_map(self.internal_window_start_col_idx()) + } + + pub fn output_window_end_col_idx(&self) -> Option { + self.internal2output_col_mapping() + .try_map(self.internal_window_end_col_idx()) + } + pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec) { ( self.input, diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index d9babaab86e17..0bd9d8d8c9f20 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -28,14 +28,13 @@ use crate::expr::{ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::Order; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition}; /// `LogicalHopWindow` implements Hop Table Function. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalHopWindow { pub base: PlanBase, - pub(super) core: generic::HopWindow, + core: generic::HopWindow, } impl LogicalHopWindow { @@ -118,44 +117,22 @@ impl LogicalHopWindow { .into() } - pub fn internal_window_start_col_idx(&self) -> usize { - self.core.internal_window_start_col_idx() - } - - pub fn internal_window_end_col_idx(&self) -> usize { - self.core.internal_window_end_col_idx() - } - pub fn output_window_start_col_idx(&self) -> Option { - self.internal2output_col_mapping() - .try_map(self.internal_window_start_col_idx()) + self.core.output_window_start_col_idx() } pub fn output_window_end_col_idx(&self) -> Option { - self.internal2output_col_mapping() - .try_map(self.internal_window_end_col_idx()) + self.core.output_window_end_col_idx() } pub fn o2i_col_mapping(&self) -> ColIndexMapping { self.core.o2i_col_mapping() } - pub fn i2o_col_mapping(&self) -> ColIndexMapping { - self.core.i2o_col_mapping() - } - - pub fn internal_column_num(&self) -> usize { - self.core.internal_column_num() - } - pub fn output2internal_col_mapping(&self) -> ColIndexMapping { self.core.output2internal_col_mapping() } - pub fn internal2output_col_mapping(&self) -> ColIndexMapping { - self.core.internal2output_col_mapping() - } - pub fn clone_with_output_indices(&self, output_indices: Vec) -> Self { Self::new( self.input(), @@ -167,20 +144,6 @@ impl LogicalHopWindow { ) } - pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - self.core.fmt_with_name(f, name) - } - - pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) { - self.core.fmt_fields_with_builder(builder) - } - - /// Map the order of the input to use the updated indices - pub fn get_out_column_index_order(&self) -> Order { - self.i2o_col_mapping() - .rewrite_provided_order(self.input().order()) - } - /// Get output indices pub fn output_indices(&self) -> &Vec { &self.core.output_indices @@ -223,10 +186,10 @@ impl PlanTreeNodeUnary for LogicalHopWindow { Some(new_idx) } None => { - if idx == self.internal_window_start_col_idx() { + if idx == self.core.internal_window_start_col_idx() { columns_to_be_kept.push(i); Some(input.schema().len()) - } else if idx == self.internal_window_end_col_idx() { + } else if idx == self.core.internal_window_end_col_idx() { columns_to_be_kept.push(i); Some(input.schema().len() + 1) } else { @@ -257,7 +220,7 @@ impl_plan_tree_node_for_unary! {LogicalHopWindow} impl fmt::Display for LogicalHopWindow { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalHopWindow") + self.core.fmt_with_name(f, "LogicalHopWindow") } } @@ -296,9 +259,9 @@ impl ColPrunable for LogicalHopWindow { if let Some(idx) = o2i.try_map(idx) { Some(IndexType::Input(idx)) } else if let Some(idx) = output2internal.try_map(idx) { - if idx == self.internal_window_start_col_idx() { + if idx == self.core.internal_window_start_col_idx() { Some(IndexType::WindowStart) - } else if idx == self.internal_window_end_col_idx() { + } else if idx == self.core.internal_window_end_col_idx() { Some(IndexType::WindowEnd) } else { None @@ -313,8 +276,8 @@ impl ColPrunable for LogicalHopWindow { .iter() .filter_map(|&idx| match idx { IndexType::Input(x) => input_change.try_map(x), - IndexType::WindowStart => Some(new_hop.internal_window_start_col_idx()), - IndexType::WindowEnd => Some(new_hop.internal_window_end_col_idx()), + IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()), + IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()), }) .collect_vec() }; @@ -334,8 +297,8 @@ impl PredicatePushdown for LogicalHopWindow { ) -> PlanRef { let mut window_columns = FixedBitSet::with_capacity(self.schema().len()); - let window_start_idx = self.internal_window_start_col_idx(); - let window_end_idx = self.internal_window_end_col_idx(); + let window_start_idx = self.core.internal_window_start_col_idx(); + let window_end_idx = self.core.internal_window_end_col_idx(); for (i, v) in self.output_indices().iter().enumerate() { if *v == window_start_idx || *v == window_end_idx { window_columns.insert(i); @@ -351,9 +314,10 @@ impl PredicatePushdown for LogicalHopWindow { impl ToBatch for LogicalHopWindow { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; let (window_start_exprs, window_end_exprs) = - new_logical.core.derive_window_start_and_end_exprs()?; + new_logical.derive_window_start_and_end_exprs()?; Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } } @@ -361,9 +325,10 @@ impl ToBatch for LogicalHopWindow { impl ToStream for LogicalHopWindow { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { let new_input = self.input().to_stream(ctx)?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; let (window_start_exprs, window_end_exprs) = - new_logical.core.derive_window_start_and_end_exprs()?; + new_logical.derive_window_start_and_end_exprs()?; Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } @@ -382,7 +347,7 @@ impl ToStream for LogicalHopWindow { { output_indices.push(input.schema().len()); } - let i2o = self.i2o_col_mapping(); + let i2o = self.core.i2o_col_mapping(); output_indices.extend( input .logical_pk() diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 500962f9c3956..8196556ea4f20 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -46,6 +46,7 @@ use risingwave_pb::stream_plan::StreamNode as StreamPlanPb; use serde::Serialize; use smallvec::SmallVec; +use self::batch::BatchPlanRef; use self::generic::GenericPlanRef; use self::stream::StreamPlanRef; use super::property::{Distribution, FunctionalDependencySet, Order}; @@ -385,6 +386,12 @@ impl StreamPlanRef for PlanRef { } } +impl BatchPlanRef for PlanRef { + fn order(&self) -> &Order { + &self.plan_base().order + } +} + impl GenericPlanRef for PlanRef { fn schema(&self) -> &Schema { &self.plan_base().schema @@ -594,6 +601,7 @@ pub use predicate_pushdown::*; mod merge_eq_nodes; pub use merge_eq_nodes::*; +pub mod batch; pub mod generic; pub mod stream; pub mod stream_derive; diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 30749ed0ce6e4..ddef83c0c215c 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -79,6 +79,11 @@ impl stream::StreamPlanRef for PlanBase { self.append_only } } +impl batch::BatchPlanRef for PlanBase { + fn order(&self) -> &Order { + &self.order + } +} impl PlanBase { pub fn new_logical( ctx: OptimizerContextRef, diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 221ed3e678d6e..17f1687dc4662 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -20,7 +20,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; -use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -29,21 +29,22 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamHopWindow { pub base: PlanBase, - logical: LogicalHopWindow, + logical: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, } impl StreamHopWindow { pub fn new( - logical: LogicalHopWindow, + logical: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, ) -> Self { - let ctx = logical.base.ctx.clone(); - let pk_indices = logical.base.logical_pk.to_vec(); - let input = logical.input(); - let schema = logical.schema().clone(); + let base = PlanBase::new_logical_with_core(&logical); + let ctx = base.ctx; + let pk_indices = base.logical_pk; + let input = logical.input.clone(); + let schema = base.schema; let i2o = logical.i2o_col_mapping(); let dist = i2o.rewrite_provided_distribution(input.distribution()); @@ -51,13 +52,13 @@ impl StreamHopWindow { let mut watermark_columns = input.watermark_columns().clone(); watermark_columns.grow(logical.internal_column_num()); - if watermark_columns.contains(logical.core.time_col.index) { + if watermark_columns.contains(logical.time_col.index) { // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`. watermark_columns.insert(logical.internal_window_start_col_idx()); watermark_columns.insert(logical.internal_window_end_col_idx()); } let watermark_columns = ColIndexMapping::with_remaining_columns( - logical.output_indices(), + &logical.output_indices, logical.internal_column_num(), ) .rewrite_bitset(&watermark_columns); @@ -66,9 +67,9 @@ impl StreamHopWindow { ctx, schema, pk_indices, - logical.functional_dependency().clone(), + base.functional_dependency, dist, - logical.input().append_only(), + logical.input.append_only(), watermark_columns, ); Self { @@ -103,12 +104,14 @@ impl fmt::Display for StreamHopWindow { impl PlanTreeNodeUnary for StreamHopWindow { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { + let mut logical = self.logical.clone(); + logical.input = input; Self::new( - self.logical.clone_with_input(input), + logical, self.window_start_exprs.clone(), self.window_end_exprs.clone(), ) @@ -120,12 +123,11 @@ impl_plan_tree_node_for_unary! {StreamHopWindow} impl StreamNode for StreamHopWindow { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { PbNodeBody::HopWindow(HopWindowNode { - time_col: self.logical.core.time_col.index() as _, - window_slide: Some(self.logical.core.window_slide.into()), - window_size: Some(self.logical.core.window_size.into()), + time_col: self.logical.time_col.index() as _, + window_slide: Some(self.logical.window_slide.into()), + window_size: Some(self.logical.window_size.into()), output_indices: self .logical - .core .output_indices .iter() .map(|&x| x as u32)