Skip to content

Commit

Permalink
refactor(plan_node): simplify batch-delete (#9785)
Browse files Browse the repository at this point in the history
  • Loading branch information
ice1000 authored May 14, 2023
1 parent 0995d35 commit 4d9e9e8
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 89 deletions.
22 changes: 11 additions & 11 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::DeleteNode;

use super::{
ExprRewritable, LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
Expand All @@ -29,14 +28,13 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
pub base: PlanBase,
pub logical: LogicalDelete,
pub logical: generic::Delete<PlanRef>,
}

impl BatchDelete {
pub fn new(logical: LogicalDelete) -> Self {
let ctx = logical.base.ctx.clone();
pub fn new(logical: generic::Delete<PlanRef>) -> Self {
let base = PlanBase::new_batch(
ctx,
logical.ctx(),
logical.schema().clone(),
Distribution::Single,
Order::any(),
Expand All @@ -53,11 +51,13 @@ impl fmt::Display for BatchDelete {

impl PlanTreeNodeUnary for BatchDelete {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
let mut core = self.logical.clone();
core.input = input;
Self::new(core)
}
}

Expand All @@ -74,9 +74,9 @@ impl ToDistributedBatch for BatchDelete {
impl ToBatchPb for BatchDelete {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Delete(DeleteNode {
table_id: self.logical.table_id().table_id(),
table_version_id: self.logical.table_version_id(),
returning: self.logical.has_returning(),
table_id: self.logical.table_id.table_id(),
table_version_id: self.logical.table_version_id,
returning: self.logical.returning,
})
}
}
Expand Down
71 changes: 71 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;

use risingwave_common::catalog::{Schema, TableVersionId};

use super::GenericPlanRef;
use crate::catalog::TableId;
use crate::OptimizerContextRef;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Delete<PlanRef> {
pub table_name: String, // explain-only
pub table_id: TableId,
pub table_version_id: TableVersionId,
pub input: PlanRef,
pub returning: bool,
}

impl<PlanRef: GenericPlanRef> Delete<PlanRef> {
pub fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

pub fn schema(&self) -> &Schema {
self.input.schema()
}
}

impl<PlanRef> Delete<PlanRef> {
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Self {
Self {
table_name,
table_id,
table_version_id,
input,
returning,
}
}

pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}{} }}",
name,
self.table_name,
if self.returning {
", returning: true"
} else {
""
}
)
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod except;
pub use except::*;
mod update;
pub use update::*;
mod delete;
pub use delete::*;

pub trait GenericPlanRef {
fn schema(&self) -> &Schema;
Expand Down
98 changes: 24 additions & 74 deletions src/frontend/src/optimizer/plan_node/logical_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::error::Result;
use risingwave_common::types::DataType;

use super::{
gen_filter_and_pushdown, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef,
gen_filter_and_pushdown, generic, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::TableId;
Expand All @@ -35,113 +35,62 @@ use crate::utils::{ColIndexMapping, Condition};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalDelete {
pub base: PlanBase,
table_name: String, // explain-only
table_id: TableId,
table_version_id: TableVersionId,
input: PlanRef,
returning: bool,
core: generic::Delete<PlanRef>,
}

impl LogicalDelete {
/// Create a [`LogicalDelete`] node. Used internally by optimizer.
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Self {
let ctx = input.ctx();
let schema = if returning {
input.schema().clone()
impl From<generic::Delete<PlanRef>> for LogicalDelete {
fn from(core: generic::Delete<PlanRef>) -> Self {
let schema = if core.returning {
core.schema().clone()
} else {
Schema::new(vec![Field::unnamed(DataType::Int64)])
};
let fd_set = FunctionalDependencySet::new(schema.len());
let base = PlanBase::new_logical(ctx, schema, vec![], fd_set);
Self {
base,
table_name,
table_id,
table_version_id,
input,
returning,
}
}

/// Create a [`LogicalDelete`] node. Used by planner.
pub fn create(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Result<Self> {
Ok(Self::new(
input,
table_name,
table_id,
table_version_id,
returning,
))
}

pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}{} }}",
name,
self.table_name,
if self.returning {
", returning: true"
} else {
""
}
)
let base = PlanBase::new_logical(core.ctx(), schema, vec![], fd_set);
Self { base, core }
}
}

impl LogicalDelete {
#[must_use]
pub fn table_id(&self) -> TableId {
self.table_id
self.core.table_id
}

pub fn has_returning(&self) -> bool {
self.returning
self.core.returning
}

pub fn table_version_id(&self) -> TableVersionId {
self.table_version_id
self.core.table_version_id
}
}

impl PlanTreeNodeUnary for LogicalDelete {
fn input(&self) -> PlanRef {
self.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(
input,
self.table_name.clone(),
self.table_id,
self.table_version_id,
self.returning,
)
let mut core = self.core.clone();
core.input = input;
core.into()
}
}

impl_plan_tree_node_for_unary! { LogicalDelete }

impl fmt::Display for LogicalDelete {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.fmt_with_name(f, "LogicalDelete")
self.core.fmt_with_name(f, "LogicalDelete")
}
}

impl ColPrunable for LogicalDelete {
fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let required_cols: Vec<_> = (0..self.input.schema().len()).collect();
self.clone_with_input(self.input.prune_col(&required_cols, ctx))
let input = &self.core.input;
let required_cols: Vec<_> = (0..input.schema().len()).collect();
self.clone_with_input(input.prune_col(&required_cols, ctx))
.into()
}
}
Expand All @@ -161,8 +110,9 @@ impl PredicatePushdown for LogicalDelete {
impl ToBatch for LogicalDelete {
fn to_batch(&self) -> Result<PlanRef> {
let new_input = self.input().to_batch()?;
let new_logical = self.clone_with_input(new_input);
Ok(BatchDelete::new(new_logical).into())
let mut core = self.core.clone();
core.input = new_input;
Ok(BatchDelete::new(core).into())
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::error::Result;

use super::Planner;
use crate::binder::BoundDelete;
use crate::optimizer::plan_node::{LogicalDelete, LogicalFilter, LogicalProject};
use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalFilter, LogicalProject};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{PlanRef, PlanRoot};

Expand All @@ -30,13 +30,13 @@ impl Planner {
scan
};
let returning = !delete.returning_list.is_empty();
let mut plan: PlanRef = LogicalDelete::create(
let mut plan: PlanRef = LogicalDelete::from(generic::Delete::new(
input,
delete.table_name.clone(),
delete.table_id,
delete.table_version_id,
returning,
)?
))
.into();

if returning {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ impl BatchPlanFragmenter {
} else if let Some(update) = node.as_batch_update() {
Some(update.logical.table_id)
} else if let Some(delete) = node.as_batch_delete() {
Some(delete.logical.table_id())
Some(delete.logical.table_id)
} else {
node.inputs()
.into_iter()
Expand Down

0 comments on commit 4d9e9e8

Please sign in to comment.