Skip to content

Commit b9d943e

Browse files
author
Jiayu Liu
committed
closing up type checks
1 parent 8495f95 commit b9d943e

File tree

9 files changed

+278
-84
lines changed

9 files changed

+278
-84
lines changed

ballista/rust/core/proto/ballista.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ message WindowExprNode {
174174
// udaf = 3
175175
}
176176
LogicalExprNode expr = 4;
177-
// repeated LogicalExprNode partition_by = 5;
177+
repeated LogicalExprNode partition_by = 5;
178178
repeated LogicalExprNode order_by = 6;
179179
// repeated LogicalExprNode filter = 7;
180180
oneof window_frame {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

+8
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
910910
.window_function
911911
.as_ref()
912912
.ok_or_else(|| proto_error("Received empty window function"))?;
913+
let partition_by = expr
914+
.partition_by
915+
.iter()
916+
.map(|e| e.try_into())
917+
.into_iter()
918+
.collect::<Result<Vec<_>, _>>()?;
913919
let order_by = expr
914920
.order_by
915921
.iter()
@@ -940,6 +946,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
940946
AggregateFunction::from(aggr_function),
941947
),
942948
args: vec![parse_required_expr(&expr.expr)?],
949+
partition_by,
943950
order_by,
944951
window_frame,
945952
})
@@ -960,6 +967,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
960967
BuiltInWindowFunction::from(built_in_function),
961968
),
962969
args: vec![parse_required_expr(&expr.expr)?],
970+
partition_by,
963971
order_by,
964972
window_frame,
965973
})

ballista/rust/core/src/serde/logical_plan/to_proto.rs

+6
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10061006
Expr::WindowFunction {
10071007
ref fun,
10081008
ref args,
1009+
ref partition_by,
10091010
ref order_by,
10101011
ref window_frame,
10111012
..
@@ -1023,6 +1024,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10231024
}
10241025
};
10251026
let arg = &args[0];
1027+
let partition_by = partition_by
1028+
.iter()
1029+
.map(|e| e.try_into())
1030+
.collect::<Result<Vec<_>, _>>()?;
10261031
let order_by = order_by
10271032
.iter()
10281033
.map(|e| e.try_into())
@@ -1035,6 +1040,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10351040
let window_expr = Box::new(protobuf::WindowExprNode {
10361041
expr: Some(Box::new(arg.try_into()?)),
10371042
window_function: Some(window_function),
1043+
partition_by,
10381044
order_by,
10391045
window_frame,
10401046
});

ballista/rust/core/src/serde/physical_plan/from_proto.rs

