From b5088dc726cd036fdbe8f96d6c891a665c088e80 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 2 Dec 2022 18:03:39 +0800 Subject: [PATCH 1/3] Add window func related logic plan to proto ability. Signed-off-by: yangjiang --- datafusion/proto/src/from_proto.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index a30e7b323aba..d45ebe5ab9cb 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -806,11 +806,20 @@ pub fn parse_expr( .ok_or_else(|| Error::unknown("BuiltInWindowFunction", *i))? .into(); + let args = match parse_optional_expr(&expr.expr, registry)? { + None => { + vec![] + } + Some(x) => { + vec![x] + } + }; + Ok(Expr::WindowFunction { fun: datafusion_expr::window_function::WindowFunction::BuiltInWindowFunction( built_in_function, ), - args: vec![parse_required_expr(&expr.expr, registry, "expr")?], + args, partition_by, order_by, window_frame, @@ -1234,16 +1243,14 @@ impl TryFrom for WindowFrameBound { })?; match bound_type { protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow), - protobuf::WindowFrameBoundType::Preceding => { - // FIXME implement bound value parsing - // https://github.com/apache/arrow-datafusion/issues/361 - Ok(Self::Preceding(ScalarValue::UInt64(Some(1)))) - } - protobuf::WindowFrameBoundType::Following => { - // FIXME implement bound value parsing - // https://github.com/apache/arrow-datafusion/issues/361 - Ok(Self::Following(ScalarValue::UInt64(Some(1)))) - } + protobuf::WindowFrameBoundType::Preceding => match bound.bound_value { + Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)), + None => Ok(Self::Preceding(ScalarValue::UInt64(None))), + }, + protobuf::WindowFrameBoundType::Following => match bound.bound_value { + Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)), + None => Ok(Self::Following(ScalarValue::UInt64(None))), + }, } } } From 
b2be1bd333cce0baed4482d13319668141a72d33 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 4 Dec 2022 16:07:09 +0800 Subject: [PATCH 2/3] add test. Signed-off-by: yangjiang --- datafusion/proto/src/lib.rs | 67 ++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index bf4b777ffab3..12c2a5e784a6 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -70,7 +70,6 @@ mod roundtrip_tests { }; use datafusion::test_util::{TestTableFactory, TestTableProvider}; use datafusion_common::{DFSchemaRef, DataFusionError, ScalarValue}; - use datafusion_expr::create_udaf; use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like}; use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNode}; use datafusion_expr::{ @@ -78,6 +77,9 @@ mod roundtrip_tests { BuiltinScalarFunction::{Sqrt, Substr}, Expr, LogicalPlan, Operator, Volatility, }; + use datafusion_expr::{ + create_udaf, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction, + }; use prost::Message; use std::any::Any; use std::collections::HashMap; @@ -1331,4 +1333,67 @@ mod roundtrip_tests { roundtrip_expr_test(test_expr, ctx.clone()); roundtrip_expr_test(test_expr_with_count, ctx); } + #[test] + fn roundtrip_window() { + let ctx = SessionContext::new(); + + // 1. without window_frame + let test_expr1 = Expr::WindowFunction { + fun: WindowFunction::BuiltInWindowFunction( + datafusion_expr::window_function::BuiltInWindowFunction::Rank, + ), + args: vec![], + partition_by: vec![col("col1")], + order_by: vec![col("col2")], + window_frame: None, + }; + + // 2. 
with default window_frame + let test_expr2 = Expr::WindowFunction { + fun: WindowFunction::BuiltInWindowFunction( + datafusion_expr::window_function::BuiltInWindowFunction::Rank, + ), + args: vec![], + partition_by: vec![col("col1")], + order_by: vec![col("col2")], + window_frame: Some(WindowFrame::default()), + }; + + // 3. with range window_frame with numeric bounds + let range_number_frame = WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2))), + end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(2))), + }; + + let test_expr3 = Expr::WindowFunction { + fun: WindowFunction::BuiltInWindowFunction( + datafusion_expr::window_function::BuiltInWindowFunction::Rank, + ), + args: vec![], + partition_by: vec![col("col1")], + order_by: vec![col("col2")], + window_frame: Some(range_number_frame), + }; + + // 4. test with AggregateFunction + let row_number_frame = WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2))), + end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(2))), + }; + + let test_expr4 = Expr::WindowFunction { + fun: WindowFunction::AggregateFunction(AggregateFunction::Max), + args: vec![col("col1")], + partition_by: vec![col("col1")], + order_by: vec![col("col2")], + window_frame: Some(row_number_frame), + }; + + roundtrip_expr_test(test_expr1, ctx.clone()); + roundtrip_expr_test(test_expr2, ctx.clone()); + roundtrip_expr_test(test_expr3, ctx.clone()); + roundtrip_expr_test(test_expr4, ctx); + } } From f2344db4fd09526750ae61e14d67423059616cd0 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 4 Dec 2022 22:48:49 +0800 Subject: [PATCH 3/3] more functional Signed-off-by: yangjiang --- datafusion/proto/src/from_proto.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index d45ebe5ab9cb..12ea8f863be3 100644 --- 
a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -806,14 +806,9 @@ pub fn parse_expr( .ok_or_else(|| Error::unknown("BuiltInWindowFunction", *i))? .into(); - let args = match parse_optional_expr(&expr.expr, registry)? { - None => { - vec![] - } - Some(x) => { - vec![x] - } - }; + let args = parse_optional_expr(&expr.expr, registry)? + .map(|e| vec![e]) + .unwrap_or_else(Vec::new); Ok(Expr::WindowFunction { fun: datafusion_expr::window_function::WindowFunction::BuiltInWindowFunction(