Skip to content

Commit

Permalink
add get_window_frame in window_expr, show frame info in window_agg_exec
Browse files Browse the repository at this point in the history
Signed-off-by: yangjiang <[email protected]>
  • Loading branch information
Ted-Jiang committed Dec 5, 2022
1 parent 34d9bb5 commit e87a000
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 3 deletions.
9 changes: 8 additions & 1 deletion datafusion/core/src/physical_plan/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,14 @@ impl ExecutionPlan for WindowAggExec {
let g: Vec<String> = self
.window_expr
.iter()
.map(|e| format!("{}: {:?}", e.name().to_owned(), e.field()))
.map(|e| {
format!(
"{}: {:?}, frame: {:?}",
e.name().to_owned(),
e.field(),
e.get_window_frame()
)
})
.collect();
write!(f, "wdw=[{}]", g.join(", "))?;
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1628,8 +1628,8 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST]@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]@1 as sum2]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: None]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: None]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
]
};
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,8 @@ impl WindowExpr for AggregateWindowExpr {
fn order_by(&self) -> &[PhysicalSortExpr] {
&self.order_by
}

fn get_window_frame(&self) -> Option<&Arc<WindowFrame>> {
self.window_frame.as_ref()
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,8 @@ impl WindowExpr for BuiltInWindowExpr {
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat(&results).map_err(DataFusionError::ArrowError)
}

fn get_window_frame(&self) -> Option<&Arc<WindowFrame>> {
self.window_frame.as_ref()
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::WindowFrame;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
Expand Down Expand Up @@ -127,4 +128,7 @@ pub trait WindowExpr: Send + Sync + Debug {
order_by_columns.iter().map(|s| s.values.clone()).collect();
Ok((values, order_bys))
}

// Get window frame of this WindowExpr, None if absent
fn get_window_frame(&self) -> Option<&Arc<WindowFrame>>;
}

0 comments on commit e87a000

Please sign in to comment.