From d2b2b7e91afcfa345b9190d41107e512b47b0b9e Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 00:10:36 +0800 Subject: [PATCH 01/12] fix for batch --- e2e_test/batch/basic/time_window_utc.slt.part | 29 +++ proto/batch_plan.proto | 2 + src/batch/src/executor/hop_window.rs | 177 +++++------------- src/expr/src/expr/test_utils.rs | 126 ++++++++++++- .../optimizer/plan_node/batch_hop_window.rs | 63 ++++++- .../optimizer/plan_node/logical_hop_window.rs | 101 +++++++++- 6 files changed, 354 insertions(+), 144 deletions(-) diff --git a/e2e_test/batch/basic/time_window_utc.slt.part b/e2e_test/batch/basic/time_window_utc.slt.part index da90438ecf2ce..749870e9abf1f 100644 --- a/e2e_test/batch/basic/time_window_utc.slt.part +++ b/e2e_test/batch/basic/time_window_utc.slt.part @@ -104,6 +104,35 @@ group by window_start, uid order by window_start, uid; 3 8 2022-01-01 10:45:00+00:00 3 8 2022-01-01 11:00:00+00:00 +statement ok +insert into t1 values + (9, 1, 4, '2022-01-02 10:00:00Z'), + (10, 3, 3, '2022-01-03 10:05:00Z'), + (11, 2, 2, '2022-01-04 10:14:00Z'), + (12, 1, 1, '2022-01-05 10:22:00Z'); + +query IIT +select uid, sum(v), window_start +from hop(t1, created_at, interval '1' day, interval '2' day) +group by window_start, uid order by window_start, uid; +---- +1 11 2021-12-31 00:00:00+00:00 +2 9 2021-12-31 00:00:00+00:00 +3 16 2021-12-31 00:00:00+00:00 +1 11 2022-01-01 00:00:00+00:00 +1 15 2022-01-01 00:00:00+00:00 +2 9 2022-01-01 00:00:00+00:00 +3 16 2022-01-01 00:00:00+00:00 +3 16 2022-01-01 00:00:00+00:00 +1 4 2022-01-02 00:00:00+00:00 +3 3 2022-01-02 00:00:00+00:00 +2 2 2022-01-03 00:00:00+00:00 +3 3 2022-01-03 00:00:00+00:00 +1 1 2022-01-04 00:00:00+00:00 +2 2 2022-01-04 00:00:00+00:00 +1 1 2022-01-05 00:00:00+00:00 + + statement error select * from hop(t1, created_at, interval '0', interval '1'); diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 4a9202fd267f2..329a68f2d6cfc 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -187,6 +187,8 @@ message HopWindowNode { data.IntervalUnit window_slide = 2; data.IntervalUnit window_size = 3; repeated uint32 output_indices = 4; + repeated expr.ExprNode window_start_exprs = 5; + repeated expr.ExprNode window_end_exprs = 6; } message TableFunctionNode { diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 675ada448bea7..3bffe5250967f 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -14,21 +14,17 @@ use std::num::NonZeroUsize; -use anyhow::anyhow; use futures_async_stream::try_stream; use itertools::Itertools; -use num_traits::CheckedSub; use risingwave_common::array::column::Column; use risingwave_common::array::{DataChunk, Vis}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; -use risingwave_expr::expr::{new_binary_expr, Expression, InputRefExpression, LiteralExpression}; +use risingwave_common::types::{DataType, IntervalUnit}; +use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_expr::ExprError; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::expr::expr_node; -use crate::error::BatchError; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; @@ -40,6 +36,8 @@ pub struct HopWindowExecutor { time_col_idx: usize, window_slide: IntervalUnit, window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, } @@ -61,10 +59,22 @@ impl BoxedExecutorBuilder for HopWindowExecutor { let output_indices = hop_window_node .get_output_indices() .iter() - .copied() + .cloned() .map(|x| x as usize) .collect_vec(); + let window_start_exprs: Vec<_> = hop_window_node + .get_window_start_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let window_end_exprs: Vec<_> = hop_window_node + .get_window_end_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + assert_eq!(window_start_exprs.len(), window_end_exprs.len()); + let time_col_data_type = child.schema().fields()[time_col].data_type(); let output_type = DataType::window_of(&time_col_data_type).unwrap(); let original_schema: Schema = child @@ -88,12 +98,15 @@ impl BoxedExecutorBuilder for HopWindowExecutor { window_slide, window_size, source.plan_node().get_identity().clone(), + window_start_exprs, + window_end_exprs, output_indices, ))) } } impl HopWindowExecutor { + #[allow(clippy::too_many_arguments)] fn new( child: BoxedExecutor, schema: Schema, @@ -101,6 +114,8 @@ impl HopWindowExecutor { window_slide: IntervalUnit, window_size: IntervalUnit, identity: String, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, ) -> Self { Self { @@ -110,6 +125,8 @@ impl HopWindowExecutor { time_col_idx, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, } } @@ -153,80 +170,8 @@ impl HopWindowExecutor { .get(); let time_col_data_type = child.schema().fields()[time_col_idx].data_type(); - let output_type = DataType::window_of(&time_col_data_type).unwrap(); - let time_col_ref = InputRefExpression::new(time_col_data_type, self.time_col_idx).boxed(); - - let window_slide_expr = - LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) - .boxed(); - - // The first window_start of hop window should be: - // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). - // Let's pre calculate (`window_size` - `window_slide`). - let window_size_sub_slide = window_size.checked_sub(&window_slide).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_size {} cannot be subtracted by window_slide {}", - window_size, window_slide - ))) - })?; - let window_size_sub_slide_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_size_sub_slide)), - ) - .boxed(); - let hop_expr = new_binary_expr( - expr_node::Type::TumbleStart, - output_type.clone(), - new_binary_expr( - expr_node::Type::Subtract, - output_type.clone(), - time_col_ref, - window_size_sub_slide_expr, - )?, - window_slide_expr, - )?; - - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); + let _output_type = DataType::window_of(&time_col_data_type).unwrap(); - for i in 0..units { - let window_start_offset = window_slide.checked_mul_int(i).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ))) - })?; - let window_start_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_start_offset)), - ) - .boxed(); - let window_end_offset = window_slide.checked_mul_int(i + units).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ))) - })?; - let window_end_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_end_offset)), - ) - .boxed(); - let window_start_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_start_offset_expr, - )?; - window_start_exprs.push(window_start_expr); - let window_end_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_end_offset_expr, - )?; - window_end_exprs.push(window_end_expr); - } let window_start_col_index = child.schema().len(); let window_end_col_index = child.schema().len() + 1; let contains_window_start = output_indices.contains(&window_start_col_index); @@ -234,27 +179,16 @@ impl HopWindowExecutor { #[for_await] for data_chunk in child.execute() { let data_chunk = data_chunk?; - let hop_start = hop_expr.eval(&data_chunk)?; - let len = hop_start.len(); - let hop_start_chunk = DataChunk::new(vec![Column::new(hop_start)], len); - let (origin_cols, visibility) = data_chunk.into_parts(); - let len = match visibility { - Vis::Compact(len) => len, - Vis::Bitmap(_) => { - return Err(BatchError::Internal(anyhow!( - "Input array should already been compacted!" - )) - .into()); - } - }; + assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + let len = data_chunk.cardinality(); for i in 0..units { let window_start_col = if contains_window_start { - Some(window_start_exprs[i].eval(&hop_start_chunk)?) + Some(self.window_start_exprs[i].eval(&data_chunk)?) } else { None }; let window_end_col = if contains_window_end { - Some(window_end_exprs[i].eval(&hop_start_chunk)?) + Some(self.window_end_exprs[i].eval(&data_chunk)?) } else { None }; @@ -262,7 +196,7 @@ impl HopWindowExecutor { .iter() .filter_map(|&idx| { if idx < window_start_col_index { - Some(origin_cols[idx].clone()) + Some(data_chunk.column_at(idx).clone()) } else if idx == window_start_col_index { Some(Column::new(window_start_col.clone().unwrap())) } else if idx == window_end_col_index { @@ -284,12 +218,12 @@ mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; + use risingwave_expr::expr::test_utils::make_hop_window_expression; use super::*; use crate::executor::test_utils::MockExecutor; - #[tokio::test] - async fn test_execute() { + fn create_executor() -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -313,16 +247,26 @@ mod tests { let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); + let (window_start_exprs, window_end_exprs) = + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); let default_indices = (0..schema.len() + 2).collect_vec(); - let executor = Box::new(HopWindowExecutor::new( + + Box::new(HopWindowExecutor::new( Box::new(mock_executor), schema, 2, window_slide, window_size, "test".to_string(), + window_start_exprs, + window_end_exprs, default_indices, - )); + )) + } + + #[tokio::test] + async fn test_execute() { + let executor = create_executor(); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -363,38 +307,7 @@ mod tests { } #[tokio::test] async fn test_output_indices() { - let field1 = Field::unnamed(DataType::Int64); - let field2 = Field::unnamed(DataType::Int64); - let field3 = Field::with_name(DataType::Timestamp, "created_at"); - let schema = Schema::new(vec![field1, field2, field3]); - - let chunk = DataChunk::from_pretty( - &"I I TS - 1 1 ^10:00:00 - 2 3 ^10:05:00 - 3 2 ^10:14:00 - 4 1 ^10:22:00 - 5 3 ^10:33:00 - 6 2 ^10:42:00 - 7 1 ^10:51:00 - 8 3 ^11:02:00" - .replace('^', "2022-2-2T"), - ); - - let mut mock_executor = MockExecutor::new(schema.clone()); - mock_executor.add(chunk); - - let window_slide = IntervalUnit::from_minutes(15); - let window_size = IntervalUnit::from_minutes(30); - let executor = Box::new(HopWindowExecutor::new( - Box::new(mock_executor), - schema, - 2, - window_slide, - window_size, - "test".to_string(), - vec![1, 3, 4, 2], - )); + let executor = create_executor(); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. diff --git a/src/expr/src/expr/test_utils.rs b/src/expr/src/expr/test_utils.rs index d3107e2faf01d..26cce4c2374a3 100644 --- a/src/expr/src/expr/test_utils.rs +++ b/src/expr/src/expr/test_utils.rs @@ -14,22 +14,28 @@ //! Helper functions to construct prost [`ExprNode`] for test. -use risingwave_common::types::ScalarImpl; +use std::num::NonZeroUsize; + +use num_traits::CheckedSub; +use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::value_encoding::serialize_datum; use risingwave_pb::data::data_type::TypeName; -use risingwave_pb::data::{DataType as ProstDataType, DataType, Datum as ProstDatum}; +use risingwave_pb::data::{DataType as ProstDataType, Datum as ProstDatum}; use risingwave_pb::expr::expr_node::Type::{Field, InputRef}; -use risingwave_pb::expr::expr_node::{RexNode, Type}; +use risingwave_pb::expr::expr_node::{self, RexNode, Type}; use risingwave_pb::expr::{ExprNode, FunctionCall, InputRefExpr}; +use super::{new_binary_expr, BoxedExpression, Expression, InputRefExpression, LiteralExpression}; +use crate::{ExprError, Result}; + pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[i32]) -> ExprNode { let mut exprs = Vec::new(); for (idx, ret) in indices.iter().zip_eq_fast(rets.iter()) { exprs.push(make_input_ref(*idx, *ret)); } let function_call = FunctionCall { children: exprs }; - let return_type = DataType { + let return_type = ProstDataType { type_name: TypeName::Timestamp as i32, ..Default::default() }; @@ -43,7 +49,7 @@ pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[i32]) -> ExprNo pub fn make_input_ref(idx: i32, ret: TypeName) -> ExprNode { ExprNode { expr_type: InputRef as i32, - return_type: Some(DataType { + return_type: Some(ProstDataType { type_name: ret as i32, ..Default::default() }), @@ -87,3 +93,113 @@ pub fn make_field_function(children: Vec, ret: TypeName) -> ExprNode { rex_node: Some(RexNode::FuncCall(FunctionCall { children })), } } + +pub fn make_hop_window_expression( + time_col_data_type: DataType, + time_col_idx: usize, + window_size: IntervalUnit, + window_slide: IntervalUnit, +) -> Result<(Vec, Vec)> { + let units = window_size + .exact_div(&window_slide) + .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be divided by window_slide {}", + window_size, window_slide + ), + })? + .get(); + + let output_type = DataType::window_of(&time_col_data_type).unwrap(); + let get_hop_window_start = || -> Result { + let time_col_ref = InputRefExpression::new(time_col_data_type, time_col_idx).boxed(); + + let window_slide_expr = + LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) + .boxed(); + + // The first window_start of hop window should be: + // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). + // Let's pre calculate (`window_size` - `window_slide`). + let window_size_sub_slide = + window_size + .checked_sub(&window_slide) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be subtracted by window_slide {}", + window_size, window_slide + ), + })?; + let window_size_sub_slide_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_size_sub_slide)), + ) + .boxed(); + + let hop_start = new_binary_expr( + expr_node::Type::TumbleStart, + output_type.clone(), + new_binary_expr( + expr_node::Type::Subtract, + output_type.clone(), + time_col_ref, + window_size_sub_slide_expr, + )?, + window_slide_expr, + )?; + Ok(hop_start) + }; + + let mut window_start_exprs = Vec::with_capacity(units); + let mut window_end_exprs = Vec::with_capacity(units); + for i in 0..units { + let window_start_offset = + window_slide + .checked_mul_int(i) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_start_offset_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_start_offset)), + ) + .boxed(); + let window_end_offset = + window_slide + .checked_mul_int(i + units) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_end_offset_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_end_offset)), + ) + .boxed(); + let window_start_expr = new_binary_expr( + expr_node::Type::Add, + output_type.clone(), + get_hop_window_start.clone()()?, + window_start_offset_expr, + )?; + window_start_exprs.push(window_start_expr); + let window_end_expr = new_binary_expr( + expr_node::Type::Add, + output_type.clone(), + get_hop_window_start.clone()()?, + window_end_offset_expr, + )?; + window_end_exprs.push(window_end_expr); + } + Ok((window_start_exprs, window_end_exprs)) +} diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index 23b06e62a1c8b..ce4d3136db0bb 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -22,6 +22,7 @@ use super::{ ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, }; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; use crate::utils::ColIndexMappingRewriteExt; @@ -32,10 +33,16 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct BatchHopWindow { pub base: PlanBase, logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl BatchHopWindow { - pub fn new(logical: LogicalHopWindow) -> Self { + pub fn new( + logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, + ) -> Self { let ctx = logical.base.ctx.clone(); let distribution = logical .i2o_col_mapping() @@ -46,7 +53,12 @@ impl BatchHopWindow { distribution, logical.get_out_column_index_order(), ); - BatchHopWindow { base, logical } + BatchHopWindow { + base, + logical, + window_start_exprs, + window_end_exprs, + } } } @@ -62,7 +74,11 @@ impl PlanTreeNodeUnary for BatchHopWindow { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + Self::new( + self.logical.clone_with_input(input), + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ) } } @@ -90,7 +106,11 @@ impl ToDistributedBatch for BatchHopWindow { .input() .to_distributed_with_required(required_order, &input_required)?; let new_logical = self.logical.clone_with_input(new_input); - let batch_plan = BatchHopWindow::new(new_logical); + let batch_plan = BatchHopWindow::new( + new_logical, + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ); let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?; required_dist.enforce_if_not_satisfies(batch_plan, required_order) } @@ -109,6 +129,18 @@ impl ToBatchProst for BatchHopWindow { .iter() .map(|&x| x as u32) .collect(), + window_start_exprs: self + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), + window_end_exprs: self + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), }) } } @@ -120,4 +152,25 @@ impl ToLocalBatch for BatchHopWindow { } } -impl ExprRewritable for BatchHopWindow {} +impl ExprRewritable for BatchHopWindow { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical.clone(), + self.window_start_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + self.window_end_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index b0ef811890a04..d176d997a6825 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -13,19 +13,21 @@ // limitations under the License. use std::fmt; +use std::num::NonZeroUsize; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::types::{DataType, IntervalUnit}; +use risingwave_expr::ExprError; use super::generic::GenericPlanNode; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, }; -use crate::expr::{ExprType, FunctionCall, InputRef}; +use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -362,11 +364,106 @@ impl PredicatePushdown for LogicalHopWindow { } } +pub fn derive_window_start_and_end_exprs( + logical_hop: &LogicalHopWindow, +) -> Result<(Vec, Vec)> { + let generic::HopWindow:: { + window_size, + window_slide, + time_col, + .. + } = &logical_hop.core; + let units = window_size + .exact_div(window_slide) + .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be divided by window_slide {}", + window_size, window_slide + ), + })? + .get(); + let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); + let window_slide_expr: ExprImpl = + Literal::new(Some((*window_slide).into()), DataType::Interval).into(); + let window_size_sub_slide = FunctionCall::new( + ExprType::Subtract, + vec![window_size_expr, window_slide_expr.clone()], + )? + .into(); + + let time_col_shifted = FunctionCall::new( + ExprType::Subtract, + vec![ + ExprImpl::InputRef(Box::new(time_col.clone())), + window_size_sub_slide, + ], + )? + .into(); + + let hop_start: ExprImpl = FunctionCall::new( + ExprType::TumbleStart, + vec![time_col_shifted, window_slide_expr], + )? + .into(); + + let mut window_start_exprs = Vec::with_capacity(units); + let mut window_end_exprs = Vec::with_capacity(units); + for i in 0..units { + { + let window_start_offset = + window_slide + .checked_mul_int(i) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_start_offset_expr = + Literal::new(Some(window_start_offset.into()), DataType::Interval).into(); + let window_start_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_start_offset_expr], + )? + .into(); + window_start_exprs.push(window_start_expr); + } + { + let window_end_offset = + window_slide + .checked_mul_int(i + units) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, + i + units + ), + })?; + let window_end_offset_expr = + Literal::new(Some(window_end_offset.into()), DataType::Interval).into(); + let window_end_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_end_offset_expr], + )? + .into(); + window_end_exprs.push(window_end_expr); + } + } + assert_eq!(window_start_exprs.len(), window_end_exprs.len()); + Ok((window_start_exprs, window_end_exprs)) +} + impl ToBatch for LogicalHopWindow { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; let new_logical = self.clone_with_input(new_input); - Ok(BatchHopWindow::new(new_logical).into()) + let (window_start_exprs, window_end_exprs) = + derive_window_start_and_end_exprs(&new_logical)?; + Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } } From 8bf75439802af3ac342b1a492effda2d10399b3b Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 10:03:29 +0800 Subject: [PATCH 02/12] minor --- e2e_test/batch/basic/time_window_utc.slt.part | 2 -- src/batch/src/executor/hop_window.rs | 10 +++++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/e2e_test/batch/basic/time_window_utc.slt.part b/e2e_test/batch/basic/time_window_utc.slt.part index 749870e9abf1f..7f9c2080ad651 100644 --- a/e2e_test/batch/basic/time_window_utc.slt.part +++ b/e2e_test/batch/basic/time_window_utc.slt.part @@ -119,11 +119,9 @@ group by window_start, uid order by window_start, uid; 1 11 2021-12-31 00:00:00+00:00 2 9 2021-12-31 00:00:00+00:00 3 16 2021-12-31 00:00:00+00:00 -1 11 2022-01-01 00:00:00+00:00 1 15 2022-01-01 00:00:00+00:00 2 9 2022-01-01 00:00:00+00:00 3 16 2022-01-01 00:00:00+00:00 -3 16 2022-01-01 00:00:00+00:00 1 4 2022-01-02 00:00:00+00:00 3 3 2022-01-02 00:00:00+00:00 2 2 2022-01-03 00:00:00+00:00 diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 3bffe5250967f..0e849b4fa2961 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -223,7 +223,7 @@ mod tests { use super::*; use crate::executor::test_utils::MockExecutor; - fn create_executor() -> Box { + fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -249,7 +249,6 @@ mod tests { let window_size = IntervalUnit::from_minutes(30); let (window_start_exprs, window_end_exprs) = make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); - let default_indices = (0..schema.len() + 2).collect_vec(); Box::new(HopWindowExecutor::new( Box::new(mock_executor), @@ -260,13 +259,14 @@ mod tests { "test".to_string(), window_start_exprs, window_end_exprs, - default_indices, + output_indices, )) } #[tokio::test] async fn test_execute() { - let executor = create_executor(); + let default_indices = (0..3 + 2).collect_vec(); + let executor = create_executor(default_indices); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -307,7 +307,7 @@ mod tests { } #[tokio::test] async fn test_output_indices() { - let executor = create_executor(); + let executor = create_executor(vec![1, 3, 4, 2]); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. From 71ae98755528628c3611b735af5fd42904a343cf Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 10:40:56 +0800 Subject: [PATCH 03/12] minor --- src/frontend/planner_test/tests/testdata/nexmark.yaml | 4 ++-- src/frontend/planner_test/tests/testdata/nexmark_source.yaml | 4 ++-- src/frontend/planner_test/tests/testdata/share.yaml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/nexmark.yaml b/src/frontend/planner_test/tests/testdata/nexmark.yaml index cd80fd6095a06..926cc42f3d888 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark.yaml @@ -292,7 +292,7 @@ └─StreamHashJoin { type: Inner, predicate: window_start = window_start, output: all } ├─StreamExchange { dist: HashShard(window_start) } | └─StreamProject { exprs: [bid.auction, count, window_start] } - | └─StreamShare { id = 956 } + | └─StreamShare { id = 960 } | └─StreamProject { exprs: [bid.auction, window_start, count] } | └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] } | └─StreamExchange { dist: HashShard(bid.auction, window_start) } @@ -303,7 +303,7 @@ └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } └─StreamProject { exprs: [bid.auction, window_start, count] } - └─StreamShare { id = 956 } + └─StreamShare { id = 960 } └─StreamProject { exprs: [bid.auction, window_start, count] } └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(bid.auction, window_start) } diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index c526eac5e0aee..2c7d65c679866 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -344,7 +344,7 @@ └─StreamHashJoin { type: Inner, predicate: window_start = window_start, output: all } ├─StreamExchange { dist: HashShard(window_start) } | └─StreamProject { exprs: [auction, count, window_start] } - | └─StreamShare { id = 1064 } + | └─StreamShare { id = 1068 } | └─StreamProject { exprs: [auction, window_start, count] } | └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } | └─StreamExchange { dist: HashShard(auction, window_start) } @@ -357,7 +357,7 @@ └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } └─StreamProject { exprs: [auction, window_start, count] } - └─StreamShare { id = 1064 } + └─StreamShare { id = 1068 } └─StreamProject { exprs: [auction, window_start, count] } └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(auction, window_start) } diff --git a/src/frontend/planner_test/tests/testdata/share.yaml b/src/frontend/planner_test/tests/testdata/share.yaml index bccf8da8450a0..db3e4e0665096 100644 --- a/src/frontend/planner_test/tests/testdata/share.yaml +++ b/src/frontend/planner_test/tests/testdata/share.yaml @@ -117,7 +117,7 @@ └─StreamHashJoin { type: Inner, predicate: window_start = window_start, output: all } ├─StreamExchange { dist: HashShard(window_start) } | └─StreamProject { exprs: [auction, count, window_start] } - | └─StreamShare { id = 1064 } + | └─StreamShare { id = 1068 } | └─StreamProject { exprs: [auction, window_start, count] } | └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } | └─StreamExchange { dist: HashShard(auction, window_start) } @@ -130,7 +130,7 @@ └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } └─StreamProject { exprs: [auction, window_start, count] } - └─StreamShare { id = 1064 } + └─StreamShare { id = 1068 } └─StreamProject { exprs: [auction, window_start, count] } └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(auction, window_start) } From dd4aa765e71daca1fc217eda49adec0ddf2c4827 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 12:22:16 +0800 Subject: [PATCH 04/12] minor --- src/batch/src/executor/hop_window.rs | 3 --- src/expr/src/expr/test_utils.rs | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 3facd65602c15..8e12f67f1af99 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -169,9 +169,6 @@ impl HopWindowExecutor { })? .get(); - let time_col_data_type = child.schema().fields()[time_col_idx].data_type(); - let _output_type = DataType::window_of(&time_col_data_type).unwrap(); - let window_start_col_index = child.schema().len(); let window_end_col_index = child.schema().len() + 1; let contains_window_start = output_indices.contains(&window_start_col_index); diff --git a/src/expr/src/expr/test_utils.rs b/src/expr/src/expr/test_utils.rs index da5499199e811..793d7f0934e67 100644 --- a/src/expr/src/expr/test_utils.rs +++ b/src/expr/src/expr/test_utils.rs @@ -24,9 +24,10 @@ use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::{DataType as ProstDataType, Datum as ProstDatum}; use risingwave_pb::expr::expr_node::Type::{Field, InputRef}; use risingwave_pb::expr::expr_node::{self, RexNode, Type}; -use risingwave_pb::expr::{ExprNode, FunctionCall, InputRefExpr}; +use risingwave_pb::expr::{ExprNode, FunctionCall}; -use super::{new_binary_expr, BoxedExpression, Expression, InputRefExpression, LiteralExpression}; +use crate::ExprError; +use super::{new_binary_expr, Result, BoxedExpression, Expression, InputRefExpression, LiteralExpression}; pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[usize]) -> ExprNode { let mut exprs = Vec::new(); From 0dbfa3640019fe8a58a8a0a4411b65612b7e05e9 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 12:24:33 +0800 Subject: [PATCH 05/12] minor --- dashboard/proto/gen/batch_plan.ts | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/dashboard/proto/gen/batch_plan.ts b/dashboard/proto/gen/batch_plan.ts index ca3aadd92a16f..ab9083be3eead 100644 --- a/dashboard/proto/gen/batch_plan.ts +++ b/dashboard/proto/gen/batch_plan.ts @@ -216,6 +216,8 @@ export interface HopWindowNode { windowSlide: IntervalUnit | undefined; windowSize: IntervalUnit | undefined; outputIndices: number[]; + windowStartExprs: ExprNode[]; + windowEndExprs: ExprNode[]; } export interface TableFunctionNode { @@ -1373,7 +1375,14 @@ export const SortMergeJoinNode = { }; function createBaseHopWindowNode(): HopWindowNode { - return { timeCol: 0, windowSlide: undefined, windowSize: undefined, outputIndices: [] }; + return { + timeCol: 0, + windowSlide: undefined, + windowSize: undefined, + outputIndices: [], + windowStartExprs: [], + windowEndExprs: [], + }; } export const HopWindowNode = { @@ -1383,6 +1392,12 @@ export const HopWindowNode = { windowSlide: isSet(object.windowSlide) ? IntervalUnit.fromJSON(object.windowSlide) : undefined, windowSize: isSet(object.windowSize) ? IntervalUnit.fromJSON(object.windowSize) : undefined, outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], + windowStartExprs: Array.isArray(object?.windowStartExprs) + ? object.windowStartExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], + windowEndExprs: Array.isArray(object?.windowEndExprs) + ? object.windowEndExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], }; }, @@ -1398,6 +1413,16 @@ export const HopWindowNode = { } else { obj.outputIndices = []; } + if (message.windowStartExprs) { + obj.windowStartExprs = message.windowStartExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowStartExprs = []; + } + if (message.windowEndExprs) { + obj.windowEndExprs = message.windowEndExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowEndExprs = []; + } return obj; }, @@ -1411,6 +1436,8 @@ export const HopWindowNode = { ? IntervalUnit.fromPartial(object.windowSize) : undefined; message.outputIndices = object.outputIndices?.map((e) => e) || []; + message.windowStartExprs = object.windowStartExprs?.map((e) => ExprNode.fromPartial(e)) || []; + message.windowEndExprs = object.windowEndExprs?.map((e) => ExprNode.fromPartial(e)) || []; return message; }, }; From 1d582bf7f5cbde83663f0b5e86a7e29a9a9b4baf Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 13:11:54 +0800 Subject: [PATCH 06/12] fix --- dashboard/proto/gen/stream_plan.ts | 29 ++- proto/stream_plan.proto | 2 + src/batch/src/executor/hop_window.rs | 8 +- src/expr/src/expr/test_utils.rs | 6 +- .../optimizer/plan_node/logical_hop_window.rs | 4 +- .../src/optimizer/plan_node/stream.rs | 16 ++ .../optimizer/plan_node/stream_hop_window.rs | 57 +++++- src/stream/src/executor/hop_window.rs | 185 ++++-------------- src/stream/src/from_proto/hop_window.rs | 14 ++ 9 files changed, 159 insertions(+), 162 deletions(-) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index f9faad7f10d66..98fd2b55f6349 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -642,6 +642,8 @@ export interface HopWindowNode { windowSlide: IntervalUnit | undefined; windowSize: IntervalUnit | undefined; outputIndices: number[]; + windowStartExprs: ExprNode[]; + windowEndExprs: ExprNode[]; } export interface MergeNode { @@ -2868,7 +2870,14 @@ export const DeltaIndexJoinNode = { }; function createBaseHopWindowNode(): HopWindowNode { - return { timeCol: 0, windowSlide: undefined, windowSize: undefined, outputIndices: [] }; + return { + timeCol: 0, + windowSlide: undefined, + windowSize: undefined, + outputIndices: [], + windowStartExprs: [], + windowEndExprs: [], + }; } export const HopWindowNode = { @@ -2878,6 +2887,12 @@ export const HopWindowNode = { windowSlide: isSet(object.windowSlide) ? IntervalUnit.fromJSON(object.windowSlide) : undefined, windowSize: isSet(object.windowSize) ? IntervalUnit.fromJSON(object.windowSize) : undefined, outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], + windowStartExprs: Array.isArray(object?.windowStartExprs) + ? object.windowStartExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], + windowEndExprs: Array.isArray(object?.windowEndExprs) + ? object.windowEndExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], }; }, @@ -2893,6 +2908,16 @@ export const HopWindowNode = { } else { obj.outputIndices = []; } + if (message.windowStartExprs) { + obj.windowStartExprs = message.windowStartExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowStartExprs = []; + } + if (message.windowEndExprs) { + obj.windowEndExprs = message.windowEndExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowEndExprs = []; + } return obj; }, @@ -2906,6 +2931,8 @@ export const HopWindowNode = { ? IntervalUnit.fromPartial(object.windowSize) : undefined; message.outputIndices = object.outputIndices?.map((e) => e) || []; + message.windowStartExprs = object.windowStartExprs?.map((e) => ExprNode.fromPartial(e)) || []; + message.windowEndExprs = object.windowEndExprs?.map((e) => ExprNode.fromPartial(e)) || []; return message; }, }; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 3fec372da2640..91f9b90565348 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -328,6 +328,8 @@ message HopWindowNode { data.IntervalUnit window_slide = 2; data.IntervalUnit window_size = 3; repeated uint32 output_indices = 4; + repeated expr.ExprNode window_start_exprs = 5; + repeated expr.ExprNode window_end_exprs = 6; } message MergeNode { diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 8e12f67f1af99..f51d2056510b1 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -151,7 +151,7 @@ impl HopWindowExecutor { async fn do_execute(self: Box) { let Self { child, - time_col_idx, + window_slide, window_size, output_indices, @@ -171,20 +171,18 @@ impl HopWindowExecutor { let window_start_col_index = child.schema().len(); let window_end_col_index = child.schema().len() + 1; - let contains_window_start = output_indices.contains(&window_start_col_index); - let contains_window_end = output_indices.contains(&window_end_col_index); #[for_await] for data_chunk in child.execute() { let data_chunk = data_chunk?; assert!(matches!(data_chunk.vis(), Vis::Compact(_))); let len = data_chunk.cardinality(); for i in 0..units { - let window_start_col = if contains_window_start { + let window_start_col = if output_indices.contains(&window_start_col_index) { Some(self.window_start_exprs[i].eval(&data_chunk)?) } else { None }; - let window_end_col = if contains_window_end { + let window_end_col = if output_indices.contains(&window_end_col_index) { Some(self.window_end_exprs[i].eval(&data_chunk)?) } else { None diff --git a/src/expr/src/expr/test_utils.rs b/src/expr/src/expr/test_utils.rs index 793d7f0934e67..9ce1f141eaa98 100644 --- a/src/expr/src/expr/test_utils.rs +++ b/src/expr/src/expr/test_utils.rs @@ -24,10 +24,12 @@ use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::{DataType as ProstDataType, Datum as ProstDatum}; use risingwave_pb::expr::expr_node::Type::{Field, InputRef}; use risingwave_pb::expr::expr_node::{self, RexNode, Type}; -use risingwave_pb::expr::{ExprNode, FunctionCall}; +use risingwave_pb::expr::{ExprNode, FunctionCall}; +use super::{ + new_binary_expr, BoxedExpression, Expression, InputRefExpression, LiteralExpression, Result, +}; use crate::ExprError; -use super::{new_binary_expr, Result, BoxedExpression, Expression, InputRefExpression, LiteralExpression}; pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[usize]) -> ExprNode { let mut exprs = Vec::new(); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index d176d997a6825..12204b749c74a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -471,7 +471,9 @@ impl ToStream for LogicalHopWindow { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { let new_input = self.input().to_stream(ctx)?; let new_logical = self.clone_with_input(new_input); - Ok(StreamHopWindow::new(new_logical).into()) + let (window_start_exprs, window_end_exprs) = + derive_window_start_and_end_exprs(&new_logical)?; + Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } fn logical_rewrite_for_stream( diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 914cf8dc3370b..a764c3d7acf5a 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -298,6 +298,8 @@ impl HashJoin { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct HopWindow { pub core: generic::HopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HopWindow, core, input); @@ -682,12 +684,26 @@ pub fn to_stream_prost_body( }) } Node::HopWindow(me) => { + let window_start_exprs = me + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(); + let window_end_exprs = me + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(); let me = &me.core; ProstNode::HopWindow(HopWindowNode { time_col: me.time_col.index() as _, window_slide: Some(me.window_slide.into()), window_size: Some(me.window_size.into()), output_indices: me.output_indices.iter().map(|&x| x as u32).collect(), + window_start_exprs, + window_end_exprs, }) } Node::LocalSimpleAgg(me) => { diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 1c457e9269f48..a922f898658c6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::HopWindowNode; use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -26,10 +27,16 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct StreamHopWindow { pub base: PlanBase, logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl StreamHopWindow { - pub fn new(logical: LogicalHopWindow) -> Self { + pub fn new( + logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, + ) -> Self { let ctx = logical.base.ctx.clone(); let pk_indices = logical.base.logical_pk.to_vec(); let input = logical.input(); @@ -54,7 +61,12 @@ impl StreamHopWindow { logical.input().append_only(), watermark_columns, ); - Self { base, logical } + Self { + base, + logical, + window_start_exprs, + window_end_exprs, + } } } @@ -70,7 +82,11 @@ impl PlanTreeNodeUnary for StreamHopWindow { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + Self::new( + self.logical.clone_with_input(input), + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ) } } @@ -89,8 +105,41 @@ impl StreamNode for StreamHopWindow { .iter() .map(|&x| x as u32) .collect(), + window_start_exprs: self + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), + window_end_exprs: self + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), }) } } -impl ExprRewritable for StreamHopWindow {} +impl ExprRewritable for StreamHopWindow { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical.clone(), + self.window_start_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + self.window_end_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + ) + .into() + } +} diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 4972d9c858102..09c7b35ff5029 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -16,13 +16,15 @@ use std::num::NonZeroUsize; use futures::StreamExt; use futures_async_stream::try_stream; -use num_traits::CheckedSub; + use risingwave_common::array::column::Column; -use risingwave_common::array::{DataChunk, StreamChunk, Vis}; -use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; -use risingwave_expr::expr::{new_binary_expr, Expression, InputRefExpression, LiteralExpression}; +use risingwave_common::array::{StreamChunk, Vis}; +use risingwave_common::types::{IntervalUnit}; +use risingwave_expr::expr::{ + BoxedExpression, Expression, +}; use risingwave_expr::ExprError; -use risingwave_pb::expr::expr_node; + use super::error::StreamExecutorError; use super::{ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message}; @@ -36,10 +38,13 @@ pub struct HopWindowExecutor { pub time_col_idx: usize, pub window_slide: IntervalUnit, pub window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, pub output_indices: Vec, } impl HopWindowExecutor { + #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, input: BoxedExecutor, @@ -47,6 +52,8 @@ impl HopWindowExecutor { time_col_idx: usize, window_slide: IntervalUnit, window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, ) -> Self { HopWindowExecutor { @@ -56,6 +63,8 @@ impl HopWindowExecutor { time_col_idx, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, } } @@ -85,7 +94,7 @@ impl HopWindowExecutor { let Self { ctx, input, - time_col_idx, + window_slide, window_size, output_indices, @@ -104,92 +113,6 @@ impl HopWindowExecutor { })? .get(); - let time_col_data_type = input.schema().fields()[time_col_idx].data_type(); - let output_type = DataType::window_of(&time_col_data_type).unwrap(); - let time_col_ref = InputRefExpression::new(time_col_data_type, self.time_col_idx).boxed(); - - let window_slide_expr = - LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) - .boxed(); - - // The first window_start of hop window should be: - // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). - // Let's pre calculate (`window_size` - `window_slide`). - let window_size_sub_slide = - window_size - .checked_sub(&window_slide) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_size {} cannot be subtracted by window_slide {}", - window_size, window_slide - ), - })?; - let window_size_sub_slide_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_size_sub_slide)), - ) - .boxed(); - - let hop_start = new_binary_expr( - expr_node::Type::TumbleStart, - output_type.clone(), - new_binary_expr( - expr_node::Type::Subtract, - output_type.clone(), - time_col_ref, - window_size_sub_slide_expr, - )?, - window_slide_expr, - )?; - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); - for i in 0..units { - let window_start_offset = - window_slide - .checked_mul_int(i) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_start_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_start_offset)), - ) - .boxed(); - let window_end_offset = - window_slide - .checked_mul_int(i + units) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_end_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_end_offset)), - ) - .boxed(); - let window_start_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_start_offset_expr, - )?; - window_start_exprs.push(window_start_expr); - let window_end_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_end_offset_expr, - )?; - window_end_exprs.push(window_end_expr); - } let window_start_col_index = input.schema().len(); let window_end_col_index = input.schema().len() + 1; #[for_await] @@ -199,17 +122,13 @@ impl HopWindowExecutor { // TODO: compact may be not necessary here. let chunk = chunk.compact(); let (data_chunk, ops) = chunk.into_parts(); - let hop_start = hop_start - .eval_infallible(&data_chunk, |err| ctx.on_compute_error(err, &info.identity)); - let len = hop_start.len(); - let hop_start_chunk = DataChunk::new(vec![Column::new(hop_start)], len); - let (origin_cols, vis) = data_chunk.into_parts(); // SAFETY: Already compacted. - assert!(matches!(vis, Vis::Compact(_))); + assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + let _len = data_chunk.cardinality(); for i in 0..units { let window_start_col = if output_indices.contains(&window_start_col_index) { Some( - window_start_exprs[i].eval_infallible(&hop_start_chunk, |err| { + self.window_start_exprs[i].eval_infallible(&data_chunk, |err| { ctx.on_compute_error(err, &info.identity) }), ) @@ -218,7 +137,7 @@ impl HopWindowExecutor { }; let window_end_col = if output_indices.contains(&window_end_col_index) { Some( - window_end_exprs[i].eval_infallible(&hop_start_chunk, |err| { + self.window_end_exprs[i].eval_infallible(&data_chunk, |err| { ctx.on_compute_error(err, &info.identity) }), ) @@ -229,7 +148,7 @@ impl HopWindowExecutor { .iter() .filter_map(|&idx| { if idx < window_start_col_index { - Some(origin_cols[idx].clone()) + Some(data_chunk.column_at(idx).clone()) } else if idx == window_start_col_index { Some(Column::new(window_start_col.clone().unwrap())) } else if idx == window_end_col_index { @@ -252,7 +171,7 @@ impl HopWindowExecutor { #[cfg(test)] mod tests { - use futures::StreamExt; + use futures::{StreamExt, executor}; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, IntervalUnit}; @@ -260,8 +179,9 @@ mod tests { use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; - #[tokio::test] - async fn test_execute() { + use super::HopWindowExecutor; + + fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -280,14 +200,14 @@ mod tests { + 8 3 ^11:02:00" .replace('^', "2022-2-2T"), ); - let input = MockSource::with_chunks(schema.clone(), pk_indices.clone(), vec![chunk]).boxed(); - let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); - let default_indices: Vec<_> = (0..5).collect(); - let executor = super::HopWindowExecutor::new( + let (window_start_exprs, window_end_exprs) = + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + + super::HopWindowExecutor::new( ActorContext::create(123), input, ExecutorInfo { @@ -299,9 +219,14 @@ mod tests { 2, window_slide, window_size, - default_indices, + output_indices, ) - .boxed(); + .boxed() + } + #[tokio::test] + async fn test_execute() { + let default_indices: Vec<_> = (0..5).collect(); + let executor = create_executor(default_indices); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -343,45 +268,7 @@ mod tests { #[tokio::test] async fn test_output_indices() { - let field1 = Field::unnamed(DataType::Int64); - let field2 = Field::unnamed(DataType::Int64); - let field3 = Field::with_name(DataType::Timestamp, "created_at"); - let schema = Schema::new(vec![field1, field2, field3]); - let pk_indices = vec![0]; - - let chunk = StreamChunk::from_pretty( - &"I I TS - + 1 1 ^10:00:00 - + 2 3 ^10:05:00 - - 3 2 ^10:14:00 - + 4 1 ^10:22:00 - - 5 3 ^10:33:00 - + 6 2 ^10:42:00 - - 7 1 ^10:51:00 - + 8 3 ^11:02:00" - .replace('^', "2022-2-2T"), - ); - - let input = - MockSource::with_chunks(schema.clone(), pk_indices.clone(), vec![chunk]).boxed(); - - let window_slide = IntervalUnit::from_minutes(15); - let window_size = IntervalUnit::from_minutes(30); - let executor = super::HopWindowExecutor::new( - ActorContext::create(123), - input, - ExecutorInfo { - // TODO: the schema is incorrect, but it seems useless here. - schema: schema.clone(), - pk_indices, - identity: "test".to_string(), - }, - 2, - window_slide, - window_size, - vec![4, 1, 0, 2], - ) - .boxed(); + let executor = create_executor(vec![4, 1, 0, 2]); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. diff --git a/src/stream/src/from_proto/hop_window.rs b/src/stream/src/from_proto/hop_window.rs index ef27b16993c37..d0151c795183b 100644 --- a/src/stream/src/from_proto/hop_window.rs +++ b/src/stream/src/from_proto/hop_window.rs @@ -14,6 +14,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; +use risingwave_expr::expr::build_from_prost; use risingwave_pb::stream_plan::HopWindowNode; use super::*; @@ -47,6 +48,17 @@ impl ExecutorBuilder for HopWindowExecutorBuilder { .map(|&x| x as usize) .collect_vec(); + let window_start_exprs: Vec<_> = node + .get_window_start_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let window_end_exprs: Vec<_> = node + .get_window_end_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let time_col = node.get_time_col() as usize; let time_col_data_type = input.schema().fields()[time_col].data_type(); let output_type = DataType::window_of(&time_col_data_type).unwrap(); @@ -79,6 +91,8 @@ impl ExecutorBuilder for HopWindowExecutorBuilder { time_col, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, ) .boxed()) From 5203c1bc6f4e64479dccafefe16acd36bc8ffdd7 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 13:22:33 +0800 Subject: [PATCH 07/12] minor --- src/batch/src/executor/hop_window.rs | 2 +- src/stream/src/executor/hop_window.rs | 15 +++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index f51d2056510b1..e277dcd5bf43c 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -151,7 +151,7 @@ impl HopWindowExecutor { async fn do_execute(self: Box) { let Self { child, - + window_slide, window_size, output_indices, diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 09c7b35ff5029..0f2ff9f8b958b 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -16,16 +16,12 @@ use std::num::NonZeroUsize; use futures::StreamExt; use futures_async_stream::try_stream; - use risingwave_common::array::column::Column; use risingwave_common::array::{StreamChunk, Vis}; -use risingwave_common::types::{IntervalUnit}; -use risingwave_expr::expr::{ - BoxedExpression, Expression, -}; +use risingwave_common::types::IntervalUnit; +use risingwave_expr::expr::{BoxedExpression, Expression}; use risingwave_expr::ExprError; - use super::error::StreamExecutorError; use super::{ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message}; use crate::common::InfallibleExpression; @@ -94,7 +90,7 @@ impl HopWindowExecutor { let Self { ctx, input, - + window_slide, window_size, output_indices, @@ -171,16 +167,15 @@ impl HopWindowExecutor { #[cfg(test)] mod tests { - use futures::{StreamExt, executor}; + use futures::{executor, StreamExt}; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, IntervalUnit}; + use super::HopWindowExecutor; use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; - use super::HopWindowExecutor; - fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); From 628927f04c3cd04bb4378d415a93a0c8f468554a Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 13:56:51 +0800 Subject: [PATCH 08/12] minor --- e2e_test/streaming/time_window.slt | 27 +++++++++++++++++++++++++++ src/stream/src/executor/hop_window.rs | 11 +++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/e2e_test/streaming/time_window.slt b/e2e_test/streaming/time_window.slt index a413c952e6921..227053ebfd7a0 100644 --- a/e2e_test/streaming/time_window.slt +++ b/e2e_test/streaming/time_window.slt @@ -128,6 +128,33 @@ select * from mv_hop_agg_2 order by window_start, uid; 3 8 2022-01-01 10:45:00 3 8 2022-01-01 11:00:00 +statement ok +insert into t1 values + (9, 1, 4, '2022-01-02 10:00:00'), + (10, 3, 3, '2022-01-03 10:05:00'), + (11, 2, 2, '2022-01-04 10:14:00'), + (12, 1, 1, '2022-01-05 10:22:00'), + +# Test for interval day +query IIT +select * from mv_hop_agg_2 order by window_start, uid; +---- +1 4 2022-01-01 09:45:00 +2 2 2022-01-01 09:45:00 +3 3 2022-01-01 09:45:00 +1 5 2022-01-01 10:00:00 +2 2 2022-01-01 10:00:00 +3 3 2022-01-01 10:00:00 +1 1 2022-01-01 10:15:00 +2 7 2022-01-01 10:15:00 +3 5 2022-01-01 10:15:00 +1 6 2022-01-01 10:30:00 +2 7 2022-01-01 10:30:00 +3 5 2022-01-01 10:30:00 +1 6 2022-01-01 10:45:00 +3 8 2022-01-01 10:45:00 +3 8 2022-01-01 11:00:00 + statement ok drop materialized view mv_tumble; diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 0f2ff9f8b958b..4dc27b7bb54b2 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -167,16 +167,17 @@ impl HopWindowExecutor { #[cfg(test)] mod tests { - use futures::{executor, StreamExt}; + use futures::{StreamExt}; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, IntervalUnit}; + use risingwave_expr::expr::test_utils::make_hop_window_expression; - use super::HopWindowExecutor; + use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; - fn create_executor(output_indices: Vec) -> Box { + fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -207,13 +208,15 @@ mod tests { input, ExecutorInfo { // TODO: the schema is incorrect, but it seems useless here. - schema: schema.clone(), + schema: schema, pk_indices, identity: "test".to_string(), }, 2, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, ) .boxed() From a3545c8d610eec0a62afbe9d3ff9253484af66db Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 13:58:54 +0800 Subject: [PATCH 09/12] minor --- src/stream/src/executor/hop_window.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 4dc27b7bb54b2..c76788b312ff6 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -167,13 +167,12 @@ impl HopWindowExecutor { #[cfg(test)] mod tests { - use futures::{StreamExt}; + use futures::StreamExt; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, IntervalUnit}; use risingwave_expr::expr::test_utils::make_hop_window_expression; - use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; @@ -208,7 +207,7 @@ mod tests { input, ExecutorInfo { // TODO: the schema is incorrect, but it seems useless here. - schema: schema, + schema, pk_indices, identity: "test".to_string(), }, From 7083479e1307a5cac7fae3dc2c88e799df37a58d Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 14:08:14 +0800 Subject: [PATCH 10/12] minor --- e2e_test/streaming/time_window.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/streaming/time_window.slt b/e2e_test/streaming/time_window.slt index 227053ebfd7a0..1ecad3127047f 100644 --- a/e2e_test/streaming/time_window.slt +++ b/e2e_test/streaming/time_window.slt @@ -133,7 +133,7 @@ insert into t1 values (9, 1, 4, '2022-01-02 10:00:00'), (10, 3, 3, '2022-01-03 10:05:00'), (11, 2, 2, '2022-01-04 10:14:00'), - (12, 1, 1, '2022-01-05 10:22:00'), + (12, 1, 1, '2022-01-05 10:22:00'); # Test for interval day query IIT From 45f4d89b70e6330d5c52e516794930be837ff085 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 14:13:52 +0800 Subject: [PATCH 11/12] minor --- e2e_test/streaming/time_window.slt | 44 +++++++++++++++++---------- src/stream/src/executor/hop_window.rs | 2 +- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/e2e_test/streaming/time_window.slt b/e2e_test/streaming/time_window.slt index 1ecad3127047f..bfe59d38ecfd7 100644 --- a/e2e_test/streaming/time_window.slt +++ b/e2e_test/streaming/time_window.slt @@ -135,25 +135,34 @@ insert into t1 values (11, 2, 2, '2022-01-04 10:14:00'), (12, 1, 1, '2022-01-05 10:22:00'); +statement ok +flush; + +statement ok +create materialized view mv_hop_agg_3 as +select uid, sum(v) as sum_v, window_start +from hop(t1, created_at, interval '1' day, interval '2' day) +group by window_start, uid; + + # Test for interval day query IIT -select * from mv_hop_agg_2 order by window_start, uid; +select * from mv_hop_agg_3 order by window_start, uid; ---- -1 4 2022-01-01 09:45:00 -2 2 2022-01-01 09:45:00 -3 3 2022-01-01 09:45:00 -1 5 2022-01-01 10:00:00 -2 2 2022-01-01 10:00:00 -3 3 2022-01-01 10:00:00 -1 1 2022-01-01 10:15:00 -2 7 2022-01-01 10:15:00 -3 5 2022-01-01 10:15:00 -1 6 2022-01-01 10:30:00 -2 7 2022-01-01 10:30:00 -3 5 2022-01-01 10:30:00 -1 6 2022-01-01 10:45:00 -3 8 2022-01-01 10:45:00 -3 8 2022-01-01 11:00:00 +1 11 2021-12-31 00:00:00 +2 9 2021-12-31 00:00:00 +3 16 2021-12-31 00:00:00 +1 15 2022-01-01 00:00:00 +2 9 2022-01-01 00:00:00 +3 16 2022-01-01 00:00:00 +1 4 2022-01-02 00:00:00 +3 3 2022-01-02 00:00:00 +2 2 2022-01-03 00:00:00 +3 3 2022-01-03 00:00:00 +1 1 2022-01-04 00:00:00 +2 2 2022-01-04 00:00:00 +1 1 2022-01-05 00:00:00 + statement ok drop materialized view mv_tumble; @@ -173,6 +182,9 @@ drop materialized view mv_hop_agg_1; statement ok drop materialized view mv_hop_agg_2; +statement ok +drop materialized view mv_hop_agg_3; + statement error create materialized view invalid_hop as select * from hop(t1, created_at, interval '0', interval '1'); diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index c76788b312ff6..527c8df1598e7 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -19,7 +19,7 @@ use futures_async_stream::try_stream; use risingwave_common::array::column::Column; use risingwave_common::array::{StreamChunk, Vis}; use risingwave_common::types::IntervalUnit; -use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::expr::BoxedExpression; use risingwave_expr::ExprError; use super::error::StreamExecutorError; From 1dd45fb44e25061189a292e35c07af376af2aaf4 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Wed, 8 Mar 2023 14:33:41 +0800 Subject: [PATCH 12/12] remove --- .../optimizer/plan_node/logical_hop_window.rs | 93 ------------------- 1 file changed, 93 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index f0b4baf4fe28c..93888267a86dd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -459,99 +459,6 @@ impl PredicatePushdown for LogicalHopWindow { } } -pub fn derive_window_start_and_end_exprs( - logical_hop: &LogicalHopWindow, -) -> Result<(Vec, Vec)> { - let generic::HopWindow:: { - window_size, - window_slide, - time_col, - .. - } = &logical_hop.core; - let units = window_size - .exact_div(window_slide) - .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_size {} cannot be divided by window_slide {}", - window_size, window_slide - ), - })? - .get(); - let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); - let window_slide_expr: ExprImpl = - Literal::new(Some((*window_slide).into()), DataType::Interval).into(); - let window_size_sub_slide = FunctionCall::new( - ExprType::Subtract, - vec![window_size_expr, window_slide_expr.clone()], - )? - .into(); - - let time_col_shifted = FunctionCall::new( - ExprType::Subtract, - vec![ - ExprImpl::InputRef(Box::new(time_col.clone())), - window_size_sub_slide, - ], - )? - .into(); - - let hop_start: ExprImpl = FunctionCall::new( - ExprType::TumbleStart, - vec![time_col_shifted, window_slide_expr], - )? - .into(); - - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); - for i in 0..units { - { - let window_start_offset = - window_slide - .checked_mul_int(i) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_start_offset_expr = - Literal::new(Some(window_start_offset.into()), DataType::Interval).into(); - let window_start_expr = FunctionCall::new( - ExprType::Add, - vec![hop_start.clone(), window_start_offset_expr], - )? - .into(); - window_start_exprs.push(window_start_expr); - } - { - let window_end_offset = - window_slide - .checked_mul_int(i + units) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, - i + units - ), - })?; - let window_end_offset_expr = - Literal::new(Some(window_end_offset.into()), DataType::Interval).into(); - let window_end_expr = FunctionCall::new( - ExprType::Add, - vec![hop_start.clone(), window_end_offset_expr], - )? - .into(); - window_end_exprs.push(window_end_expr); - } - } - assert_eq!(window_start_exprs.len(), window_end_exprs.len()); - Ok((window_start_exprs, window_end_exprs)) -} - impl ToBatch for LogicalHopWindow { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?;