diff --git a/e2e_test/batch/basic/time_window.slt.part b/e2e_test/batch/basic/time_window.slt.part index ec7187da29319..11b70352f3113 100644 --- a/e2e_test/batch/basic/time_window.slt.part +++ b/e2e_test/batch/basic/time_window.slt.part @@ -28,6 +28,21 @@ from tumble(t1, created_at, interval '30' minute) order by row_id, window_start; 7 1 2022-01-01 10:51:00 2022-01-01 10:30:00 2022-01-01 11:00:00 8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00 + +query IITTT +select row_id, uid, created_at, window_start, window_end +from tumble(t1, created_at, interval '30' minute, interval '13' minute) order by row_id, window_start; +---- +1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00 +2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00 +3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00 +8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00 + + query IITTT select row_id, uid, created_at, window_start, window_end from hop(t1, created_at, interval '15' minute, interval '30' minute) order by row_id, window_start; @@ -49,6 +64,27 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute) order by ro 8 3 2022-01-01 11:02:00 2022-01-01 10:45:00 2022-01-01 11:15:00 8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00 +query IITTT +select row_id, uid, created_at, window_start, window_end +from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute) order by row_id, window_start; +---- +1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00 +1 1 2022-01-01 10:00:00 2022-01-01 09:58:00 2022-01-01 10:28:00 +2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00 +2 3 2022-01-01 10:05:00 2022-01-01 09:58:00 2022-01-01 10:28:00 +3 2 2022-01-01 10:14:00 2022-01-01 09:58:00 2022-01-01 10:28:00 +3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +4 1 2022-01-01 10:22:00 2022-01-01 09:58:00 2022-01-01 10:28:00 +4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +5 3 2022-01-01 10:33:00 2022-01-01 10:28:00 2022-01-01 10:58:00 +6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00 +6 2 2022-01-01 10:42:00 2022-01-01 10:28:00 2022-01-01 10:58:00 +7 1 2022-01-01 10:51:00 2022-01-01 10:28:00 2022-01-01 10:58:00 +7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00 +8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00 +8 3 2022-01-01 11:02:00 2022-01-01 10:58:00 2022-01-01 11:28:00 + query IIT rowsort select row_id, uid, created_at from hop(t1, created_at, interval '15' minute, interval '30' minute); @@ -70,6 +106,29 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute); 8 3 2022-01-01 11:02:00 8 3 2022-01-01 11:02:00 + +query IIT rowsort +select row_id, uid, created_at +from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute); +---- +1 1 2022-01-01 10:00:00 +1 1 2022-01-01 10:00:00 +2 3 2022-01-01 10:05:00 +2 3 2022-01-01 10:05:00 +3 2 2022-01-01 10:14:00 +3 2 2022-01-01 10:14:00 +4 1 2022-01-01 10:22:00 +4 1 2022-01-01 10:22:00 +5 3 2022-01-01 10:33:00 +5 3 2022-01-01 10:33:00 +6 2 2022-01-01 10:42:00 +6 2 2022-01-01 10:42:00 +7 1 2022-01-01 10:51:00 +7 1 2022-01-01 10:51:00 +8 3 2022-01-01 11:02:00 +8 3 2022-01-01 11:02:00 + + query IT select sum(v), window_start from tumble(t1, created_at, interval '30' minute) @@ -79,6 +138,15 @@ group by window_start order by window_start; 18 2022-01-01 10:30:00 8 2022-01-01 11:00:00 +query IT +select sum(v), window_start +from tumble(t1, created_at, interval '30' minute, interval '13' minute) +group by window_start order by window_start; +---- +7 2022-01-01 09:43:00 +15 2022-01-01 10:13:00 +14 2022-01-01 10:43:00 + query IIT select uid, sum(v), window_start from tumble(t1, created_at, interval '30' minute) @@ -92,6 +160,20 @@ group by window_start, uid order by window_start, uid; 3 5 2022-01-01 10:30:00 3 8 2022-01-01 11:00:00 +query IIT +select uid, sum(v), window_start +from tumble(t1, created_at, interval '30' minute, interval '13' minute) +group by window_start, uid order by window_start, uid; +---- +1 4 2022-01-01 09:43:00 +3 3 2022-01-01 09:43:00 +1 1 2022-01-01 10:13:00 +2 9 2022-01-01 10:13:00 +3 5 2022-01-01 10:13:00 +1 6 2022-01-01 10:43:00 +3 8 2022-01-01 10:43:00 + + query IT select sum(v), window_start from hop(t1, created_at, interval '15' minute, interval '30' minute) @@ -104,6 +186,19 @@ group by window_start order by window_start; 14 2022-01-01 10:45:00 8 2022-01-01 11:00:00 + +query IT +select sum(v), window_start +from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute) +group by window_start order by window_start; +---- +7 2022-01-01 09:43:00 +10 2022-01-01 09:58:00 +15 2022-01-01 10:13:00 +18 2022-01-01 10:28:00 +14 2022-01-01 10:43:00 +8 2022-01-01 10:58:00 + query IIT select uid, sum(v), window_start from hop(t1, created_at, interval '15' minute, interval '30' minute) @@ -125,6 +220,28 @@ group by window_start, uid order by window_start, uid; 3 8 2022-01-01 10:45:00 3 8 2022-01-01 11:00:00 + + +query IIT +select uid, sum(v), window_start +from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute) +group by window_start, uid order by window_start, uid; +---- +1 4 2022-01-01 09:43:00 +3 3 2022-01-01 09:43:00 +1 5 2022-01-01 09:58:00 +2 2 2022-01-01 09:58:00 +3 3 2022-01-01 09:58:00 +1 1 2022-01-01 10:13:00 +2 9 2022-01-01 10:13:00 +3 5 2022-01-01 10:13:00 +1 6 2022-01-01 10:28:00 +2 7 2022-01-01 10:28:00 +3 5 2022-01-01 10:28:00 +1 6 2022-01-01 10:43:00 +3 8 2022-01-01 10:43:00 +3 8 2022-01-01 10:58:00 + statement error select * from hop(t1, created_at, interval '0', interval '1'); diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 6f84a692f0fd6..c5c598b3baa80 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -151,7 +151,6 @@ impl HopWindowExecutor { async fn do_execute(self: Box) { let Self { child, - window_slide, window_size, output_indices, @@ -219,7 +218,12 @@ mod tests { use super::*; use crate::executor::test_utils::MockExecutor; - fn create_executor(output_indices: Vec) -> Box { + fn create_executor( + output_indices: Vec, + window_slide: IntervalUnit, + window_size: IntervalUnit, + window_offset: IntervalUnit, + ) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -237,14 +241,17 @@ mod tests { 8 3 ^11:02:00" .replace('^', "2022-2-2T"), ); - let mut mock_executor = MockExecutor::new(schema.clone()); mock_executor.add(chunk); - let window_slide = IntervalUnit::from_minutes(15); - let window_size = IntervalUnit::from_minutes(30); - let (window_start_exprs, window_end_exprs) = - make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + let (window_start_exprs, window_end_exprs) = make_hop_window_expression( + DataType::Timestamp, + 2, + window_size, + window_slide, + window_offset, + ) + .unwrap(); Box::new(HopWindowExecutor::new( Box::new(mock_executor), @@ -259,10 +266,94 @@ mod tests { )) } + #[tokio::test] + async fn test_window_offset() { + async fn test_window_offset_helper(window_offset: IntervalUnit) -> DataChunk { + let default_indices = (0..3 + 2).collect_vec(); + let window_slide = IntervalUnit::from_minutes(15); + let window_size = IntervalUnit::from_minutes(30); + let executor = + create_executor(default_indices, window_slide, window_size, window_offset); + let mut stream = executor.execute(); + stream.next().await.unwrap().unwrap() + } + + let window_size = 30; + for offset in 0..window_size { + for coefficient in -5..0 { + assert_eq!( + test_window_offset_helper(IntervalUnit::from_minutes( + coefficient * window_size + offset + )) + .await, + test_window_offset_helper(IntervalUnit::from_minutes( + (coefficient - 1) * window_size + offset + )) + .await + ); + } + } + for offset in 0..window_size { + for coefficient in 0..5 { + assert_eq!( + test_window_offset_helper(IntervalUnit::from_minutes( + coefficient * window_size + offset + )) + .await, + test_window_offset_helper(IntervalUnit::from_minutes( + (coefficient + 1) * window_size + offset + )) + .await + ); + } + } + for offset in -window_size..window_size { + assert_eq!( + test_window_offset_helper(IntervalUnit::from_minutes(window_size + offset)).await, + test_window_offset_helper(IntervalUnit::from_minutes(-window_size + offset)).await + ); + } + + assert_eq!( + test_window_offset_helper(IntervalUnit::from_minutes(-31)).await, + DataChunk::from_pretty( + &"I I TS TS TS + 1 1 ^10:00:00 ^09:44:00 ^10:14:00 + 2 3 ^10:05:00 ^09:44:00 ^10:14:00 + 3 2 ^10:14:00 ^09:59:00 ^10:29:00 + 4 1 ^10:22:00 ^09:59:00 ^10:29:00 + 5 3 ^10:33:00 ^10:14:00 ^10:44:00 + 6 2 ^10:42:00 ^10:14:00 ^10:44:00 + 7 1 ^10:51:00 ^10:29:00 ^10:59:00 + 8 3 ^11:02:00 ^10:44:00 ^11:14:00" + .replace('^', "2022-2-2T"), + ) + ); + assert_eq!( + test_window_offset_helper(IntervalUnit::from_minutes(29)).await, + DataChunk::from_pretty( + &"I I TS TS TS + 1 1 ^10:00:00 ^09:44:00 ^10:14:00 + 2 3 ^10:05:00 ^09:44:00 ^10:14:00 + 3 2 ^10:14:00 ^09:59:00 ^10:29:00 + 4 1 ^10:22:00 ^09:59:00 ^10:29:00 + 5 3 ^10:33:00 ^10:14:00 ^10:44:00 + 6 2 ^10:42:00 ^10:14:00 ^10:44:00 + 7 1 ^10:51:00 ^10:29:00 ^10:59:00 + 8 3 ^11:02:00 ^10:44:00 ^11:14:00" + .replace('^', "2022-2-2T"), + ) + ); + } + #[tokio::test] async fn test_execute() { let default_indices = (0..3 + 2).collect_vec(); - let executor = create_executor(default_indices); + + let window_slide = IntervalUnit::from_minutes(15); + let window_size = IntervalUnit::from_minutes(30); + let window_offset = IntervalUnit::from_minutes(0); + let executor = create_executor(default_indices, window_slide, window_size, window_offset); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -303,7 +394,10 @@ mod tests { } #[tokio::test] async fn test_output_indices() { - let executor = create_executor(vec![1, 3, 4, 2]); + let window_slide = IntervalUnit::from_minutes(15); + let window_size = IntervalUnit::from_minutes(30); + let window_offset = IntervalUnit::from_minutes(0); + let executor = create_executor(vec![1, 3, 4, 2], window_slide, window_size, window_offset); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index 153eadc9076ab..515b66f40c526 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -45,9 +45,9 @@ pub struct IntervalUnit { usecs: i64, } -const USECS_PER_SEC: i64 = 1_000_000; -const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC; -const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY; +pub const USECS_PER_SEC: i64 = 1_000_000; +pub const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC; +pub const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY; impl IntervalUnit { /// Smallest interval value. diff --git a/src/expr/src/expr/build_expr_from_prost.rs b/src/expr/src/expr/build_expr_from_prost.rs index 7db4ea1e46a89..eadc170431e15 100644 --- a/src/expr/src/expr/build_expr_from_prost.rs +++ b/src/expr/src/expr/build_expr_from_prost.rs @@ -51,6 +51,8 @@ use super::expr_unary::{ use super::expr_vnode::VnodeExpression; use crate::expr::expr_array_distinct::ArrayDistinctExpression; use crate::expr::expr_array_to_string::ArrayToStringExpression; +use crate::expr::expr_binary_nonnull::new_tumble_start; +use crate::expr::expr_ternary::new_tumble_start_offset; use crate::expr::{ build_from_prost as expr_build_from_prost, BoxedExpression, Expression, InputRefExpression, LiteralExpression, @@ -69,9 +71,9 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { build_unary_expr_prost(prost) } Equal | NotEqual | LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual | Add - | Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | TumbleStart - | Position | BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor - | ConcatOp | AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => { + | Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | Position + | BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor | ConcatOp + | AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => { build_binary_expr_prost(prost) } And | Or | IsDistinctFrom | IsNotDistinctFrom | ArrayAccess | FormatType => { @@ -87,6 +89,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { Translate => build_translate_expr(prost), // Variable number of arguments and based on `Unary/Binary/Ternary/...Expression` + TumbleStart => build_tumble_start_expr(prost), Substr => build_substr_expr(prost), Overlay => build_overlay_expr(prost), Trim => build_trim_expr(prost), @@ -272,6 +275,21 @@ fn build_date_trunc_expr(prost: &ExprNode) -> Result { Ok(new_date_trunc_expr(ret_type, field, source, time_zone)) } +fn build_tumble_start_expr(prost: &ExprNode) -> Result { + let (children, ret_type) = get_children_and_return_type(prost)?; + ensure!(children.len() == 2 || children.len() == 3); + let time = expr_build_from_prost(&children[0])?; + let window_size = expr_build_from_prost(&children[1])?; + if children.len() == 2 { + new_tumble_start(time, window_size, ret_type) + } else if children.len() == 3 { + let offset = expr_build_from_prost(&children[2])?; + new_tumble_start_offset(time, window_size, offset, ret_type) + } else { + unreachable!() + } +} + fn build_length_expr(prost: &ExprNode) -> Result { let (children, ret_type) = get_children_and_return_type(prost)?; // TODO: add encoding length expr diff --git a/src/expr/src/expr/expr_binary_nonnull.rs b/src/expr/src/expr/expr_binary_nonnull.rs index 613e5c8fe8f3d..2e4a273cf05b7 100644 --- a/src/expr/src/expr/expr_binary_nonnull.rs +++ b/src/expr/src/expr/expr_binary_nonnull.rs @@ -688,7 +688,6 @@ pub fn new_binary_expr( Type::Position => Box::new(BinaryExpression::::new( l, r, ret, position, )), - Type::TumbleStart => new_tumble_start(l, r, ret)?, Type::ConcatOp => new_concat_op(l, r, ret), Type::JsonbAccessInner => match r.return_type() { DataType::Varchar => { @@ -722,7 +721,6 @@ pub fn new_binary_expr( .boxed(), t => return Err(ExprError::UnsupportedFunction(format!("jsonb ->> {t}"))), }, - tp => { return Err(ExprError::UnsupportedFunction(format!( "{:?}({:?}, {:?})", @@ -735,7 +733,7 @@ pub fn new_binary_expr( Ok(expr) } -fn new_tumble_start( +pub fn new_tumble_start( expr_ia1: BoxedExpression, expr_ia2: BoxedExpression, return_type: DataType, diff --git a/src/expr/src/expr/expr_ternary.rs b/src/expr/src/expr/expr_ternary.rs new file mode 100644 index 0000000000000..e249a5ee73c1b --- /dev/null +++ b/src/expr/src/expr/expr_ternary.rs @@ -0,0 +1,80 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::{I64Array, IntervalArray, NaiveDateArray, NaiveDateTimeArray}; +use risingwave_common::types::DataType; + +use super::template::TernaryExpression; +use super::BoxedExpression; +use crate::vector_op::tumble::{ + tumble_start_offset_date, tumble_start_offset_date_time, tumble_start_offset_timestamptz, +}; +use crate::{ExprError, Result}; + +pub(crate) fn new_tumble_start_offset( + time: BoxedExpression, + window_size: BoxedExpression, + offset: BoxedExpression, + return_type: DataType, +) -> Result { + let expr: BoxedExpression = match time.return_type() { + DataType::Date => Box::new(TernaryExpression::< + NaiveDateArray, + IntervalArray, + IntervalArray, + NaiveDateTimeArray, + _, + >::new( + time, + window_size, + offset, + return_type, + tumble_start_offset_date, + )), + DataType::Timestamp => Box::new(TernaryExpression::< + NaiveDateTimeArray, + IntervalArray, + IntervalArray, + NaiveDateTimeArray, + _, + >::new( + time, + window_size, + offset, + return_type, + tumble_start_offset_date_time, + )), + DataType::Timestamptz => Box::new(TernaryExpression::< + I64Array, + IntervalArray, + IntervalArray, + I64Array, + _, + >::new( + time, + window_size, + offset, + return_type, + tumble_start_offset_timestamptz, + )), + _ => { + return Err(ExprError::UnsupportedFunction(format!( + "tumble_start_offset is not supported for {:?}", + time.return_type() + ))) + } + }; + + Ok(expr) +} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 3f9ea94e36ec9..f9f3b6ec7a535 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -51,6 +51,7 @@ mod expr_nested_construct; mod expr_quaternary_bytes; pub mod expr_regexp; mod expr_some_all; +mod expr_ternary; mod expr_ternary_bytes; mod expr_to_char_const_tmpl; mod expr_to_timestamp_const_tmpl; diff --git a/src/expr/src/expr/test_utils.rs b/src/expr/src/expr/test_utils.rs index 9ce1f141eaa98..d32cada054ac3 100644 --- a/src/expr/src/expr/test_utils.rs +++ b/src/expr/src/expr/test_utils.rs @@ -26,6 +26,7 @@ use risingwave_pb::expr::expr_node::Type::{Field, InputRef}; use risingwave_pb::expr::expr_node::{self, RexNode, Type}; use risingwave_pb::expr::{ExprNode, FunctionCall}; +use super::expr_ternary::new_tumble_start_offset; use super::{ new_binary_expr, BoxedExpression, Expression, InputRefExpression, LiteralExpression, Result, }; @@ -101,6 +102,7 @@ pub fn make_hop_window_expression( time_col_idx: usize, window_size: IntervalUnit, window_slide: IntervalUnit, + window_offset: IntervalUnit, ) -> Result<(Vec, Vec)> { let units = window_size .exact_div(&window_slide) @@ -121,10 +123,15 @@ pub fn make_hop_window_expression( let window_slide_expr = LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) .boxed(); + let window_offset_expr = LiteralExpression::new( + DataType::Interval, + Some(ScalarImpl::Interval(window_offset)), + ) + .boxed(); // The first window_start of hop window should be: - // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). - // Let's pre calculate (`window_size` - `window_slide`). + // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`, + // `window_offset`). Let's pre calculate (`window_size` - `window_slide`). let window_size_sub_slide = window_size .checked_sub(&window_slide) @@ -141,9 +148,7 @@ pub fn make_hop_window_expression( ) .boxed(); - let hop_start = new_binary_expr( - expr_node::Type::TumbleStart, - output_type.clone(), + let hop_start = new_tumble_start_offset( new_binary_expr( expr_node::Type::Subtract, output_type.clone(), @@ -151,7 +156,10 @@ pub fn make_hop_window_expression( window_size_sub_slide_expr, )?, window_slide_expr, + window_offset_expr, + output_type.clone(), )?; + Ok(hop_start) }; diff --git a/src/expr/src/sig/func.rs b/src/expr/src/sig/func.rs index c430bcd22eab8..24dcbf383871c 100644 --- a/src/expr/src/sig/func.rs +++ b/src/expr/src/sig/func.rs @@ -233,12 +233,22 @@ fn build_type_derive_map() -> FuncSigMap { } for t in [T::Timestamp, T::Date] { map.insert(E::TumbleStart, vec![t, T::Interval], T::Timestamp); + map.insert( + E::TumbleStart, + vec![t, T::Interval, T::Interval], + T::Timestamp, + ); } map.insert( E::TumbleStart, vec![T::Timestamptz, T::Interval], T::Timestamptz, ); + map.insert( + E::TumbleStart, + vec![T::Timestamptz, T::Interval, T::Interval], + T::Timestamptz, + ); map.insert(E::ToTimestamp, vec![T::Float64], T::Timestamptz); map.insert(E::ToTimestamp1, vec![T::Varchar, T::Varchar], T::Timestamp); map.insert( diff --git a/src/expr/src/vector_op/tumble.rs b/src/expr/src/vector_op/tumble.rs index d8eee168eaaef..f6e0c29120d45 100644 --- a/src/expr/src/vector_op/tumble.rs +++ b/src/expr/src/vector_op/tumble.rs @@ -12,73 +12,122 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper}; +use num_traits::Zero; +use risingwave_common::types::{ + IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, USECS_PER_DAY, USECS_PER_MONTH, +}; -use crate::{ExprError, Result}; +use crate::Result; + +#[inline(always)] +fn interval_unit_to_micro_second(t: IntervalUnit) -> i64 { + t.get_months() as i64 * USECS_PER_MONTH + t.get_days() as i64 * USECS_PER_DAY + t.get_usecs() +} #[inline(always)] pub fn tumble_start_date( - time: NaiveDateWrapper, - window: IntervalUnit, + timestamp: NaiveDateWrapper, + window_size: IntervalUnit, ) -> Result { - tumble_start_date_time(time.into(), window) + tumble_start_date_time(timestamp.into(), window_size) } #[inline(always)] pub fn tumble_start_date_time( - time: NaiveDateTimeWrapper, - window: IntervalUnit, + timestamp: NaiveDateTimeWrapper, + window_size: IntervalUnit, ) -> Result { - let diff = time.0.timestamp_micros(); - let window_start = tm_diff_bin(diff, window)?; + let timestamp_micro_second = timestamp.0.timestamp_micros(); + let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?; Ok(NaiveDateTimeWrapper::from_timestamp_uncheck( - window_start / 1_000_000, - (window_start % 1_000_000 * 1000) as u32, + window_start_micro_second / 1_000_000, + (window_start_micro_second % 1_000_000 * 1000) as u32, )) } #[inline(always)] -pub fn tumble_start_timestamptz(time: i64, window: IntervalUnit) -> Result { - // Actually directly calls into the helper `tm_diff_bin`. But we keep the shared utility and - // enduser function separate. - let diff = time; - let window_start = tm_diff_bin(diff, window)?; - Ok(window_start) +pub fn tumble_start_timestamptz( + timestamp_micro_second: i64, + window_size: IntervalUnit, +) -> Result { + let timestamp_micro_second = timestamp_micro_second; + let window_size = window_size; + get_window_start(timestamp_micro_second, window_size) } /// The common part of PostgreSQL function `timestamp_bin` and `timestamptz_bin`. #[inline(always)] -fn tm_diff_bin(diff_usecs: i64, window: IntervalUnit) -> Result { - if window.get_months() != 0 { - return Err(ExprError::InvalidParam { - name: "window", - reason: "unimplemented: tumble_start only support days or milliseconds".to_string(), - }); - } - let window_usecs = window.get_days() as i64 * 24 * 60 * 60 * 1_000_000 + window.get_usecs(); +fn get_window_start(timestamp_micro_second: i64, window_size: IntervalUnit) -> Result { + get_window_start_with_offset(timestamp_micro_second, window_size, IntervalUnit::zero()) +} - if window_usecs <= 0 { - return Err(ExprError::InvalidParam { - name: "window", - reason: "window must be positive".to_string(), - }); +#[inline(always)] +pub fn tumble_start_offset_date( + timestamp_date: NaiveDateWrapper, + window_size: IntervalUnit, + offset: IntervalUnit, +) -> Result { + tumble_start_offset_date_time(timestamp_date.into(), window_size, offset) +} + +#[inline(always)] +pub fn tumble_start_offset_date_time( + time: NaiveDateTimeWrapper, + window_size: IntervalUnit, + offset: IntervalUnit, +) -> Result { + let timestamp_micro_second = time.0.timestamp_micros(); + let window_start_micro_second = + get_window_start_with_offset(timestamp_micro_second, window_size, offset)?; + + Ok(NaiveDateTimeWrapper::from_timestamp_uncheck( + window_start_micro_second / 1_000_000, + (window_start_micro_second % 1_000_000 * 1000) as u32, + )) +} + +#[inline(always)] +fn get_window_start_with_offset( + timestamp_micro_second: i64, + window_size: IntervalUnit, + offset: IntervalUnit, +) -> Result { + let window_size_micro_second = interval_unit_to_micro_second(window_size); + let offset_micro_second = interval_unit_to_micro_second(offset); + + // Inspired by https://issues.apache.org/jira/browse/FLINK-26334 + let remainder = (timestamp_micro_second - offset_micro_second) % window_size_micro_second; + if remainder < 0 { + Ok(timestamp_micro_second - (remainder + window_size_micro_second)) + } else { + Ok(timestamp_micro_second - remainder) } +} - let delta_usecs = diff_usecs - diff_usecs % window_usecs; - Ok(delta_usecs) +#[inline(always)] +pub fn tumble_start_offset_timestamptz( + timestamp_micro_second: i64, + window_size: IntervalUnit, + offset: IntervalUnit, +) -> Result { + get_window_start_with_offset(timestamp_micro_second, window_size, offset) } #[cfg(test)] mod tests { use chrono::{Datelike, Timelike}; + use risingwave_common::types::test_utils::IntervalUnitTestExt; use risingwave_common::types::{IntervalUnit, NaiveDateWrapper}; - use super::tumble_start_date_time; + use super::tumble_start_offset_date_time; + use crate::vector_op::tumble::{ + get_window_start, interval_unit_to_micro_second, tumble_start_date_time, + }; #[test] fn test_tumble_start_date_time() { let dt = NaiveDateWrapper::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22); - let interval = IntervalUnit::from_month_day_usec(0, 0, 30 * 60 * 1_000_000); + let interval = IntervalUnit::from_minutes(30); let w = tumble_start_date_time(dt, interval).unwrap().0; assert_eq!(w.year(), 2022); assert_eq!(w.month(), 2); @@ -87,4 +136,59 @@ mod tests { assert_eq!(w.minute(), 0); assert_eq!(w.second(), 0); } + + #[test] + fn test_tumble_start_offset_date_time() { + let dt = NaiveDateWrapper::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22); + let window_size = 30; + for offset in 0..window_size { + for coefficient in 0..5 { + let w = tumble_start_date_time(dt, IntervalUnit::from_minutes(window_size)) + .unwrap() + .0; + println!("{}", w); + let w = tumble_start_offset_date_time( + dt, + IntervalUnit::from_minutes(window_size), + IntervalUnit::from_minutes(coefficient * window_size + offset), + ) + .unwrap() + .0; + assert_eq!(w.year(), 2022); + assert_eq!(w.month(), 2); + assert_eq!(w.day(), 22); + if offset > 22 { + assert_eq!(w.hour(), 21); + assert_eq!(w.minute(), 30 + offset as u32); + } else { + assert_eq!(w.hour(), 22); + assert_eq!(w.minute(), offset as u32); + } + + assert_eq!(w.second(), 0); + } + } + } + + #[test] + fn test_remainder_necessary() { + let mut wrong_cnt = 0; + for i in -30..30 { + let timestamp_micro_second = IntervalUnit::from_minutes(i).get_usecs(); + let window_size = IntervalUnit::from_minutes(5); + let window_start = get_window_start(timestamp_micro_second, window_size).unwrap(); + + let window_size_micro_second = interval_unit_to_micro_second(window_size); + let default_window_start = timestamp_micro_second + - (timestamp_micro_second + window_size_micro_second) % window_size_micro_second; + + if timestamp_micro_second < default_window_start { + // which is wrong + wrong_cnt += 1; + } + + assert!(timestamp_micro_second >= window_start) + } + assert_ne!(wrong_cnt, 0); + } } diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index 05b7adcd6b5a1..475cfc2f781aa 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -36,6 +36,7 @@ pub struct HopWindow { pub time_col: InputRef, pub window_slide: IntervalUnit, pub window_size: IntervalUnit, + pub window_offset: IntervalUnit, /// Provides mapping from input schema, window_start, window_end to output schema. /// For example, if we had: /// input schema: | 0: trip_time | 1: trip_name | @@ -118,12 +119,22 @@ impl GenericPlanNode for HopWindow { } impl HopWindow { - pub fn into_parts(self) -> (PlanRef, InputRef, IntervalUnit, IntervalUnit, Vec) { + pub fn into_parts( + self, + ) -> ( + PlanRef, + InputRef, + IntervalUnit, + IntervalUnit, + IntervalUnit, + Vec, + ) { ( self.input, self.time_col, self.window_slide, self.window_size, + self.window_offset, self.output_indices, ) } @@ -170,6 +181,7 @@ impl HopWindow { let Self { window_size, window_slide, + window_offset, time_col, .. } = &self; @@ -184,9 +196,13 @@ impl HopWindow { ), })? .get(); - let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); + let window_size_expr: ExprImpl = + Literal::new(Some((*window_size).into()), DataType::Interval).into(); let window_slide_expr: ExprImpl = Literal::new(Some((*window_slide).into()), DataType::Interval).into(); + let window_offset_expr: ExprImpl = + Literal::new(Some((*window_offset).into()), DataType::Interval).into(); + let window_size_sub_slide = FunctionCall::new( ExprType::Subtract, vec![window_size_expr, window_slide_expr.clone()], @@ -204,7 +220,7 @@ impl HopWindow { let hop_start: ExprImpl = FunctionCall::new( ExprType::TumbleStart, - vec![time_col_shifted, window_slide_expr], + vec![time_col_shifted, window_slide_expr, window_offset_expr], )? .into(); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 86daeb1030be2..6559b4c69e195 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -46,6 +46,7 @@ impl LogicalHopWindow { time_col: InputRef, window_slide: IntervalUnit, window_size: IntervalUnit, + window_offset: IntervalUnit, output_indices: Option>, ) -> Self { // if output_indices is not specified, use default output_indices @@ -56,6 +57,7 @@ impl LogicalHopWindow { time_col, window_slide, window_size, + window_offset, output_indices, }; @@ -82,7 +84,16 @@ impl LogicalHopWindow { LogicalHopWindow { base, core } } - pub fn into_parts(self) -> (PlanRef, InputRef, IntervalUnit, IntervalUnit, Vec) { + pub fn into_parts( + self, + ) -> ( + PlanRef, + InputRef, + IntervalUnit, + IntervalUnit, + IntervalUnit, + Vec, + ) { self.core.into_parts() } @@ -93,6 +104,7 @@ impl LogicalHopWindow { time_col: InputRef, window_slide: IntervalUnit, window_size: IntervalUnit, + window_offset: IntervalUnit, ) -> PlanRef { let input = LogicalFilter::create_with_expr( input, @@ -100,7 +112,15 @@ impl LogicalHopWindow { .unwrap() .into(), ); - Self::new(input, time_col, window_slide, window_size, None).into() + Self::new( + input, + time_col, + window_slide, + window_size, + window_offset, + None, + ) + .into() } pub fn internal_window_start_col_idx(&self) -> usize { @@ -133,6 +153,7 @@ impl LogicalHopWindow { self.core.time_col.clone(), self.core.window_slide, self.core.window_size, + self.core.window_offset, Some(output_indices), ) } @@ -168,6 +189,7 @@ impl PlanTreeNodeUnary for LogicalHopWindow { self.core.time_col.clone(), self.core.window_slide, self.core.window_size, + self.core.window_offset, Some(self.core.output_indices.clone()), ) } @@ -209,6 +231,7 @@ impl PlanTreeNodeUnary for LogicalHopWindow { time_col, self.core.window_slide, self.core.window_size, + self.core.window_offset, Some(new_output_indices), ); ( @@ -341,7 +364,8 @@ impl ToStream for LogicalHopWindow { ) -> Result<(PlanRef, ColIndexMapping)> { let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; let (hop, out_col_change) = self.rewrite_with_input(input, input_col_change); - let (input, time_col, window_slide, window_size, mut output_indices) = hop.into_parts(); + let (input, time_col, window_slide, window_size, window_offset, mut output_indices) = + hop.into_parts(); if !output_indices.contains(&input.schema().len()) && !output_indices.contains(&(input.schema().len() + 1)) // When both `window_start` and `window_end` are not in `output_indices`, @@ -362,6 +386,7 @@ impl ToStream for LogicalHopWindow { time_col, window_slide, window_size, + window_offset, Some(output_indices), ); Ok((new_hop.into(), out_col_change)) @@ -411,6 +436,7 @@ mod test { InputRef::new(0, DataType::Date), IntervalUnit::from_month_day_usec(0, 1, 0), IntervalUnit::from_month_day_usec(0, 3, 0), + IntervalUnit::from_month_day_usec(0, 0, 0), None, ) .into(); @@ -466,6 +492,7 @@ mod test { InputRef::new(0, DataType::Date), IntervalUnit::from_month_day_usec(0, 1, 0), IntervalUnit::from_month_day_usec(0, 3, 0), + IntervalUnit::from_month_day_usec(0, 0, 0), None, ) .into(); diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index f3f1946d1f51c..2a6b56a80159d 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; use crate::binder::{ BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark, @@ -195,8 +195,8 @@ impl Planner { let mut args = args.into_iter(); let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?; - match (args.next(), args.next()) { - (Some(window_size @ ExprImpl::Literal(_)), None) => { + match (args.next(), args.next(), args.next()) { + (Some(window_size @ ExprImpl::Literal(_)), None, None) => { let mut exprs = Vec::with_capacity(col_data_types.len() + 2); for (idx, col_dt) in col_data_types.iter().enumerate() { exprs.push(InputRef::new(idx, col_dt.clone()).into()); @@ -218,6 +218,36 @@ impl Planner { let project = LogicalProject::create(base, exprs); Ok(project) } + ( + Some(window_size @ ExprImpl::Literal(_)), + Some(window_offset @ ExprImpl::Literal(_)), + None, + ) => { + let mut exprs = Vec::with_capacity(col_data_types.len() + 2); + for (idx, col_dt) in col_data_types.iter().enumerate() { + exprs.push(InputRef::new(idx, col_dt.clone()).into()); + } + let window_start: ExprImpl = FunctionCall::new( + ExprType::TumbleStart, + vec![ + ExprImpl::InputRef(Box::new(time_col)), + window_size.clone(), + window_offset, + ], + )? + .into(); + // TODO: `window_end` may be optimized to avoid double calculation of + // `tumble_start`, or we can depends on common expression + // optimization. + let window_end = + FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])? + .into(); + exprs.push(window_start); + exprs.push(window_end); + let base = self.plan_relation(input)?; + let project = LogicalProject::create(base, exprs); + Ok(project) + } _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()), } } @@ -233,6 +263,7 @@ impl Planner { let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) = args.next_tuple() else { return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()); }; + let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else { return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()); }; @@ -240,6 +271,15 @@ impl Planner { return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()); }; + let window_offset = match (args.next(), args.next()) { + (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() { + Some(ScalarImpl::Interval(window_offset)) => window_offset, + _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()), + }, + (None, None) => IntervalUnit::from_month_day_usec(0, 0, 0), + _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_string()).into()), + }; + if !window_size.is_positive() || !window_slide.is_positive() { return Err(ErrorCode::BindError(format!( "window_size {} and window_slide {} must be positive", @@ -251,11 +291,13 @@ impl Planner { if window_size.exact_div(&window_slide).is_none() { return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into()); } + Ok(LogicalHopWindow::create( input, time_col, window_slide, window_size, + window_offset, )) } } diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 05bf09d990c26..ded6e3fcf4fe3 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -29,7 +29,6 @@ pub struct HopWindowExecutor { ctx: ActorContextRef, pub input: BoxedExecutor, pub info: ExecutorInfo, - pub time_col_idx: usize, pub window_slide: IntervalUnit, pub window_size: IntervalUnit, @@ -230,8 +229,15 @@ mod tests { MockSource::with_chunks(schema.clone(), pk_indices.clone(), vec![chunk]).boxed(); let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); - let (window_start_exprs, window_end_exprs) = - make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + let window_offset = IntervalUnit::from_minutes(0); + let (window_start_exprs, window_end_exprs) = make_hop_window_expression( + DataType::Timestamp, + 2, + window_size, + window_slide, + window_offset, + ) + .unwrap(); super::HopWindowExecutor::new( ActorContext::create(123), @@ -346,8 +352,10 @@ mod tests { let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); + let offset = IntervalUnit::from_minutes(0); let (window_start_exprs, window_end_exprs) = - make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide, offset) + .unwrap(); ( tx,