Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node): simplify batch-delete #9785

Merged
merged 7 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::DeleteNode;

use super::{
ExprRewritable, LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
Expand All @@ -29,14 +28,13 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
pub base: PlanBase,
pub logical: LogicalDelete,
pub logical: generic::Delete<PlanRef>,
}

impl BatchDelete {
pub fn new(logical: LogicalDelete) -> Self {
let ctx = logical.base.ctx.clone();
pub fn new(logical: generic::Delete<PlanRef>) -> Self {
let base = PlanBase::new_batch(
ctx,
logical.ctx(),
logical.schema().clone(),
Distribution::Single,
Order::any(),
Expand All @@ -53,11 +51,13 @@ impl fmt::Display for BatchDelete {

impl PlanTreeNodeUnary for BatchDelete {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
let mut core = self.logical.clone();
core.input = input;
Self::new(core)
}
}

Expand All @@ -74,9 +74,9 @@ impl ToDistributedBatch for BatchDelete {
impl ToBatchPb for BatchDelete {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Delete(DeleteNode {
table_id: self.logical.table_id().table_id(),
table_version_id: self.logical.table_version_id(),
returning: self.logical.has_returning(),
table_id: self.logical.table_id.table_id(),
table_version_id: self.logical.table_version_id,
returning: self.logical.returning,
})
}
}
Expand Down
58 changes: 58 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::fmt;

use risingwave_common::catalog::{Schema, TableVersionId};

use super::GenericPlanRef;
use crate::catalog::TableId;
use crate::OptimizerContextRef;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Delete<PlanRef> {
pub table_name: String, // explain-only
pub table_id: TableId,
pub table_version_id: TableVersionId,
pub input: PlanRef,
pub returning: bool,
}

impl<PlanRef: GenericPlanRef> Delete<PlanRef> {
pub fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

pub fn schema(&self) -> &Schema {
self.input.schema()
}
}

impl<PlanRef> Delete<PlanRef> {
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Self {
Self {
input,
table_name,
table_id,
table_version_id,
returning,
}
}

pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}{} }}",
name,
self.table_name,
if self.returning {
", returning: true"
} else {
""
}
)
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod except;
pub use except::*;
mod update;
pub use update::*;
mod delete;
pub use delete::*;

pub trait GenericPlanRef {
fn schema(&self) -> &Schema;
Expand Down
98 changes: 24 additions & 74 deletions src/frontend/src/optimizer/plan_node/logical_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::error::Result;
use risingwave_common::types::DataType;

use super::{
gen_filter_and_pushdown, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef,
gen_filter_and_pushdown, generic, BatchDelete, ColPrunable, ExprRewritable, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::TableId;
Expand All @@ -35,113 +35,62 @@ use crate::utils::{ColIndexMapping, Condition};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalDelete {
pub base: PlanBase,
table_name: String, // explain-only
table_id: TableId,
table_version_id: TableVersionId,
input: PlanRef,
returning: bool,
core: generic::Delete<PlanRef>,
}

impl LogicalDelete {
/// Create a [`LogicalDelete`] node. Used internally by optimizer.
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Self {
let ctx = input.ctx();
let schema = if returning {
input.schema().clone()
impl From<generic::Delete<PlanRef>> for LogicalDelete {
fn from(core: generic::Delete<PlanRef>) -> Self {
let schema = if core.returning {
core.schema().clone()
} else {
Schema::new(vec![Field::unnamed(DataType::Int64)])
};
let fd_set = FunctionalDependencySet::new(schema.len());
let base = PlanBase::new_logical(ctx, schema, vec![], fd_set);
Self {
base,
table_name,
table_id,
table_version_id,
input,
returning,
}
}

/// Create a [`LogicalDelete`] node. Used by planner.
pub fn create(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Result<Self> {
Ok(Self::new(
input,
table_name,
table_id,
table_version_id,
returning,
))
}

pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ table: {}{} }}",
name,
self.table_name,
if self.returning {
", returning: true"
} else {
""
}
)
let base = PlanBase::new_logical(core.ctx(), schema, vec![], fd_set);
Self { base, core }
}
}

impl LogicalDelete {
#[must_use]
pub fn table_id(&self) -> TableId {
self.table_id
self.core.table_id
}

pub fn has_returning(&self) -> bool {
self.returning
self.core.returning
}

pub fn table_version_id(&self) -> TableVersionId {
self.table_version_id
self.core.table_version_id
}
}

impl PlanTreeNodeUnary for LogicalDelete {
fn input(&self) -> PlanRef {
self.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(
input,
self.table_name.clone(),
self.table_id,
self.table_version_id,
self.returning,
)
let mut core = self.core.clone();
core.input = input;
core.into()
}
}

impl_plan_tree_node_for_unary! { LogicalDelete }

impl fmt::Display for LogicalDelete {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.fmt_with_name(f, "LogicalDelete")
self.core.fmt_with_name(f, "LogicalDelete")
}
}

impl ColPrunable for LogicalDelete {
fn prune_col(&self, _required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let required_cols: Vec<_> = (0..self.input.schema().len()).collect();
self.clone_with_input(self.input.prune_col(&required_cols, ctx))
let input = &self.core.input;
let required_cols: Vec<_> = (0..input.schema().len()).collect();
self.clone_with_input(input.prune_col(&required_cols, ctx))
.into()
}
}
Expand All @@ -161,8 +110,9 @@ impl PredicatePushdown for LogicalDelete {
impl ToBatch for LogicalDelete {
fn to_batch(&self) -> Result<PlanRef> {
let new_input = self.input().to_batch()?;
let new_logical = self.clone_with_input(new_input);
Ok(BatchDelete::new(new_logical).into())
let mut core = self.core.clone();
core.input = new_input;
Ok(BatchDelete::new(core).into())
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::error::Result;

use super::Planner;
use crate::binder::BoundDelete;
use crate::optimizer::plan_node::{LogicalDelete, LogicalFilter, LogicalProject};
use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalFilter, LogicalProject};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{PlanRef, PlanRoot};

Expand All @@ -30,13 +30,13 @@ impl Planner {
scan
};
let returning = !delete.returning_list.is_empty();
let mut plan: PlanRef = LogicalDelete::create(
let mut plan: PlanRef = LogicalDelete::from(generic::Delete::new(
input,
delete.table_name.clone(),
delete.table_id,
delete.table_version_id,
returning,
)?
))
.into();

if returning {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ impl BatchPlanFragmenter {
} else if let Some(update) = node.as_batch_update() {
Some(update.logical.table_id)
} else if let Some(delete) = node.as_batch_delete() {
Some(delete.logical.table_id())
Some(delete.logical.table_id)
} else {
node.inputs()
.into_iter()
Expand Down