diff --git a/e2e_test/batch/functions/now.slt.part b/e2e_test/batch/functions/now.slt.part index ee706e80e2eb9..8d3ce7867b3d9 100644 --- a/e2e_test/batch/functions/now.slt.part +++ b/e2e_test/batch/functions/now.slt.part @@ -10,6 +10,13 @@ create table t (a timestamp); statement ok insert into t values(now()); +# constant eval of now in batch plan +# query T +# explain select now() + interval '1 hour' = now() + interval '30 minutes' + interval '30 minutes' true; +# ---- +# BatchProject { exprs: [true:Boolean] } +# └─BatchValues { rows: [[]] } + statement ok drop table tz diff --git a/src/frontend/planner_test/tests/testdata/expr.yaml b/src/frontend/planner_test/tests/testdata/expr.yaml index 0c365da376e6d..0ad24241013a8 100644 --- a/src/frontend/planner_test/tests/testdata/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/expr.yaml @@ -148,8 +148,6 @@ - sql: | -- When it is invalid, PostgreSQL reports error during explain, but we have to wait until execution as of now. #4235 values(round('abc')); - batch_plan: | - BatchValues { rows: [[Round('abc':Varchar::Float64)]] } - sql: | values(extract(hour from timestamp '2001-02-16 20:38:40')); batch_plan: | @@ -538,3 +536,14 @@ logical_plan: | LogicalProject { exprs: [Array(1:Int32, null:Int32)] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } +- name: const_eval of const expr + sql: | + create table t(v1 int); + select 1 + 2 + v1 from t; + batch_plan: | + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [((1:Int32 + 2:Int32) + t.v1)] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } +- name: const_eval of division by 0 error + sql: | + select 1 / 0 t1; diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 528cc3b2af746..f53161351b2c2 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -23,11 +23,12 @@ pub use plan_rewriter::PlanRewriter; mod plan_visitor; pub use plan_visitor::PlanVisitor; mod optimizer_context; +mod plan_expr_rewriter; mod rule; - use fixedbitset::FixedBitSet; use itertools::Itertools as _; pub use optimizer_context::*; +use plan_expr_rewriter::ConstEvalRewriter; use plan_rewriter::ShareSourceRewriter; use property::Order; use risingwave_common::catalog::{Field, Schema}; @@ -50,6 +51,7 @@ use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::optimizer::plan_node::{ BatchExchange, ColumnPruningContext, PlanNodeType, PlanTreeNode, PredicatePushdownContext, + RewriteExprsRecursive, }; use crate::optimizer::property::Distribution; use crate::utils::Condition; @@ -451,6 +453,15 @@ impl PlanRoot { // Convert to physical plan node plan = plan.to_batch_with_order_required(&self.required_order)?; + // SessionTimezone substitution + // Const eval of exprs at the last minute + // plan = const_eval_exprs(plan)?; + + // if explain_trace { + // ctx.trace("Const eval exprs:"); + // ctx.trace(plan.explain_to_string().unwrap()); + // } + #[cfg(debug_assertions)] InputRefValidator.validate(plan.clone()); assert!(*plan.distribution() == Distribution::Single, "{}", plan); @@ -712,6 +723,17 @@ impl PlanRoot { } } +#[allow(dead_code)] +fn const_eval_exprs(plan: PlanRef) -> Result { + let mut const_eval_rewriter = ConstEvalRewriter { error: None }; + + let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter); + if let Some(error) = const_eval_rewriter.error { + return Err(error); + } + Ok(plan) +} + #[cfg(test)] mod tests { use risingwave_common::catalog::Field; diff --git a/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs b/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs new file mode 100644 index 0000000000000..7fdfa1857713d --- /dev/null +++ b/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::error::RwError; + +use crate::expr::{Expr, ExprImpl, ExprRewriter, Literal}; + +pub(crate) struct ConstEvalRewriter { + pub(crate) error: Option, +} +impl ExprRewriter for ConstEvalRewriter { + fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl { + if self.error.is_some() { + return expr; + } + if expr.is_const() { + let data_type = expr.return_type(); + match expr.eval_row_const() { + Ok(datum) => Literal::new(datum, data_type).into(), + Err(e) => { + self.error = Some(e); + expr + } + } + } else { + match expr { + ExprImpl::InputRef(inner) => self.rewrite_input_ref(*inner), + ExprImpl::Literal(inner) => self.rewrite_literal(*inner), + ExprImpl::FunctionCall(inner) => self.rewrite_function_call(*inner), + ExprImpl::AggCall(inner) => self.rewrite_agg_call(*inner), + ExprImpl::Subquery(inner) => self.rewrite_subquery(*inner), + ExprImpl::CorrelatedInputRef(inner) => self.rewrite_correlated_input_ref(*inner), + ExprImpl::TableFunction(inner) => self.rewrite_table_function(*inner), + ExprImpl::WindowFunction(inner) => self.rewrite_window_function(*inner), + ExprImpl::UserDefinedFunction(inner) => self.rewrite_user_defined_function(*inner), + } + } + } +} diff --git a/src/frontend/src/optimizer/plan_expr_rewriter/mod.rs b/src/frontend/src/optimizer/plan_expr_rewriter/mod.rs new file mode 100644 index 0000000000000..4c2df9169315f --- /dev/null +++ b/src/frontend/src/optimizer/plan_expr_rewriter/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod const_eval_rewriter; +pub(crate) use const_eval_rewriter::ConstEvalRewriter; diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index 84b88c3e598ab..8762d59834d8d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -19,7 +19,8 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::DeleteNode; use super::{ - LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, + ExprRewritable, LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, }; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -86,3 +87,5 @@ impl ToLocalBatch for BatchDelete { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchDelete {} diff --git a/src/frontend/src/optimizer/plan_node/batch_exchange.rs b/src/frontend/src/optimizer/plan_node/batch_exchange.rs index d611cd64fecde..aabd57b898979 100644 --- a/src/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -18,7 +18,9 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode}; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, +}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay}; @@ -98,3 +100,5 @@ impl ToLocalBatch for BatchExchange { unreachable!() } } + +impl ExprRewritable for BatchExchange {} diff --git a/src/frontend/src/optimizer/plan_node/batch_expand.rs b/src/frontend/src/optimizer/plan_node/batch_expand.rs index c5099d190854a..7f6232a0bf921 100644 --- a/src/frontend/src/optimizer/plan_node/batch_expand.rs +++ b/src/frontend/src/optimizer/plan_node/batch_expand.rs @@ -20,6 +20,7 @@ use risingwave_pb::batch_plan::expand_node::Subset; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ExpandNode; +use super::ExprRewritable; use crate::optimizer::plan_node::{ LogicalExpand, PlanBase, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, ToLocalBatch, }; @@ -99,3 +100,5 @@ impl ToLocalBatch for BatchExpand { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchExpand {} diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index bf78553d7ee7d..c92634c2713e1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -19,8 +19,10 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::FilterNode; use super::generic::GenericPlanRef; -use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::expr::{Expr, ExprImpl}; +use super::{ + ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, +}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::{PlanBase, ToLocalBatch}; use crate::utils::Condition; @@ -93,3 +95,20 @@ impl ToLocalBatch for BatchFilter { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchFilter { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_filter() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs index 4cac43850cb00..ac9ad13c5c99a 100644 --- a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs @@ -18,7 +18,10 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::GroupTopNNode; -use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; @@ -95,3 +98,5 @@ impl ToLocalBatch for BatchGroupTopN { Ok(self.clone_with_input(input).into()) } } + +impl ExprRewritable for BatchGroupTopN {} diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs index fb03cb56a4885..db1847f18cb7e 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -19,7 +19,11 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; use super::generic::{GenericPlanRef, PlanAggCall}; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -107,3 +111,20 @@ impl ToLocalBatch for BatchHashAgg { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchHashAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs index fad3c2521ded1..77fdef9ffefeb 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -22,10 +22,10 @@ use risingwave_pb::plan_common::JoinType; use super::generic::GenericPlanRef; use super::{ - EqJoinPredicate, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst, - ToDistributedBatch, + EqJoinPredicate, ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, + ToBatchProst, ToDistributedBatch, }; -use crate::expr::Expr; +use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicateDisplay, ToLocalBatch}; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -262,3 +262,21 @@ impl ToLocalBatch for BatchHashJoin { Ok(self.clone_with_left_right(left, right).into()) } } + +impl ExprRewritable for BatchHashJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_join() + .unwrap() + .clone(), + self.eq_join_predicate.rewrite_exprs(r), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index d112a47716139..dfd460642fc02 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -19,7 +19,8 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HopWindowNode; use super::{ - LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, + ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, }; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; @@ -117,3 +118,5 @@ impl ToLocalBatch for BatchHopWindow { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchHopWindow {} diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 5c2f105a883a7..f8aac43a7f797 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -19,7 +19,9 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::InsertNode; use risingwave_pb::catalog::ColumnIndex; -use super::{LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, +}; use crate::optimizer::plan_node::{PlanBase, ToLocalBatch}; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -97,3 +99,5 @@ impl ToLocalBatch for BatchInsert { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchInsert {} diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index 75499fea09974..2d6ad614ce4a6 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -18,7 +18,10 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LimitNode; -use super::{LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; @@ -94,3 +97,5 @@ impl ToLocalBatch for BatchLimit { self.two_phase_limit(self.input().to_local()?) } } + +impl ExprRewritable for BatchLimit {} diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index 5b4011a51f129..ea16eda2816c7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -20,7 +20,8 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode}; use super::generic::GenericPlanRef; -use crate::expr::Expr; +use super::ExprRewritable; +use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ EqJoinPredicate, EqJoinPredicateDisplay, LogicalJoin, PlanBase, PlanTreeNodeBinary, @@ -281,3 +282,24 @@ impl ToLocalBatch for BatchLookupJoin { Ok(self.clone_with_distributed_lookup(input, false).into()) } } + +impl ExprRewritable for BatchLookupJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self { + base: self.base.clone_with_new_plan_id(), + logical: self + .logical + .rewrite_exprs(r) + .as_logical_join() + .unwrap() + .clone(), + eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r), + ..Self::clone(self) + } + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs index 30c0110b46f91..490cc65d733c8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs @@ -20,8 +20,11 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::NestedLoopJoinNode; use super::generic::GenericPlanRef; -use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst, ToDistributedBatch}; -use crate::expr::{Expr, ExprImpl}; +use super::{ + ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst, + ToDistributedBatch, +}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -155,3 +158,20 @@ impl ToLocalBatch for BatchNestedLoopJoin { Ok(self.clone_with_left_right(left, right).into()) } } + +impl ExprRewritable for BatchNestedLoopJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_join() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_project.rs b/src/frontend/src/optimizer/plan_node/batch_project.rs index 43256cc4f6a0c..538575bd2f903 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project.rs @@ -21,9 +21,10 @@ use risingwave_pb::expr::ExprNode; use super::generic::GenericPlanRef; use super::{ - LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, + ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, }; -use crate::expr::{Expr, ExprImpl}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; /// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input @@ -105,3 +106,20 @@ impl ToLocalBatch for BatchProject { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchProject { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_project() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_project_set.rs b/src/frontend/src/optimizer/plan_node/batch_project_set.rs index 314a00d0198d1..9380ce37f7e25 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project_set.rs @@ -19,6 +19,8 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ProjectSetNode; +use super::ExprRewritable; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ LogicalProjectSet, PlanBase, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, ToLocalBatch, }; @@ -93,3 +95,20 @@ impl ToLocalBatch for BatchProjectSet { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchProjectSet { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_project_set() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 181197b4adfb6..5ddddb8105a1b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -23,8 +23,9 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode}; use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc; -use super::{PlanBase, PlanRef, ToBatchProst, ToDistributedBatch}; +use super::{ExprRewritable, PlanBase, PlanRef, ToBatchProst, ToDistributedBatch}; use crate::catalog::ColumnId; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{LogicalScan, ToLocalBatch}; use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; @@ -262,3 +263,21 @@ impl ToLocalBatch for BatchSeqScan { Ok(Self::new_inner(self.logical.clone(), dist, self.scan_ranges.clone()).into()) } } + +impl ExprRewritable for BatchSeqScan { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_scan() + .unwrap() + .clone(), + self.scan_ranges.clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index 1d9846f524fad..5739b740145c4 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -19,7 +19,11 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortAggNode; use super::generic::{GenericPlanRef, PlanAggCall}; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch}; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -128,3 +132,20 @@ impl ToLocalBatch for BatchSimpleAgg { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchSimpleAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_sort.rs b/src/frontend/src/optimizer/plan_node/batch_sort.rs index 5c8578c20313d..ca1d02c6f7d23 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort.rs @@ -18,7 +18,9 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortNode; -use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, +}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, OrderDisplay}; @@ -84,3 +86,5 @@ impl ToLocalBatch for BatchSort { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchSort {} diff --git a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs index dae5a2c1159fb..1e3056dff4427 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs @@ -21,8 +21,11 @@ use risingwave_pb::batch_plan::SortAggNode; use risingwave_pb::expr::ExprNode; use super::generic::{GenericPlanRef, PlanAggCall}; -use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; -use crate::expr::{Expr, ExprImpl, InputRef}; +use super::{ + ExprRewritable, LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -143,3 +146,20 @@ impl ToLocalBatch for BatchSortAgg { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchSortAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_agg() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index e7307e5df5da6..b4f7780f274e7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -19,7 +19,10 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SourceNode; -use super::{LogicalSource, PlanBase, PlanRef, ToBatchProst, ToDistributedBatch, ToLocalBatch}; +use super::{ + ExprRewritable, LogicalSource, PlanBase, PlanRef, ToBatchProst, ToDistributedBatch, + ToLocalBatch, +}; use crate::optimizer::property::{Distribution, Order}; /// [`BatchSource`] represents a table/connector source at the very beginning of the graph. @@ -105,3 +108,4 @@ impl ToBatchProst for BatchSource { }) } } +impl ExprRewritable for BatchSource {} diff --git a/src/frontend/src/optimizer/plan_node/batch_table_function.rs b/src/frontend/src/optimizer/plan_node/batch_table_function.rs index d656eff516e00..355a6c60db0f6 100644 --- a/src/frontend/src/optimizer/plan_node/batch_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/batch_table_function.rs @@ -18,7 +18,10 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::TableFunctionNode; -use super::{PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch, +}; +use crate::expr::ExprRewriter; use crate::optimizer::plan_node::logical_table_function::LogicalTableFunction; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order}; @@ -78,3 +81,20 @@ impl ToLocalBatch for BatchTableFunction { Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into()) } } + +impl ExprRewritable for BatchTableFunction { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_table_function() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index 6091e7e944fae..2dc5645efbca0 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -18,7 +18,10 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::TopNNode; -use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch}; +use super::{ + ExprRewritable, LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, +}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; @@ -102,3 +105,5 @@ impl ToLocalBatch for BatchTopN { self.two_phase_topn(self.input().to_local()?) } } + +impl ExprRewritable for BatchTopN {} diff --git a/src/frontend/src/optimizer/plan_node/batch_union.rs b/src/frontend/src/optimizer/plan_node/batch_union.rs index 72babbc2ca695..4ac6e87218ada 100644 --- a/src/frontend/src/optimizer/plan_node/batch_union.rs +++ b/src/frontend/src/optimizer/plan_node/batch_union.rs @@ -18,7 +18,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UnionNode; -use super::{PlanRef, ToBatchProst, ToDistributedBatch}; +use super::{ExprRewritable, PlanRef, ToBatchProst, ToDistributedBatch}; use crate::optimizer::plan_node::{LogicalUnion, PlanBase, PlanTreeNode, ToLocalBatch}; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -100,3 +100,5 @@ impl ToLocalBatch for BatchUnion { Ok(self.clone_with_inputs(&new_inputs?)) } } + +impl ExprRewritable for BatchUnion {} diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 57bc9d43e5d0d..381072d592051 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -20,9 +20,10 @@ use risingwave_pb::batch_plan::UpdateNode; use super::generic::GenericPlanRef; use super::{ - LogicalUpdate, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, + ExprRewritable, LogicalUpdate, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, + ToDistributedBatch, }; -use crate::expr::Expr; +use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -101,3 +102,20 @@ impl ToLocalBatch for BatchUpdate { Ok(self.clone_with_input(new_input).into()) } } + +impl ExprRewritable for BatchUpdate { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_update() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/batch_values.rs b/src/frontend/src/optimizer/plan_node/batch_values.rs index 3902895d5c858..21b719d3821eb 100644 --- a/src/frontend/src/optimizer/plan_node/batch_values.rs +++ b/src/frontend/src/optimizer/plan_node/batch_values.rs @@ -20,8 +20,11 @@ use risingwave_pb::batch_plan::values_node::ExprTuple; use risingwave_pb::batch_plan::ValuesNode; use super::generic::GenericPlanRef; -use super::{LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch}; -use crate::expr::{Expr, ExprImpl}; +use super::{ + ExprRewritable, LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, + ToDistributedBatch, +}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order}; @@ -104,3 +107,20 @@ impl ToLocalBatch for BatchValues { Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into()) } } + +impl ExprRewritable for BatchValues { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_values() + .unwrap() + .clone(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs index 974158bdef124..b684cc70c2024 100644 --- a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs +++ b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs @@ -16,7 +16,7 @@ use std::fmt; use risingwave_common::catalog::Schema; -use crate::expr::{ExprType, FunctionCall, InputRef, InputRefDisplay}; +use crate::expr::{ExprRewriter, ExprType, FunctionCall, InputRef, InputRefDisplay}; use crate::utils::{ColIndexMapping, Condition, ConditionDisplay}; /// The join predicate used in optimizer @@ -242,6 +242,12 @@ impl EqJoinPredicate { Self::new(self.other_cond, new_eq_keys, self.left_cols_num) } + + pub fn rewrite_exprs(&self, rewriter: &mut (impl ExprRewriter + ?Sized)) -> Self { + let mut new = self.clone(); + new.other_cond = new.other_cond.rewrite_expr(rewriter); + new + } } pub struct EqJoinPredicateDisplay<'a> { diff --git a/src/frontend/src/optimizer/plan_node/expr_rewritable.rs b/src/frontend/src/optimizer/plan_node/expr_rewritable.rs index 9d04c163a607c..baeb4caa1d61a 100644 --- a/src/frontend/src/optimizer/plan_node/expr_rewritable.rs +++ b/src/frontend/src/optimizer/plan_node/expr_rewritable.rs @@ -18,31 +18,42 @@ use paste::paste; use super::*; use crate::expr::ExprRewriter; -use crate::{for_batch_plan_nodes, for_stream_plan_nodes}; +use crate::for_stream_plan_nodes; /// Rewrites expressions in a `PlanRef`. Due to `Share` operator, /// the `ExprRewriter` needs to be idempotent i.e. applying it more than once /// to the same `ExprImpl` will be a noop on subsequent applications. +/// `rewrite_exprs` should only return a plan with the given node modified. +/// To rewrite recursively, call `rewrite_exprs_recursive` on [`RewriteExprsRecursive`]. pub trait ExprRewritable { - fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef; + fn has_rewritable_expr(&self) -> bool { + false + } + + fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { + unimplemented!() + } } impl ExprRewritable for PlanRef { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - self.deref().rewrite_exprs(r) + if self.deref().has_rewritable_expr() { + self.deref().rewrite_exprs(r) + } else { + self.clone() + } } } macro_rules! ban_expr_rewritable { ($( { $convention:ident, $name:ident }),*) => { paste!{ - $(impl ExprRewritable for [<$convention $name>] { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - unimplemented!() - } - })* + $(impl ExprRewritable for [<$convention $name>] {} )* } } } -for_batch_plan_nodes! {ban_expr_rewritable} for_stream_plan_nodes! {ban_expr_rewritable} diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index eb9ba67d9bba2..c75070280eb3f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -789,6 +789,10 @@ impl fmt::Display for LogicalAgg { } impl ExprRewritable for LogicalAgg { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 118487d5d6f14..4ec4aac229f99 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -296,6 +296,10 @@ impl ColPrunable for LogicalApply { } impl ExprRewritable for LogicalApply { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut new = self.clone(); new.on = new.on.rewrite_expr(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_delete.rs b/src/frontend/src/optimizer/plan_node/logical_delete.rs index eedf2e2a3c900..d69cc6c867c04 100644 --- a/src/frontend/src/optimizer/plan_node/logical_delete.rs +++ b/src/frontend/src/optimizer/plan_node/logical_delete.rs @@ -23,7 +23,6 @@ use super::{ PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::TableId; -use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -127,11 +126,7 @@ impl ColPrunable for LogicalDelete { } } -impl ExprRewritable for LogicalDelete { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalDelete {} impl PredicatePushdown for LogicalDelete { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index 39fa5fd617e57..c314acdac6293 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -23,7 +23,6 @@ use super::{ gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamExpand, ToBatch, ToStream, }; -use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -158,11 +157,7 @@ impl ColPrunable for LogicalExpand { } } -impl ExprRewritable for LogicalExpand { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalExpand {} impl PredicatePushdown for LogicalExpand { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index c9085af5b6341..aecb974010e17 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -176,6 +176,10 @@ impl ColPrunable for LogicalFilter { } impl ExprRewritable for LogicalFilter { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index b862c0933171b..7c7e838bb0451 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -25,7 +25,7 @@ use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, }; -use crate::expr::{ExprRewriter, InputRef}; +use crate::expr::InputRef; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -327,11 +327,7 @@ impl ColPrunable for LogicalHopWindow { } } -impl ExprRewritable for LogicalHopWindow { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalHopWindow {} impl PredicatePushdown for LogicalHopWindow { /// Keep predicate on time window parameters (`window_start`, `window_end`), diff --git a/src/frontend/src/optimizer/plan_node/logical_insert.rs b/src/frontend/src/optimizer/plan_node/logical_insert.rs index e6d29a02950d3..e18f7b7518049 100644 --- a/src/frontend/src/optimizer/plan_node/logical_insert.rs +++ b/src/frontend/src/optimizer/plan_node/logical_insert.rs @@ -23,7 +23,6 @@ use super::{ PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::TableId; -use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -161,11 +160,7 @@ impl ColPrunable for LogicalInsert { } } -impl ExprRewritable for LogicalInsert { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalInsert {} impl PredicatePushdown for LogicalInsert { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index cf6d45d253f72..f55bc0f96d6b8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -786,6 +786,10 @@ impl ColPrunable for LogicalJoin { } impl ExprRewritable for LogicalJoin { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_limit.rs b/src/frontend/src/optimizer/plan_node/logical_limit.rs index 74b6a124a9717..3be0c575d02bd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_limit.rs +++ b/src/frontend/src/optimizer/plan_node/logical_limit.rs @@ -20,7 +20,6 @@ use super::{ gen_filter_and_pushdown, BatchLimit, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::expr::ExprRewriter; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -100,11 +99,7 @@ impl ColPrunable for LogicalLimit { } } -impl ExprRewritable for LogicalLimit { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalLimit {} impl PredicatePushdown for LogicalLimit { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index d91aa943a731c..89c2df4775ef0 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -25,7 +25,6 @@ use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream, ToStreamContext, }; -use crate::expr::ExprRewriter; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; use crate::OptimizerContextRef; @@ -70,11 +69,7 @@ impl fmt::Display for LogicalNow { impl_plan_tree_node_for_leaf! { LogicalNow } -impl ExprRewritable for LogicalNow { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalNow {} impl PredicatePushdown for LogicalNow { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_over_agg.rs b/src/frontend/src/optimizer/plan_node/logical_over_agg.rs index bc92040eaafe0..75e88e37ec2a1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_agg.rs @@ -25,9 +25,7 @@ use super::{ gen_filter_and_pushdown, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::expr::{ - Expr, ExprImpl, ExprRewriter, InputRef, InputRefDisplay, WindowFunction, WindowFunctionType, -}; +use crate::expr::{Expr, ExprImpl, InputRef, InputRefDisplay, WindowFunction, WindowFunctionType}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -266,11 +264,7 @@ impl ColPrunable for LogicalOverAgg { } } -impl ExprRewritable for LogicalOverAgg { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalOverAgg {} impl PredicatePushdown for LogicalOverAgg { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 8bca6cb81b4c2..5be68d3a8daac 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -205,6 +205,10 @@ impl ColPrunable for LogicalProject { } impl ExprRewritable for LogicalProject { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 46fa700f457fb..2954922c77ed1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -258,6 +258,10 @@ impl ColPrunable for LogicalProjectSet { } impl ExprRewritable for LogicalProjectSet { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index e790ecebf0c33..9528e02a58685 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -451,6 +451,10 @@ impl ColPrunable for LogicalScan { } impl ExprRewritable for LogicalScan { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs index a6a01f60dc4c5..49caa4bd1a4db 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -23,7 +23,6 @@ use super::{ ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::expr::ExprRewriter; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ ColumnPruningContext, LogicalProject, PredicatePushdownContext, RewriteStreamContext, @@ -120,11 +119,7 @@ impl ColPrunable for LogicalShare { } } -impl ExprRewritable for LogicalShare { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalShare {} impl PredicatePushdown for LogicalShare { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index cf603ed6d7fc2..789edaf9ced68 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -29,7 +29,7 @@ use super::{ }; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::ColumnId; -use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType}; +use crate::expr::{Expr, ExprImpl, ExprType}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, @@ -161,11 +161,7 @@ impl ColPrunable for LogicalSource { } } -impl ExprRewritable for LogicalSource { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalSource {} /// A util function to extract kafka offset timestamp range. /// diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index 019201dbd0ce4..8f763cdcecc00 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -72,6 +72,10 @@ impl ColPrunable for LogicalTableFunction { } impl ExprRewritable for LogicalTableFunction { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut new = self.clone(); new.table_function.args = new diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index ddbe1b0aada01..3ddf04e476097 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -24,7 +24,7 @@ use super::{ PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamGroupTopN, StreamProject, ToBatch, ToStream, }; -use crate::expr::{ExprRewriter, ExprType, FunctionCall, InputRef}; +use crate::expr::{ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ BatchTopN, ColumnPruningContext, LogicalProject, PredicatePushdownContext, RewriteStreamContext, StreamTopN, ToStreamContext, @@ -354,11 +354,7 @@ impl ColPrunable for LogicalTopN { } } -impl ExprRewritable for LogicalTopN { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalTopN {} impl PredicatePushdown for LogicalTopN { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index b92792b6267eb..17cb5575be68f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -19,7 +19,7 @@ use risingwave_common::error::Result; use risingwave_common::types::{DataType, Scalar}; use super::{ColPrunable, ExprRewritable, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream}; -use crate::expr::{ExprImpl, ExprRewriter, InputRef, Literal}; +use crate::expr::{ExprImpl, InputRef, Literal}; use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef}; use crate::optimizer::plan_node::stream_union::StreamUnion; use crate::optimizer::plan_node::{ @@ -109,11 +109,7 @@ impl ColPrunable for LogicalUnion { } } -impl ExprRewritable for LogicalUnion { - fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef { - self.clone().into() - } -} +impl ExprRewritable for LogicalUnion {} impl PredicatePushdown for LogicalUnion { fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 9d630df961f45..932d9a02fea9d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -136,6 +136,10 @@ impl fmt::Display for LogicalUpdate { } impl ExprRewritable for LogicalUpdate { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut new = self.clone(); new.exprs = new.exprs.into_iter().map(|e| r.rewrite_expr(e)).collect(); diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs index 1d685b87a93e5..8795cefda3ad8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_values.rs +++ b/src/frontend/src/optimizer/plan_node/logical_values.rs @@ -77,6 +77,10 @@ impl fmt::Display for LogicalValues { } impl ExprRewritable for LogicalValues { + fn has_rewritable_expr(&self) -> bool { + true + } + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut new = self.clone(); new.rows = new diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 5f36edaa45d15..6ecc81128b918 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -86,6 +86,22 @@ pub enum Convention { Stream, } +pub(crate) trait RewriteExprsRecursive { + fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef; +} + +impl RewriteExprsRecursive for PlanRef { + fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef { + let new = self.rewrite_exprs(r); + let inputs: Vec = new + .inputs() + .iter() + .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r)) + .collect(); + new.clone_with_inputs(&inputs[..]) + } +} + impl ColPrunable for PlanRef { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { if let Some(logical_share) = self.as_logical_share() { @@ -378,16 +394,6 @@ impl dyn PlanNode { } } - pub fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef { - let new = self.rewrite_exprs(r); - let inputs: Vec = new - .inputs() - .iter() - .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r)) - .collect(); - new.clone_with_inputs(&inputs[..]) - } - /// Serialize the plan node and its children to a batch plan proto. pub fn to_batch_prost(&self) -> BatchPlanProst { self.to_batch_prost_identity(true)