Skip to content

Commit c2bfd60

Browse files
author
Jiayu Liu
committed
closing up type checks
1 parent 767eeb0 commit c2bfd60

File tree

10 files changed

+302
-105
lines changed

10 files changed

+302
-105
lines changed

ballista/rust/core/proto/ballista.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ message WindowExprNode {
174174
// udaf = 3
175175
}
176176
LogicalExprNode expr = 4;
177-
// repeated LogicalExprNode partition_by = 5;
177+
repeated LogicalExprNode partition_by = 5;
178178
repeated LogicalExprNode order_by = 6;
179179
// repeated LogicalExprNode filter = 7;
180180
oneof window_frame {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
910910
.window_function
911911
.as_ref()
912912
.ok_or_else(|| proto_error("Received empty window function"))?;
913+
let partition_by = expr
914+
.partition_by
915+
.iter()
916+
.map(|e| e.try_into())
917+
.into_iter()
918+
.collect::<Result<Vec<_>, _>>()?;
913919
let order_by = expr
914920
.order_by
915921
.iter()
@@ -940,6 +946,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
940946
AggregateFunction::from(aggr_function),
941947
),
942948
args: vec![parse_required_expr(&expr.expr)?],
949+
partition_by,
943950
order_by,
944951
window_frame,
945952
})
@@ -960,6 +967,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
960967
BuiltInWindowFunction::from(built_in_function),
961968
),
962969
args: vec![parse_required_expr(&expr.expr)?],
970+
partition_by,
963971
order_by,
964972
window_frame,
965973
})
@@ -1338,8 +1346,7 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
13381346
end_bound.try_into()
13391347
}
13401348
})
1341-
.transpose()?
1342-
.unwrap_or(WindowFrameBound::CurrentRow);
1349+
.transpose()?;
13431350
Ok(WindowFrame {
13441351
units,
13451352
start_bound,

ballista/rust/core/src/serde/logical_plan/to_proto.rs

+6
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10071007
Expr::WindowFunction {
10081008
ref fun,
10091009
ref args,
1010+
ref partition_by,
10101011
ref order_by,
10111012
ref window_frame,
10121013
..
@@ -1024,6 +1025,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10241025
}
10251026
};
10261027
let arg = &args[0];
1028+
let partition_by = partition_by
1029+
.iter()
1030+
.map(|e| e.try_into())
1031+
.collect::<Result<Vec<_>, _>>()?;
10271032
let order_by = order_by
10281033
.iter()
10291034
.map(|e| e.try_into())
@@ -1036,6 +1041,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
10361041
let window_expr = Box::new(protobuf::WindowExprNode {
10371042
expr: Some(Box::new(arg.try_into()?)),
10381043
window_function: Some(window_function),
1044+
partition_by,
10391045
order_by,
10401046
window_frame,
10411047
});

ballista/rust/core/src/serde/physical_plan/from_proto.rs

+8
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
236236
Expr::WindowFunction {
237237
fun,
238238
args,
239+
partition_by,
239240
order_by,
241+
window_frame,
240242
..
241243
} => {
242244
let arg = df_planner
@@ -248,9 +250,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
248250
.map_err(|e| {
249251
BallistaError::General(format!("{:?}", e))
250252
})?;
253+
if !partition_by.is_empty() {
254+
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
255+
}
251256
if !order_by.is_empty() {
252257
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
253258
}
259+
if window_frame.is_some() {
260+
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
261+
}
254262
let window_expr = create_window_expr(
255263
&fun,
256264
&[arg],

datafusion/src/logical_plan/expr.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
2929
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
3030
use arrow::{compute::can_cast_types, datatypes::DataType};
3131
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
32+
use sqlparser::ast::WindowFrame;
3233
use std::collections::HashSet;
3334
use std::fmt;
3435
use std::sync::Arc;
@@ -194,6 +195,8 @@ pub enum Expr {
194195
fun: window_functions::WindowFunction,
195196
/// List of expressions to feed to the functions as arguments
196197
args: Vec<Expr>,
198+
/// List of partition by expressions
199+
partition_by: Vec<Expr>,
197200
/// List of order by expressions
198201
order_by: Vec<Expr>,
199202
/// Window frame
@@ -588,10 +591,18 @@ impl Expr {
588591
Expr::ScalarUDF { args, .. } => args
589592
.iter()
590593
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
591-
Expr::WindowFunction { args, order_by, .. } => {
594+
Expr::WindowFunction {
595+
args,
596+
partition_by,
597+
order_by,
598+
..
599+
} => {
592600
let visitor = args
593601
.iter()
594602
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
603+
let visitor = partition_by
604+
.iter()
605+
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
595606
let visitor = order_by
596607
.iter()
597608
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
@@ -733,11 +744,13 @@ impl Expr {
733744
Expr::WindowFunction {
734745
args,
735746
fun,
747+
partition_by,
736748
order_by,
737749
window_frame,
738750
} => Expr::WindowFunction {
739751
args: rewrite_vec(args, rewriter)?,
740752
fun,
753+
partition_by: rewrite_vec(partition_by, rewriter)?,
741754
order_by: rewrite_vec(order_by, rewriter)?,
742755
window_frame,
743756
},

datafusion/src/logical_plan/plan.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -687,11 +687,7 @@ impl LogicalPlan {
687687
LogicalPlan::Window {
688688
ref window_expr, ..
689689
} => {
690-
write!(
691-
f,
692-
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
693-
window_expr
694-
)
690+
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
695691
}
696692
LogicalPlan::Aggregate {
697693
ref group_expr,

datafusion/src/optimizer/utils.rs

+36-8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636

3737
const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
3838
const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__";
39+
const WINDOW_PARTITION_MARKER: &str = "__DATAFUSION_WINDOW_PARTITION__";
3940
const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";
4041

4142
/// Recursively walk a list of expression trees, collecting the unique set of column
@@ -258,9 +259,16 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
258259
Expr::IsNotNull(e) => Ok(vec![e.as_ref().to_owned()]),
259260
Expr::ScalarFunction { args, .. } => Ok(args.clone()),
260261
Expr::ScalarUDF { args, .. } => Ok(args.clone()),
261-
Expr::WindowFunction { args, order_by, .. } => {
262+
Expr::WindowFunction {
263+
args,
264+
partition_by,
265+
order_by,
266+
..
267+
} => {
262268
let mut expr_list: Vec<Expr> = vec![];
263269
expr_list.extend(args.clone());
270+
expr_list.push(lit(WINDOW_PARTITION_MARKER));
271+
expr_list.extend(partition_by.clone());
264272
expr_list.push(lit(WINDOW_SORT_MARKER));
265273
expr_list.extend(order_by.clone());
266274
Ok(expr_list)
@@ -340,7 +348,19 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
340348
Expr::WindowFunction {
341349
fun, window_frame, ..
342350
} => {
343-
let index = expressions
351+
let partition_index = expressions
352+
.iter()
353+
.position(|expr| {
354+
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
355+
if str == WINDOW_PARTITION_MARKER)
356+
})
357+
.ok_or_else(|| {
358+
DataFusionError::Internal(
359+
"Ill-formed window function expressions".to_owned(),
360+
)
361+
})?;
362+
363+
let sort_index = expressions
344364
.iter()
345365
.position(|expr| {
346366
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
@@ -351,12 +371,20 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
351371
"Ill-formed window function expressions".to_owned(),
352372
)
353373
})?;
354-
Ok(Expr::WindowFunction {
355-
fun: fun.clone(),
356-
args: expressions[..index].to_vec(),
357-
order_by: expressions[index + 1..].to_vec(),
358-
window_frame: *window_frame,
359-
})
374+
375+
if partition_index >= sort_index {
376+
Err(DataFusionError::Internal(
377+
"Ill-formed window function expressions".to_owned(),
378+
))
379+
} else {
380+
Ok(Expr::WindowFunction {
381+
fun: fun.clone(),
382+
args: expressions[..partition_index].to_vec(),
383+
partition_by: expressions[partition_index + 1..sort_index].to_vec(),
384+
order_by: expressions[sort_index + 1..].to_vec(),
385+
window_frame: window_frame.clone(),
386+
})
387+
}
360388
}
361389
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
362390
fun: fun.clone(),

datafusion/src/physical_plan/planner.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::physical_plan::projection::ProjectionExec;
3737
use crate::physical_plan::repartition::RepartitionExec;
3838
use crate::physical_plan::sort::SortExec;
3939
use crate::physical_plan::udf;
40+
use crate::physical_plan::window_frames;
4041
use crate::physical_plan::windows::WindowAggExec;
4142
use crate::physical_plan::{hash_utils, Partitioning};
4243
use crate::physical_plan::{
@@ -746,7 +747,12 @@ impl DefaultPhysicalPlanner {
746747
};
747748

748749
match e {
749-
Expr::WindowFunction { fun, args, .. } => {
750+
Expr::WindowFunction {
751+
fun,
752+
args,
753+
window_frame,
754+
..
755+
} => {
750756
let args = args
751757
.iter()
752758
.map(|e| {
@@ -758,6 +764,7 @@ impl DefaultPhysicalPlanner {
758764
// "Window function with order by is not yet implemented".to_owned(),
759765
// ));
760766
// }
767+
let _window_frame = window_frames::validate_window_frame(window_frame)?;
761768
windows::create_window_expr(fun, &args, physical_input_schema, name)
762769
}
763770
other => Err(DataFusionError::Internal(format!(

0 commit comments

Comments
 (0)