Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node): create BatchPlanRef, simplify HopWindow nodes #9044

Merged
merged 3 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::generic::GenericPlanRef;
use crate::optimizer::property::Order;

pub trait BatchPlanRef: GenericPlanRef {
fn order(&self) -> &Order;
}
30 changes: 16 additions & 14 deletions src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HopWindowNode;

use super::{
ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::optimizer::plan_node::ToLocalBatch;
Expand All @@ -32,24 +31,25 @@ use crate::utils::ColIndexMappingRewriteExt;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHopWindow {
pub base: PlanBase,
logical: LogicalHopWindow,
logical: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}

impl BatchHopWindow {
pub fn new(
logical: LogicalHopWindow,
logical: generic::HopWindow<PlanRef>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we need to change the name here later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To what?

window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
) -> Self {
let ctx = logical.base.ctx.clone();
let base = PlanBase::new_logical_with_core(&logical);
let ctx = base.ctx;
let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(logical.input().distribution());
.rewrite_provided_distribution(logical.input.distribution());
let base = PlanBase::new_batch(
ctx,
logical.schema().clone(),
base.schema,
distribution,
logical.get_out_column_index_order(),
);
Expand All @@ -70,12 +70,14 @@ impl fmt::Display for BatchHopWindow {

impl PlanTreeNodeUnary for BatchHopWindow {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(
self.logical.clone_with_input(input),
logical,
self.window_start_exprs.clone(),
self.window_end_exprs.clone(),
)
Expand Down Expand Up @@ -105,7 +107,8 @@ impl ToDistributedBatch for BatchHopWindow {
let new_input = self
.input()
.to_distributed_with_required(required_order, &input_required)?;
let new_logical = self.logical.clone_with_input(new_input);
let mut new_logical = self.logical.clone();
new_logical.input = new_input;
let batch_plan = BatchHopWindow::new(
new_logical,
self.window_start_exprs.clone(),
Expand All @@ -119,12 +122,11 @@ impl ToDistributedBatch for BatchHopWindow {
impl ToBatchPb for BatchHopWindow {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::HopWindow(HopWindowNode {
time_col: self.logical.core.time_col.index() as _,
window_slide: Some(self.logical.core.window_slide.into()),
window_size: Some(self.logical.core.window_size.into()),
time_col: self.logical.time_col.index() as _,
window_slide: Some(self.logical.window_slide.into()),
window_size: Some(self.logical.window_size.into()),
output_indices: self
.logical
.core
.output_indices
.iter()
.map(|&x| x as u32)
Expand Down
20 changes: 19 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::optimizer::plan_node::batch::BatchPlanRef;
use crate::optimizer::property::{FunctionalDependencySet, Order};
use crate::utils::ColIndexMappingRewriteExt;

/// [`HopWindow`] implements Hop Table Function.
Expand Down Expand Up @@ -118,7 +119,24 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
}
}

impl<PlanRef: BatchPlanRef> HopWindow<PlanRef> {
pub fn get_out_column_index_order(&self) -> Order {
self.i2o_col_mapping()
.rewrite_provided_order(self.input.order())
}
}

impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
pub fn output_window_start_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_start_col_idx())
}

pub fn output_window_end_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_end_col_idx())
}

pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
(
self.input,
Expand Down
73 changes: 19 additions & 54 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ use crate::expr::{ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::optimizer::property::Order;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};

/// `LogicalHopWindow` implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalHopWindow {
pub base: PlanBase,
pub(super) core: generic::HopWindow<PlanRef>,
core: generic::HopWindow<PlanRef>,
}

impl LogicalHopWindow {
Expand Down Expand Up @@ -118,44 +117,22 @@ impl LogicalHopWindow {
.into()
}

pub fn internal_window_start_col_idx(&self) -> usize {
self.core.internal_window_start_col_idx()
}

pub fn internal_window_end_col_idx(&self) -> usize {
self.core.internal_window_end_col_idx()
}

pub fn output_window_start_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_start_col_idx())
self.core.output_window_start_col_idx()
}

pub fn output_window_end_col_idx(&self) -> Option<usize> {
self.internal2output_col_mapping()
.try_map(self.internal_window_end_col_idx())
self.core.output_window_end_col_idx()
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
self.core.o2i_col_mapping()
}

pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.core.i2o_col_mapping()
}

pub fn internal_column_num(&self) -> usize {
self.core.internal_column_num()
}

pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
self.core.output2internal_col_mapping()
}

pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
self.core.internal2output_col_mapping()
}

pub fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
Self::new(
self.input(),
Expand All @@ -167,20 +144,6 @@ impl LogicalHopWindow {
)
}

pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
self.core.fmt_with_name(f, name)
}

pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
self.core.fmt_fields_with_builder(builder)
}

/// Map the order of the input to use the updated indices
pub fn get_out_column_index_order(&self) -> Order {
self.i2o_col_mapping()
.rewrite_provided_order(self.input().order())
}