+8
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
236236
Expr::WindowFunction {
237237
fun,
238238
args,
239+
partition_by,
239240
order_by,
241+
window_frame,
240242
..
241243
} => {
242244
let arg = df_planner
@@ -248,9 +250,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
248250
.map_err(|e| {
249251
BallistaError::General(format!("{:?}", e))
250252
})?;
253+
if !partition_by.is_empty() {
254+
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
255+
}
251256
if !order_by.is_empty() {
252257
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
253258
}
259+
if window_frame.is_some() {
260+
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
261+
}
254262
let window_expr = create_window_expr(
255263
&fun,
256264
&[arg],

datafusion/src/logical_plan/expr.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ pub enum Expr {
194194
fun: window_functions::WindowFunction,
195195
/// List of expressions to feed to the functions as arguments
196196
args: Vec<Expr>,
197+
/// List of partition by expressions
198+
partition_by: Vec<Expr>,
197199
/// List of order by expressions
198200
order_by: Vec<Expr>,
199201
/// Window frame
@@ -588,10 +590,18 @@ impl Expr {
588590
Expr::ScalarUDF { args, .. } => args
589591
.iter()
590592
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
591-
Expr::WindowFunction { args, order_by, .. } => {
593+
Expr::WindowFunction {
594+
args,
595+
partition_by,
596+
order_by,
597+
..
598+
} => {
592599
let visitor = args
593600
.iter()
594601
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
602+
let visitor = partition_by
603+
.iter()
604+
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
595605
let visitor = order_by
596606
.iter()
597607
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
@@ -733,11 +743,13 @@ impl Expr {
733743
Expr::WindowFunction {
734744
args,
735745
fun,
746+
partition_by,
736747
order_by,
737748
window_frame,
738749
} => Expr::WindowFunction {
739750
args: rewrite_vec(args, rewriter)?,
740751
fun,
752+
partition_by: rewrite_vec(partition_by, rewriter)?,
741753
order_by: rewrite_vec(order_by, rewriter)?,
742754
window_frame,
743755
},

datafusion/src/logical_plan/plan.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -687,11 +687,7 @@ impl LogicalPlan {
687687
LogicalPlan::Window {
688688
ref window_expr, ..
689689
} => {
690-
write!(
691-
f,
692-
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
693-
window_expr
694-
)
690+
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
695691
}
696692
LogicalPlan::Aggregate {
697693
ref group_expr,

datafusion/src/optimizer/utils.rs

+36-8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636

3737
const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
3838
const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__";
39+
const WINDOW_PARTITION_MARKER: &str = "__DATAFUSION_WINDOW_PARTITION__";
3940
const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";
4041

4142
/// Recursively walk a list of expression trees, collecting the unique set of column
@@ -258,9 +259,16 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
258259
Expr::IsNotNull(e) => Ok(vec![e.as_ref().to_owned()]),
259260
Expr::ScalarFunction { args, .. } => Ok(args.clone()),
260261
Expr::ScalarUDF { args, .. } => Ok(args.clone()),
261-
Expr::WindowFunction { args, order_by, .. } => {
262+
Expr::WindowFunction {
263+
args,
264+
partition_by,
265+
order_by,
266+
..
267+
} => {
262268
let mut expr_list: Vec<Expr> = vec![];
263269
expr_list.extend(args.clone());
270+
expr_list.push(lit(WINDOW_PARTITION_MARKER));
271+
expr_list.extend(partition_by.clone());
264272
expr_list.push(lit(WINDOW_SORT_MARKER));
265273
expr_list.extend(order_by.clone());
266274
Ok(expr_list)
@@ -340,7 +348,19 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
340348
Expr::WindowFunction {
341349
fun, window_frame, ..
342350
} => {
343-
let index = expressions
351+
let partition_index = expressions
352+
.iter()
353+
.position(|expr| {
354+
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
355+
if str == WINDOW_PARTITION_MARKER)
356+
})
357+
.ok_or_else(|| {
358+
DataFusionError::Internal(
359+
"Ill-formed window function expressions: unexpected marker".to_owned(),
360+
)
361+
})?;
362+
363+
let sort_index = expressions
344364
.iter()
345365
.position(|expr| {
346366
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
@@ -351,12 +371,20 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
351371
"Ill-formed window function expressions".to_owned(),
352372
)
353373
})?;
354-
Ok(Expr::WindowFunction {
355-
fun: fun.clone(),
356-
args: expressions[..index].to_vec(),
357-
order_by: expressions[index + 1..].to_vec(),
358-
window_frame: *window_frame,
359-
})
374+
375+
if partition_index >= sort_index {
376+
Err(DataFusionError::Internal(
377+
"Ill-formed window function expressions: partition index too large".to_owned(),
378+
))
379+
} else {
380+
Ok(Expr::WindowFunction {
381+
fun: fun.clone(),
382+
args: expressions[..partition_index].to_vec(),
383+
partition_by: expressions[partition_index + 1..sort_index].to_vec(),
384+
order_by: expressions[sort_index + 1..].to_vec(),
385+
window_frame: *window_frame,
386+
})
387+
}
360388
}
361389
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
362390
fun: fun.clone(),

0 commit comments

Comments
 (0)