From e6f25613ad28132f1fb23605e4db40fbd78bcb9c Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Wed, 8 Mar 2023 14:28:37 +0800 Subject: [PATCH] fix(executor, frontend): `BatchHopWindow` executor should derive window expr in frontend (#8403) Co-authored-by: jon-chuang --- dashboard/proto/gen/batch_plan.ts | 29 ++- e2e_test/batch/basic/time_window_utc.slt.part | 27 +++ proto/batch_plan.proto | 2 + src/batch/src/executor/hop_window.rs | 184 +++++------------- src/expr/src/expr/test_utils.rs | 128 +++++++++++- .../optimizer/plan_node/batch_hop_window.rs | 63 +++++- .../optimizer/plan_node/logical_hop_window.rs | 99 +++++++++- 7 files changed, 382 insertions(+), 150 deletions(-) diff --git a/dashboard/proto/gen/batch_plan.ts b/dashboard/proto/gen/batch_plan.ts index ca3aadd92a16f..ab9083be3eead 100644 --- a/dashboard/proto/gen/batch_plan.ts +++ b/dashboard/proto/gen/batch_plan.ts @@ -216,6 +216,8 @@ export interface HopWindowNode { windowSlide: IntervalUnit | undefined; windowSize: IntervalUnit | undefined; outputIndices: number[]; + windowStartExprs: ExprNode[]; + windowEndExprs: ExprNode[]; } export interface TableFunctionNode { @@ -1373,7 +1375,14 @@ export const SortMergeJoinNode = { }; function createBaseHopWindowNode(): HopWindowNode { - return { timeCol: 0, windowSlide: undefined, windowSize: undefined, outputIndices: [] }; + return { + timeCol: 0, + windowSlide: undefined, + windowSize: undefined, + outputIndices: [], + windowStartExprs: [], + windowEndExprs: [], + }; } export const HopWindowNode = { @@ -1383,6 +1392,12 @@ export const HopWindowNode = { windowSlide: isSet(object.windowSlide) ? IntervalUnit.fromJSON(object.windowSlide) : undefined, windowSize: isSet(object.windowSize) ? IntervalUnit.fromJSON(object.windowSize) : undefined, outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], + windowStartExprs: Array.isArray(object?.windowStartExprs) + ? object.windowStartExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], + windowEndExprs: Array.isArray(object?.windowEndExprs) + ? object.windowEndExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], }; }, @@ -1398,6 +1413,16 @@ export const HopWindowNode = { } else { obj.outputIndices = []; } + if (message.windowStartExprs) { + obj.windowStartExprs = message.windowStartExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowStartExprs = []; + } + if (message.windowEndExprs) { + obj.windowEndExprs = message.windowEndExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowEndExprs = []; + } return obj; }, @@ -1411,6 +1436,8 @@ export const HopWindowNode = { ? IntervalUnit.fromPartial(object.windowSize) : undefined; message.outputIndices = object.outputIndices?.map((e) => e) || []; + message.windowStartExprs = object.windowStartExprs?.map((e) => ExprNode.fromPartial(e)) || []; + message.windowEndExprs = object.windowEndExprs?.map((e) => ExprNode.fromPartial(e)) || []; return message; }, }; diff --git a/e2e_test/batch/basic/time_window_utc.slt.part b/e2e_test/batch/basic/time_window_utc.slt.part index da90438ecf2ce..7f9c2080ad651 100644 --- a/e2e_test/batch/basic/time_window_utc.slt.part +++ b/e2e_test/batch/basic/time_window_utc.slt.part @@ -104,6 +104,33 @@ group by window_start, uid order by window_start, uid; 3 8 2022-01-01 10:45:00+00:00 3 8 2022-01-01 11:00:00+00:00 +statement ok +insert into t1 values + (9, 1, 4, '2022-01-02 10:00:00Z'), + (10, 3, 3, '2022-01-03 10:05:00Z'), + (11, 2, 2, '2022-01-04 10:14:00Z'), + (12, 1, 1, '2022-01-05 10:22:00Z'); + +query IIT +select uid, sum(v), window_start +from hop(t1, created_at, interval '1' day, interval '2' day) +group by window_start, uid order by window_start, uid; +---- +1 11 2021-12-31 00:00:00+00:00 +2 9 2021-12-31 00:00:00+00:00 +3 16 2021-12-31 00:00:00+00:00 +1 15 2022-01-01 00:00:00+00:00 +2 9 2022-01-01 00:00:00+00:00 +3 16 2022-01-01 00:00:00+00:00 +1 4 2022-01-02 00:00:00+00:00 +3 3 2022-01-02 00:00:00+00:00 +2 2 2022-01-03 00:00:00+00:00 +3 3 2022-01-03 00:00:00+00:00 +1 1 2022-01-04 00:00:00+00:00 +2 2 2022-01-04 00:00:00+00:00 +1 1 2022-01-05 00:00:00+00:00 + + statement error select * from hop(t1, created_at, interval '0', interval '1'); diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index c5f417763d7f5..2e7c66dd344ae 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -187,6 +187,8 @@ message HopWindowNode { data.IntervalUnit window_slide = 2; data.IntervalUnit window_size = 3; repeated uint32 output_indices = 4; + repeated expr.ExprNode window_start_exprs = 5; + repeated expr.ExprNode window_end_exprs = 6; } message TableFunctionNode { diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index a7e5975ed38ba..a1268702f7f84 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -14,21 +14,17 @@ use std::num::NonZeroUsize; -use anyhow::anyhow; use futures_async_stream::try_stream; use itertools::Itertools; -use num_traits::CheckedSub; use risingwave_common::array::column::Column; use risingwave_common::array::{DataChunk, Vis}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; -use risingwave_expr::expr::{new_binary_expr, Expression, InputRefExpression, LiteralExpression}; +use risingwave_common::types::{DataType, IntervalUnit}; +use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_expr::ExprError; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::expr::expr_node; -use crate::error::BatchError; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; @@ -40,6 +36,8 @@ pub struct HopWindowExecutor { time_col_idx: usize, window_slide: IntervalUnit, window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, } @@ -61,10 +59,22 @@ impl BoxedExecutorBuilder for HopWindowExecutor { let output_indices = hop_window_node .get_output_indices() .iter() - .copied() + .cloned() .map(|x| x as usize) .collect_vec(); + let window_start_exprs: Vec<_> = hop_window_node + .get_window_start_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let window_end_exprs: Vec<_> = hop_window_node + .get_window_end_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + assert_eq!(window_start_exprs.len(), window_end_exprs.len()); + let time_col_data_type = child.schema().fields()[time_col].data_type(); let output_type = DataType::window_of(&time_col_data_type).unwrap(); let original_schema: Schema = child @@ -88,12 +98,15 @@ impl BoxedExecutorBuilder for HopWindowExecutor { window_slide, window_size, source.plan_node().get_identity().clone(), + window_start_exprs, + window_end_exprs, output_indices, ))) } } impl HopWindowExecutor { + #[allow(clippy::too_many_arguments)] fn new( child: BoxedExecutor, schema: Schema, @@ -101,6 +114,8 @@ impl HopWindowExecutor { window_slide: IntervalUnit, window_size: IntervalUnit, identity: String, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, ) -> Self { Self { @@ -110,6 +125,8 @@ impl HopWindowExecutor { time_col_idx, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, } } @@ -134,7 +151,7 @@ impl HopWindowExecutor { async fn do_execute(self: Box) { let Self { child, - time_col_idx, + window_slide, window_size, output_indices, @@ -152,81 +169,6 @@ impl HopWindowExecutor { })? .get(); - let time_col_data_type = child.schema().fields()[time_col_idx].data_type(); - let output_type = DataType::window_of(&time_col_data_type).unwrap(); - let time_col_ref = InputRefExpression::new(time_col_data_type, self.time_col_idx).boxed(); - - let window_slide_expr = - LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) - .boxed(); - - // The first window_start of hop window should be: - // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). - // Let's pre calculate (`window_size` - `window_slide`). - let window_size_sub_slide = window_size.checked_sub(&window_slide).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_size {} cannot be subtracted by window_slide {}", - window_size, window_slide - ))) - })?; - let window_size_sub_slide_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_size_sub_slide)), - ) - .boxed(); - let hop_expr = new_binary_expr( - expr_node::Type::TumbleStart, - output_type.clone(), - new_binary_expr( - expr_node::Type::Subtract, - output_type.clone(), - time_col_ref, - window_size_sub_slide_expr, - )?, - window_slide_expr, - )?; - - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); - - for i in 0..units { - let window_start_offset = window_slide.checked_mul_int(i).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ))) - })?; - let window_start_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_start_offset)), - ) - .boxed(); - let window_end_offset = window_slide.checked_mul_int(i + units).ok_or_else(|| { - BatchError::Internal(anyhow!(format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ))) - })?; - let window_end_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_end_offset)), - ) - .boxed(); - let window_start_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_start_offset_expr, - )?; - window_start_exprs.push(window_start_expr); - let window_end_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_end_offset_expr, - )?; - window_end_exprs.push(window_end_expr); - } let window_start_col_index = child.schema().len(); let window_end_col_index = child.schema().len() + 1; let contains_window_start = output_indices.contains(&window_start_col_index); @@ -234,27 +176,16 @@ impl HopWindowExecutor { #[for_await] for data_chunk in child.execute() { let data_chunk = data_chunk?; - let hop_start = hop_expr.eval(&data_chunk)?; - let len = hop_start.len(); - let hop_start_chunk = DataChunk::new(vec![Column::new(hop_start)], len); - let (origin_cols, visibility) = data_chunk.into_parts(); - let len = match visibility { - Vis::Compact(len) => len, - Vis::Bitmap(_) => { - return Err(BatchError::Internal(anyhow!( - "Input array should already been compacted!" - )) - .into()); - } - }; + assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + let len = data_chunk.cardinality(); for i in 0..units { let window_start_col = if contains_window_start { - Some(window_start_exprs[i].eval(&hop_start_chunk)?) + Some(self.window_start_exprs[i].eval(&data_chunk)?) } else { None }; let window_end_col = if contains_window_end { - Some(window_end_exprs[i].eval(&hop_start_chunk)?) + Some(self.window_end_exprs[i].eval(&data_chunk)?) } else { None }; @@ -262,7 +193,7 @@ impl HopWindowExecutor { .iter() .filter_map(|&idx| { if idx < window_start_col_index { - Some(origin_cols[idx].clone()) + Some(data_chunk.column_at(idx).clone()) } else if idx == window_start_col_index { Some(Column::new(window_start_col.clone().unwrap())) } else if idx == window_end_col_index { @@ -284,12 +215,12 @@ mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; + use risingwave_expr::expr::test_utils::make_hop_window_expression; use super::*; use crate::executor::test_utils::MockExecutor; - #[tokio::test] - async fn test_execute() { + fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -313,16 +244,26 @@ mod tests { let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); - let default_indices = (0..schema.len() + 2).collect_vec(); - let executor = Box::new(HopWindowExecutor::new( + let (window_start_exprs, window_end_exprs) = + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + + Box::new(HopWindowExecutor::new( Box::new(mock_executor), schema, 2, window_slide, window_size, "test".to_string(), - default_indices, - )); + window_start_exprs, + window_end_exprs, + output_indices, + )) + } + + #[tokio::test] + async fn test_execute() { + let default_indices = (0..3 + 2).collect_vec(); + let executor = create_executor(default_indices); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -363,38 +304,7 @@ mod tests { } #[tokio::test] async fn test_output_indices() { - let field1 = Field::unnamed(DataType::Int64); - let field2 = Field::unnamed(DataType::Int64); - let field3 = Field::with_name(DataType::Timestamp, "created_at"); - let schema = Schema::new(vec![field1, field2, field3]); - - let chunk = DataChunk::from_pretty( - &"I I TS - 1 1 ^10:00:00 - 2 3 ^10:05:00 - 3 2 ^10:14:00 - 4 1 ^10:22:00 - 5 3 ^10:33:00 - 6 2 ^10:42:00 - 7 1 ^10:51:00 - 8 3 ^11:02:00" - .replace('^', "2022-2-2T"), - ); - - let mut mock_executor = MockExecutor::new(schema.clone()); - mock_executor.add(chunk); - - let window_slide = IntervalUnit::from_minutes(15); - let window_size = IntervalUnit::from_minutes(30); - let executor = Box::new(HopWindowExecutor::new( - Box::new(mock_executor), - schema, - 2, - window_slide, - window_size, - "test".to_string(), - vec![1, 3, 4, 2], - )); + let executor = create_executor(vec![1, 3, 4, 2]); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. diff --git a/src/expr/src/expr/test_utils.rs b/src/expr/src/expr/test_utils.rs index 1c810b8213a7a..9ce1f141eaa98 100644 --- a/src/expr/src/expr/test_utils.rs +++ b/src/expr/src/expr/test_utils.rs @@ -14,22 +14,30 @@ //! Helper functions to construct prost [`ExprNode`] for test. -use risingwave_common::types::ScalarImpl; +use std::num::NonZeroUsize; + +use num_traits::CheckedSub; +use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::value_encoding::serialize_datum; use risingwave_pb::data::data_type::TypeName; -use risingwave_pb::data::{DataType as ProstDataType, DataType, Datum as ProstDatum}; +use risingwave_pb::data::{DataType as ProstDataType, Datum as ProstDatum}; use risingwave_pb::expr::expr_node::Type::{Field, InputRef}; -use risingwave_pb::expr::expr_node::{RexNode, Type}; +use risingwave_pb::expr::expr_node::{self, RexNode, Type}; use risingwave_pb::expr::{ExprNode, FunctionCall}; +use super::{ + new_binary_expr, BoxedExpression, Expression, InputRefExpression, LiteralExpression, Result, +}; +use crate::ExprError; + pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[usize]) -> ExprNode { let mut exprs = Vec::new(); for (idx, ret) in indices.iter().zip_eq_fast(rets.iter()) { exprs.push(make_input_ref(*idx, *ret)); } let function_call = FunctionCall { children: exprs }; - let return_type = DataType { + let return_type = ProstDataType { type_name: TypeName::Timestamp as i32, ..Default::default() }; @@ -43,7 +51,7 @@ pub fn make_expression(kind: Type, rets: &[TypeName], indices: &[usize]) -> Expr pub fn make_input_ref(idx: usize, ret: TypeName) -> ExprNode { ExprNode { expr_type: InputRef as i32, - return_type: Some(DataType { + return_type: Some(ProstDataType { type_name: ret as i32, ..Default::default() }), @@ -87,3 +95,113 @@ pub fn make_field_function(children: Vec, ret: TypeName) -> ExprNode { rex_node: Some(RexNode::FuncCall(FunctionCall { children })), } } + +pub fn make_hop_window_expression( + time_col_data_type: DataType, + time_col_idx: usize, + window_size: IntervalUnit, + window_slide: IntervalUnit, +) -> Result<(Vec, Vec)> { + let units = window_size + .exact_div(&window_slide) + .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be divided by window_slide {}", + window_size, window_slide + ), + })? + .get(); + + let output_type = DataType::window_of(&time_col_data_type).unwrap(); + let get_hop_window_start = || -> Result { + let time_col_ref = InputRefExpression::new(time_col_data_type, time_col_idx).boxed(); + + let window_slide_expr = + LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) + .boxed(); + + // The first window_start of hop window should be: + // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). + // Let's pre calculate (`window_size` - `window_slide`). + let window_size_sub_slide = + window_size + .checked_sub(&window_slide) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be subtracted by window_slide {}", + window_size, window_slide + ), + })?; + let window_size_sub_slide_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_size_sub_slide)), + ) + .boxed(); + + let hop_start = new_binary_expr( + expr_node::Type::TumbleStart, + output_type.clone(), + new_binary_expr( + expr_node::Type::Subtract, + output_type.clone(), + time_col_ref, + window_size_sub_slide_expr, + )?, + window_slide_expr, + )?; + Ok(hop_start) + }; + + let mut window_start_exprs = Vec::with_capacity(units); + let mut window_end_exprs = Vec::with_capacity(units); + for i in 0..units { + let window_start_offset = + window_slide + .checked_mul_int(i) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_start_offset_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_start_offset)), + ) + .boxed(); + let window_end_offset = + window_slide + .checked_mul_int(i + units) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_end_offset_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_end_offset)), + ) + .boxed(); + let window_start_expr = new_binary_expr( + expr_node::Type::Add, + output_type.clone(), + get_hop_window_start.clone()()?, + window_start_offset_expr, + )?; + window_start_exprs.push(window_start_expr); + let window_end_expr = new_binary_expr( + expr_node::Type::Add, + output_type.clone(), + get_hop_window_start.clone()()?, + window_end_offset_expr, + )?; + window_end_exprs.push(window_end_expr); + } + Ok((window_start_exprs, window_end_exprs)) +} diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index 45bed2efce028..49454c4ed8c96 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -22,6 +22,7 @@ use super::{ ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch, }; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Order, RequiredDist}; use crate::utils::ColIndexMappingRewriteExt; @@ -32,10 +33,16 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct BatchHopWindow { pub base: PlanBase, logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl BatchHopWindow { - pub fn new(logical: LogicalHopWindow) -> Self { + pub fn new( + logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, + ) -> Self { let ctx = logical.base.ctx.clone(); let distribution = logical .i2o_col_mapping() @@ -46,7 +53,12 @@ impl BatchHopWindow { distribution, logical.get_out_column_index_order(), ); - BatchHopWindow { base, logical } + BatchHopWindow { + base, + logical, + window_start_exprs, + window_end_exprs, + } } } @@ -62,7 +74,11 @@ impl PlanTreeNodeUnary for BatchHopWindow { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + Self::new( + self.logical.clone_with_input(input), + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ) } } @@ -90,7 +106,11 @@ impl ToDistributedBatch for BatchHopWindow { .input() .to_distributed_with_required(required_order, &input_required)?; let new_logical = self.logical.clone_with_input(new_input); - let batch_plan = BatchHopWindow::new(new_logical); + let batch_plan = BatchHopWindow::new( + new_logical, + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ); let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?; required_dist.enforce_if_not_satisfies(batch_plan, required_order) } @@ -109,6 +129,18 @@ impl ToBatchProst for BatchHopWindow { .iter() .map(|&x| x as u32) .collect(), + window_start_exprs: self + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), + window_end_exprs: self + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), }) } } @@ -120,4 +152,25 @@ impl ToLocalBatch for BatchHopWindow { } } -impl ExprRewritable for BatchHopWindow {} +impl ExprRewritable for BatchHopWindow { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical.clone(), + self.window_start_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + self.window_end_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + ) + .into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 39e0932ed861b..d418f604a3c3d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -13,19 +13,21 @@ // limitations under the License. use std::fmt; +use std::num::NonZeroUsize; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::types::{DataType, IntervalUnit}; +use risingwave_expr::ExprError; use super::generic::GenericPlanNode; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, }; -use crate::expr::{ExprType, FunctionCall, InputRef}; +use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -205,6 +207,97 @@ impl LogicalHopWindow { pub fn output_indices(&self) -> &Vec { &self.core.output_indices } + + fn derive_window_start_and_end_exprs(&self) -> Result<(Vec, Vec)> { + let generic::HopWindow:: { + window_size, + window_slide, + time_col, + .. + } = &self.core; + let units = window_size + .exact_div(window_slide) + .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be divided by window_slide {}", + window_size, window_slide + ), + })? + .get(); + let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); + let window_slide_expr: ExprImpl = + Literal::new(Some((*window_slide).into()), DataType::Interval).into(); + let window_size_sub_slide = FunctionCall::new( + ExprType::Subtract, + vec![window_size_expr, window_slide_expr.clone()], + )? + .into(); + + let time_col_shifted = FunctionCall::new( + ExprType::Subtract, + vec![ + ExprImpl::InputRef(Box::new(time_col.clone())), + window_size_sub_slide, + ], + )? + .into(); + + let hop_start: ExprImpl = FunctionCall::new( + ExprType::TumbleStart, + vec![time_col_shifted, window_slide_expr], + )? + .into(); + + let mut window_start_exprs = Vec::with_capacity(units); + let mut window_end_exprs = Vec::with_capacity(units); + for i in 0..units { + { + let window_start_offset = + window_slide + .checked_mul_int(i) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_start_offset_expr = + Literal::new(Some(window_start_offset.into()), DataType::Interval).into(); + let window_start_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_start_offset_expr], + )? + .into(); + window_start_exprs.push(window_start_expr); + } + { + let window_end_offset = + window_slide.checked_mul_int(i + units).ok_or_else(|| { + ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, + i + units + ), + } + })?; + let window_end_offset_expr = + Literal::new(Some(window_end_offset.into()), DataType::Interval).into(); + let window_end_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_end_offset_expr], + )? + .into(); + window_end_exprs.push(window_end_expr); + } + } + assert_eq!(window_start_exprs.len(), window_end_exprs.len()); + Ok((window_start_exprs, window_end_exprs)) + } } impl PlanTreeNodeUnary for LogicalHopWindow { @@ -370,7 +463,9 @@ impl ToBatch for LogicalHopWindow { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; let new_logical = self.clone_with_input(new_input); - Ok(BatchHopWindow::new(new_logical).into()) + let (window_start_exprs, window_end_exprs) = + new_logical.derive_window_start_and_end_exprs()?; + Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } }