Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Option from window frame #4516

Merged
merged 8 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,8 @@ mod tests {
use arrow::datatypes::DataType;
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
WindowFunction,
};
use datafusion_physical_expr::expressions::Column;

Expand Down Expand Up @@ -896,7 +897,7 @@ mod tests {
args: vec![col("aggregate_test_100.c1")],
partition_by: vec![col("aggregate_test_100.c2")],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let t2 = t.select(vec![col("c1"), first_row])?;
let plan = t2.plan.clone();
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1484,15 +1484,14 @@ pub fn create_window_expr_with_name(
)),
})
.collect::<Result<Vec<_>>>()?;
if let Some(ref window_frame) = window_frame {
if !is_window_valid(window_frame) {
return Err(DataFusionError::Execution(format!(
"Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
window_frame.start_bound, window_frame.end_bound
)));
}
}
let window_frame = window_frame.clone().map(Arc::new);

let window_frame = Arc::new(window_frame.clone());
windows::create_window_expr(
fun,
name,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn create_window_expr(
args: &[Arc<dyn PhysicalExpr>],
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Option<Arc<WindowFrame>>,
window_frame: Arc<WindowFrame>,
input_schema: &Schema,
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
Expand Down Expand Up @@ -194,7 +194,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
Some(Arc::new(WindowFrame::default())),
Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
create_window_expr(
Expand All @@ -203,7 +203,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
Some(Arc::new(WindowFrame::default())),
Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
create_window_expr(
Expand All @@ -212,7 +212,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
Some(Arc::new(WindowFrame::default())),
Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
],
Expand Down Expand Up @@ -260,7 +260,7 @@ mod tests {
&[col("a", &schema)?],
&[],
&[],
Some(Arc::new(WindowFrame::default())),
Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?],
blocking_exec,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,16 +893,16 @@ async fn query_on_string_dictionary() -> Result<()> {
assert_batches_sorted_eq!(expected, &actual);

// window functions
let sql = "SELECT d1, row_number() OVER (partition by d1) FROM test";
let sql = "SELECT d1, row_number() OVER (partition by d1) as rn1 FROM test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------+--------------+",
"| d1 | ROW_NUMBER() |",
"+-------+--------------+",
"+-------+-----+",
"| d1 | rn1 |",
"+-------+-----+",
"| | 1 |",
"| one | 1 |",
"| three | 1 |",
"+-------+--------------+",
"+-------+-----+",
];
assert_batches_sorted_eq!(expected, &actual);

Expand Down
238 changes: 119 additions & 119 deletions datafusion/core/tests/sql/window.rs

Large diffs are not rendered by default.

10 changes: 2 additions & 8 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub enum Expr {
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
window_frame: Option<window_frame::WindowFrame>,
window_frame: window_frame::WindowFrame,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -827,15 +827,11 @@ impl fmt::Debug for Expr {
if !order_by.is_empty() {
write!(f, " ORDER BY {:?}", order_by)?;
}
if let Some(window_frame) = window_frame {
write!(
f,
" {} BETWEEN {} AND {}",
window_frame.units,
window_frame.start_bound,
window_frame.end_bound
window_frame.units, window_frame.start_bound, window_frame.end_bound
)?;
}
Ok(())
}
Expr::AggregateFunction {
Expand Down Expand Up @@ -1187,9 +1183,7 @@ fn create_name(e: &Expr) -> Result<String> {
if !order_by.is_empty() {
parts.push(format!("ORDER BY {:?}", order_by));
}
if let Some(window_frame) = window_frame {
parts.push(format!("{}", window_frame));
}
Ok(parts.join(" "))
}
Expr::AggregateFunction {
Expand Down
22 changes: 11 additions & 11 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ pub fn check_all_column_from_schema(
#[cfg(test)]
mod tests {
use super::*;
use crate::{col, AggregateFunction, WindowFunction};
use crate::{col, AggregateFunction, WindowFrame, WindowFunction};

#[test]
fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
Expand All @@ -806,28 +806,28 @@ mod tests {
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let max2 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let min3 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let sum4 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
args: vec![col("age")],
partition_by: vec![],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
let result = group_window_expr_by_sort_keys(exprs)?;
Expand Down Expand Up @@ -860,28 +860,28 @@ mod tests {
args: vec![col("name")],
partition_by: vec![],
order_by: vec![age_asc.clone(), name_desc.clone()],
window_frame: None,
window_frame: WindowFrame::new(true),
};
let max2 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
window_frame: None,
window_frame: WindowFrame::new(false),
};
let min3 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![age_asc.clone(), name_desc.clone()],
window_frame: None,
window_frame: WindowFrame::new(true),
};
let sum4 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
args: vec![col("age")],
partition_by: vec![],
order_by: vec![name_desc.clone(), age_asc.clone(), created_at_desc.clone()],
window_frame: None,
window_frame: WindowFrame::new(true),
};
// FIXME use as_ref
let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
Expand Down Expand Up @@ -919,7 +919,7 @@ mod tests {
nulls_first: true,
},
],
window_frame: None,
window_frame: WindowFrame::new(true),
},
Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
Expand All @@ -942,7 +942,7 @@ mod tests {
nulls_first: true,
},
],
window_frame: None,
window_frame: WindowFrame::new(true),
},
];
let expected = vec![
Expand Down
66 changes: 43 additions & 23 deletions datafusion/expr/src/window_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,52 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
None => WindowFrameBound::CurrentRow,
};

if let WindowFrameBound::Following(ScalarValue::Utf8(None)) = start_bound {
Err(DataFusionError::Execution(
if let WindowFrameBound::Following(val) = &start_bound {
if val.is_null() {
return Err(DataFusionError::Execution(
"Invalid window frame: start bound cannot be unbounded following"
.to_owned(),
))
} else if let WindowFrameBound::Preceding(ScalarValue::Utf8(None)) = end_bound {
Err(DataFusionError::Execution(
));
}
} else if let WindowFrameBound::Preceding(val) = &end_bound {
if val.is_null() {
return Err(DataFusionError::Execution(
"Invalid window frame: end bound cannot be unbounded preceding"
.to_owned(),
))
} else {
let units = value.units.into();
));
}
};
Ok(Self {
units,
units: value.units.into(),
start_bound,
end_bound,
})
}
}
}

impl Default for WindowFrame {
fn default() -> Self {
impl WindowFrame {
/// Creates a new, default window frame (with the meaning of default depending on whether the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clear comments 👍

/// frame contains an `ORDER BY` clause.
pub fn new(has_order_by: bool) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this fix, i also want to change this default behavior.

if has_order_by {
// This window frame covers the table (or partition if `PARTITION BY` is used)
// from beginning to the `CURRENT ROW` (with same rank). It is used when the `OVER`
// clause contains an `ORDER BY` clause but no frame.
WindowFrame {
units: WindowFrameUnits::Range,
start_bound: WindowFrameBound::Preceding(ScalarValue::Utf8(None)),
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
}
} else {
// This window frame covers the whole table (or partition if `PARTITION BY` is used).
// It is used when the `OVER` clause does not contain an `ORDER BY` clause and there is
// no frame.
WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
}
}
}
}

Expand Down Expand Up @@ -137,15 +155,11 @@ impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
ast::WindowFrameBound::Preceding(Some(v)) => {
Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
}
ast::WindowFrameBound::Preceding(None) => {
Self::Preceding(ScalarValue::Utf8(None))
}
ast::WindowFrameBound::Preceding(None) => Self::Preceding(ScalarValue::Null),
ast::WindowFrameBound::Following(Some(v)) => {
Self::Following(convert_frame_bound_to_scalar_value(*v)?)
}
ast::WindowFrameBound::Following(None) => {
Self::Following(ScalarValue::Utf8(None))
}
ast::WindowFrameBound::Following(None) => Self::Following(ScalarValue::Null),
ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
})
}
Expand Down Expand Up @@ -183,15 +197,21 @@ pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) -> Result<ScalarValue>
impl fmt::Display for WindowFrameBound {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
WindowFrameBound::Preceding(ScalarValue::Utf8(None)) => {
WindowFrameBound::Preceding(n) => {
if n.is_null() {
f.write_str("UNBOUNDED PRECEDING")
} else {
write!(f, "{} PRECEDING", n)
}
}
WindowFrameBound::Preceding(n) => write!(f, "{} PRECEDING", n),
WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
WindowFrameBound::Following(ScalarValue::Utf8(None)) => {
WindowFrameBound::Following(n) => {
if n.is_null() {
f.write_str("UNBOUNDED FOLLOWING")
} else {
write!(f, "{} FOLLOWING", n)
}
}
WindowFrameBound::Following(n) => write!(f, "{} FOLLOWING", n),
}
}
}
Expand Down
Loading