diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index c50965eb08807..f9c233eb61cdb 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::DeleteNode; use super::{ - ExprRewritable, LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, - ToDistributedBatch, + generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -29,14 +28,13 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchDelete { pub base: PlanBase, - pub logical: LogicalDelete, + pub logical: generic::Delete, } impl BatchDelete { - pub fn new(logical: LogicalDelete) -> Self { - let ctx = logical.base.ctx.clone(); + pub fn new(logical: generic::Delete) -> Self { let base = PlanBase::new_batch( - ctx, + logical.ctx(), logical.schema().clone(), Distribution::Single, Order::any(), @@ -53,11 +51,13 @@ impl fmt::Display for BatchDelete { impl PlanTreeNodeUnary for BatchDelete { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut core = self.logical.clone(); + core.input = input; + Self::new(core) } } @@ -74,9 +74,9 @@ impl ToDistributedBatch for BatchDelete { impl ToBatchPb for BatchDelete { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Delete(DeleteNode { - table_id: self.logical.table_id().table_id(), - table_version_id: self.logical.table_version_id(), - returning: self.logical.has_returning(), + table_id: self.logical.table_id.table_id(), + table_version_id: self.logical.table_version_id, + returning: self.logical.returning, }) } } diff --git a/src/frontend/src/optimizer/plan_node/generic/delete.rs b/src/frontend/src/optimizer/plan_node/generic/delete.rs new file mode 100644 index 0000000000000..7f700b44bc2e2 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/delete.rs @@ -0,0 +1,71 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::fmt; + +use risingwave_common::catalog::{Schema, TableVersionId}; + +use super::GenericPlanRef; +use crate::catalog::TableId; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Delete { + pub table_name: String, // explain-only + pub table_id: TableId, + pub table_version_id: TableVersionId, + pub input: PlanRef, + pub returning: bool, +} + +impl Delete { + pub fn ctx(&self) -> OptimizerContextRef { + self.input.ctx() + } + + pub fn schema(&self) -> &Schema { + self.input.schema() + } +} + +impl Delete { + pub fn new( + input: PlanRef, + table_name: String, + table_id: TableId, + table_version_id: TableVersionId, + returning: bool, + ) -> Self { + Self { + table_name, + table_id, + table_version_id, + input, + returning, + } + } + + pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { + write!( + f, + "{} {{ table: {}{} }}", + name, + self.table_name, + if self.returning { + ", returning: true" + } else { + "" + } + ) + } +} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 43df44ba9647e..683e5ad16041a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -54,6 +54,8 @@ mod except; pub use except::*; mod update; pub use update::*; +mod delete; +pub use delete::*; pub trait GenericPlanRef { fn schema(&self) -> &Schema; diff --git a/src/frontend/src/optimizer/plan_node/logical_delete.rs b/src/frontend/src/optimizer/plan_node/logical_delete.rs index db0b07412a858..c1d4d6c7244a2 100644 --- a/src/frontend/src/optimizer/plan_node/logical_delete.rs +++ b/src/frontend/src/optimizer/plan_node/logical_delete.rs @@ -19,7 +19,7 @@ use risingwave_common::error::Result; use risingwave_common::types::DataType; use super::{ - gen_filter_and_pushdown, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef, + gen_filter_and_pushdown, generic, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; use crate::catalog::TableId; @@ -35,98 +35,46 @@ use crate::utils::{ColIndexMapping, Condition}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalDelete { pub base: PlanBase, - table_name: String, // explain-only - table_id: TableId, - table_version_id: TableVersionId, - input: PlanRef, - returning: bool, + core: generic::Delete, } -impl LogicalDelete { - /// Create a [`LogicalDelete`] node. Used internally by optimizer. - pub fn new( - input: PlanRef, - table_name: String, - table_id: TableId, - table_version_id: TableVersionId, - returning: bool, - ) -> Self { - let ctx = input.ctx(); - let schema = if returning { - input.schema().clone() +impl From> for LogicalDelete { + fn from(core: generic::Delete) -> Self { + let schema = if core.returning { + core.schema().clone() } else { Schema::new(vec![Field::unnamed(DataType::Int64)]) }; let fd_set = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical(ctx, schema, vec![], fd_set); - Self { - base, - table_name, - table_id, - table_version_id, - input, - returning, - } - } - - /// Create a [`LogicalDelete`] node. Used by planner. - pub fn create( - input: PlanRef, - table_name: String, - table_id: TableId, - table_version_id: TableVersionId, - returning: bool, - ) -> Result { - Ok(Self::new( - input, - table_name, - table_id, - table_version_id, - returning, - )) - } - - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - write!( - f, - "{} {{ table: {}{} }}", - name, - self.table_name, - if self.returning { - ", returning: true" - } else { - "" - } - ) + let base = PlanBase::new_logical(core.ctx(), schema, vec![], fd_set); + Self { base, core } } +} +impl LogicalDelete { #[must_use] pub fn table_id(&self) -> TableId { - self.table_id + self.core.table_id } pub fn has_returning(&self) -> bool { - self.returning + self.core.returning } pub fn table_version_id(&self) -> TableVersionId { - self.table_version_id + self.core.table_version_id } } impl PlanTreeNodeUnary for LogicalDelete { fn input(&self) -> PlanRef { - self.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new( - input, - self.table_name.clone(), - self.table_id, - self.table_version_id, - self.returning, - ) + let mut core = self.core.clone(); + core.input = input; + core.into() } } @@ -134,14 +82,15 @@ impl_plan_tree_node_for_unary! { LogicalDelete } impl fmt::Display for LogicalDelete { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalDelete") + self.core.fmt_with_name(f, "LogicalDelete") } } impl ColPrunable for LogicalDelete { fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { - let required_cols: Vec<_> = (0..self.input.schema().len()).collect(); - self.clone_with_input(self.input.prune_col(&required_cols, ctx)) + let input = &self.core.input; + let required_cols: Vec<_> = (0..input.schema().len()).collect(); + self.clone_with_input(input.prune_col(&required_cols, ctx)) .into() } } @@ -161,8 +110,9 @@ impl PredicatePushdown for LogicalDelete { impl ToBatch for LogicalDelete { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); - Ok(BatchDelete::new(new_logical).into()) + let mut core = self.core.clone(); + core.input = new_input; + Ok(BatchDelete::new(core).into()) } } diff --git a/src/frontend/src/planner/delete.rs b/src/frontend/src/planner/delete.rs index 68cb349281b01..9c862ac9f396d 100644 --- a/src/frontend/src/planner/delete.rs +++ b/src/frontend/src/planner/delete.rs @@ -17,7 +17,7 @@ use risingwave_common::error::Result; use super::Planner; use crate::binder::BoundDelete; -use crate::optimizer::plan_node::{LogicalDelete, LogicalFilter, LogicalProject}; +use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalFilter, LogicalProject}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; @@ -30,13 +30,13 @@ impl Planner { scan }; let returning = !delete.returning_list.is_empty(); - let mut plan: PlanRef = LogicalDelete::create( + let mut plan: PlanRef = LogicalDelete::from(generic::Delete::new( input, delete.table_name.clone(), delete.table_id, delete.table_version_id, returning, - )? + )) .into(); if returning { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 6204b498e32a5..b07997370ac53 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -950,7 +950,7 @@ impl BatchPlanFragmenter { } else if let Some(update) = node.as_batch_update() { Some(update.logical.table_id) } else if let Some(delete) = node.as_batch_delete() { - Some(delete.logical.table_id()) + Some(delete.logical.table_id) } else { node.inputs() .into_iter()