Skip to content

Commit

Permalink
Add get_window_frame in window_expr, show frame info in window_agg_ex…
Browse files Browse the repository at this point in the history
…ec (#4508)

* fix confilt

Signed-off-by: yangjiang <[email protected]>

# Conflicts:
#	datafusion/core/tests/sql/window.rs

* fix ut

Signed-off-by: yangjiang <[email protected]>

Signed-off-by: yangjiang <[email protected]>
  • Loading branch information
Ted-Jiang authored Dec 6, 2022
1 parent bfd41a3 commit 0d63c68
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 3 deletions.
9 changes: 8 additions & 1 deletion datafusion/core/src/physical_plan/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,14 @@ impl ExecutionPlan for WindowAggExec {
let g: Vec<String> = self
.window_expr
.iter()
.map(|e| format!("{}: {:?}", e.name().to_owned(), e.field()))
.map(|e| {
format!(
"{}: {:?}, frame: {:?}",
e.name().to_owned(),
e.field(),
e.get_window_frame()
)
})
.collect();
write!(f, "wdw=[{}]", g.join(", "))?;
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1628,8 +1628,8 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
]
};
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ impl WindowExpr for AggregateWindowExpr {
fn order_by(&self) -> &[PhysicalSortExpr] {
&self.order_by
}

fn get_window_frame(&self) -> &Arc<WindowFrame> {
&self.window_frame
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ impl WindowExpr for BuiltInWindowExpr {
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat(&results).map_err(DataFusionError::ArrowError)
}

fn get_window_frame(&self) -> &Arc<WindowFrame> {
&self.window_frame
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::WindowFrame;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
Expand Down Expand Up @@ -127,4 +128,7 @@ pub trait WindowExpr: Send + Sync + Debug {
order_by_columns.iter().map(|s| s.values.clone()).collect();
Ok((values, order_bys))
}

// Get window frame of this WindowExpr, None if absent
fn get_window_frame(&self) -> &Arc<WindowFrame>;
}

0 comments on commit 0d63c68

Please sign in to comment.