From 0d63c68a2ab5aae34281d51c39f6b082f3279e10 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Wed, 7 Dec 2022 05:03:08 +0800 Subject: [PATCH] Add get_window_frame in window_expr, show frame info in window_agg_exec (#4508) * fix confilt Signed-off-by: yangjiang # Conflicts: # datafusion/core/tests/sql/window.rs * fix ut Signed-off-by: yangjiang Signed-off-by: yangjiang --- .../core/src/physical_plan/windows/window_agg_exec.rs | 9 ++++++++- datafusion/core/tests/sql/window.rs | 4 ++-- datafusion/physical-expr/src/window/aggregate.rs | 4 ++++ datafusion/physical-expr/src/window/built_in.rs | 4 ++++ datafusion/physical-expr/src/window/window_expr.rs | 4 ++++ 5 files changed, 22 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 5b4dc79eaf22..914e3e71dbad 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -268,7 +268,14 @@ impl ExecutionPlan for WindowAggExec { let g: Vec = self .window_expr .iter() - .map(|e| format!("{}: {:?}", e.name().to_owned(), e.field())) + .map(|e| { + format!( + "{}: {:?}, frame: {:?}", + e.name().to_owned(), + e.field(), + e.get_window_frame() + ) + }) .collect(); write!(f, "wdw=[{}]", g.join(", "))?; } diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index f8fbe67ab89f..2668a1097cf5 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1628,8 +1628,8 @@ async fn test_window_agg_sort() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]", ] }; diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index da2421fd998d..52a43050b1cc 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -151,4 +151,8 @@ impl WindowExpr for AggregateWindowExpr { fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } + + fn get_window_frame(&self) -> &Arc { + &self.window_frame + } } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index e291e8ca5b1d..95bf01608b82 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -133,4 +133,8 @@ impl WindowExpr for BuiltInWindowExpr { let results = results.iter().map(|i| i.as_ref()).collect::>(); concat(&results).map_err(DataFusionError::ArrowError) } + + fn get_window_frame(&self) -> &Arc { + &self.window_frame + } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index fe381935bb76..209e0544f2fa 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -21,6 +21,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::WindowFrame; use std::any::Any; use std::fmt::Debug; use std::ops::Range; @@ -127,4 +128,7 @@ pub trait WindowExpr: Send + Sync + Debug { order_by_columns.iter().map(|s| s.values.clone()).collect(); Ok((values, order_bys)) } + + // Get window frame of this WindowExpr, None if absent + fn get_window_frame(&self) -> &Arc; }