/// Get output indices
pub fn output_indices(&self) -> &Vec<usize> {
&self.core.output_indices
Expand Down Expand Up @@ -223,10 +186,10 @@ impl PlanTreeNodeUnary for LogicalHopWindow {
Some(new_idx)
}
None => {
if idx == self.internal_window_start_col_idx() {
if idx == self.core.internal_window_start_col_idx() {
columns_to_be_kept.push(i);
Some(input.schema().len())
} else if idx == self.internal_window_end_col_idx() {
} else if idx == self.core.internal_window_end_col_idx() {
columns_to_be_kept.push(i);
Some(input.schema().len() + 1)
} else {
Expand Down Expand Up @@ -257,7 +220,7 @@ impl_plan_tree_node_for_unary! {LogicalHopWindow}

impl fmt::Display for LogicalHopWindow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.fmt_with_name(f, "LogicalHopWindow")
self.core.fmt_with_name(f, "LogicalHopWindow")
}
}

Expand Down Expand Up @@ -296,9 +259,9 @@ impl ColPrunable for LogicalHopWindow {
if let Some(idx) = o2i.try_map(idx) {
Some(IndexType::Input(idx))
} else if let Some(idx) = output2internal.try_map(idx) {
if idx == self.internal_window_start_col_idx() {
if idx == self.core.internal_window_start_col_idx() {
Some(IndexType::WindowStart)
} else if idx == self.internal_window_end_col_idx() {
} else if idx == self.core.internal_window_end_col_idx() {
Some(IndexType::WindowEnd)
} else {
None
Expand All @@ -313,8 +276,8 @@ impl ColPrunable for LogicalHopWindow {
.iter()
.filter_map(|&idx| match idx {
IndexType::Input(x) => input_change.try_map(x),
IndexType::WindowStart => Some(new_hop.internal_window_start_col_idx()),
IndexType::WindowEnd => Some(new_hop.internal_window_end_col_idx()),
IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()),
IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()),
})
.collect_vec()
};
Expand All @@ -334,8 +297,8 @@ impl PredicatePushdown for LogicalHopWindow {
) -> PlanRef {
let mut window_columns = FixedBitSet::with_capacity(self.schema().len());

let window_start_idx = self.internal_window_start_col_idx();
let window_end_idx = self.internal_window_end_col_idx();
let window_start_idx = self.core.internal_window_start_col_idx();
let window_end_idx = self.core.internal_window_end_col_idx();
for (i, v) in self.output_indices().iter().enumerate() {
if *v == window_start_idx || *v == window_end_idx {
window_columns.insert(i);
Expand All @@ -351,19 +314,21 @@ impl PredicatePushdown for LogicalHopWindow {
impl ToBatch for LogicalHopWindow {
fn to_batch(&self) -> Result<PlanRef> {
let new_input = self.input().to_batch()?;
let new_logical = self.clone_with_input(new_input);
let mut new_logical = self.core.clone();
new_logical.input = new_input;
let (window_start_exprs, window_end_exprs) =
new_logical.core.derive_window_start_and_end_exprs()?;
new_logical.derive_window_start_and_end_exprs()?;
Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}
}

impl ToStream for LogicalHopWindow {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
let new_input = self.input().to_stream(ctx)?;
let new_logical = self.clone_with_input(new_input);
let mut new_logical = self.core.clone();
new_logical.input = new_input;
let (window_start_exprs, window_end_exprs) =
new_logical.core.derive_window_start_and_end_exprs()?;
new_logical.derive_window_start_and_end_exprs()?;
Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}

Expand All @@ -382,7 +347,7 @@ impl ToStream for LogicalHopWindow {
{
output_indices.push(input.schema().len());
}
let i2o = self.i2o_col_mapping();
let i2o = self.core.i2o_col_mapping();
output_indices.extend(
input
.logical_pk()
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use risingwave_pb::stream_plan::StreamNode as StreamPlanPb;
use serde::Serialize;
use smallvec::SmallVec;

use self::batch::BatchPlanRef;
use self::generic::GenericPlanRef;
use self::stream::StreamPlanRef;
use super::property::{Distribution, FunctionalDependencySet, Order};
Expand Down Expand Up @@ -385,6 +386,12 @@ impl StreamPlanRef for PlanRef {
}
}

impl BatchPlanRef for PlanRef {
fn order(&self) -> &Order {
&self.plan_base().order
}
}

impl GenericPlanRef for PlanRef {
fn schema(&self) -> &Schema {
&self.plan_base().schema
Expand Down Expand Up @@ -594,6 +601,7 @@ pub use predicate_pushdown::*;
mod merge_eq_nodes;
pub use merge_eq_nodes::*;

pub mod batch;
pub mod generic;
pub mod stream;
pub mod stream_derive;
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl stream::StreamPlanRef for PlanBase {
self.append_only
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
&self.order
}
}
impl PlanBase {
pub fn new_logical(
ctx: OptimizerContextRef,
Expand Down
Loading