From c9bf3f3d1da7fd55b6d7c07d7347ea3ff56dc577 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 30 Mar 2023 16:17:33 +0300 Subject: [PATCH] Change required input ordering physical plan API to allow any NULLS FIRST / LAST and ASC / DESC (#5772) * Change required input ordering to format to not absolutely require direction. * remove unnecessary code --- .../src/physical_optimizer/repartition.rs | 9 +- .../physical_optimizer/sort_enforcement.rs | 32 +++-- .../physical_plan/joins/sort_merge_join.rs | 10 +- .../joins/symmetric_hash_join.rs | 14 ++- datafusion/core/src/physical_plan/mod.rs | 4 +- datafusion/core/src/physical_plan/planner.rs | 29 ----- .../sorts/sort_preserving_merge.rs | 14 ++- .../windows/bounded_window_agg_exec.rs | 21 ++-- .../core/src/physical_plan/windows/mod.rs | 102 +++++++++++++++- .../physical_plan/windows/window_agg_exec.rs | 17 ++- datafusion/core/tests/sql/select.rs | 7 +- datafusion/core/tests/window_fuzz.rs | 13 +- datafusion/physical-expr/src/lib.rs | 4 +- datafusion/physical-expr/src/sort_expr.rs | 78 ++++++++++-- datafusion/physical-expr/src/utils.rs | 114 ++++++++++++++++-- datafusion/proto/src/physical_plan/mod.rs | 1 - 16 files changed, 356 insertions(+), 113 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 8557769c3171..71274d177d5f 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -321,6 +321,9 @@ fn init() { mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortRequirement, + }; use super::*; use crate::datasource::listing::PartitionedFile; @@ -1131,8 +1134,10 @@ mod tests { } // model that it requires the output ordering of its input - fn required_input_ordering(&self) -> Vec> { - vec![self.input.output_ordering()] + fn required_input_ordering(&self) -> Vec>> { + vec![self + .output_ordering() + .map(make_sort_requirements_from_exprs)] } fn with_new_children( diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 9a87796bc180..265c86cdf1e4 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -47,7 +47,10 @@ use crate::physical_plan::{with_new_children_if_necessary, Distribution, Executi use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{reverse_sort_options, DataFusionError}; -use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete}; +use datafusion_physical_expr::utils::{ + make_sort_exprs_from_requirements, ordering_satisfy, + ordering_satisfy_requirement_concrete, +}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use itertools::{concat, izip}; use std::iter::zip; @@ -471,17 +474,20 @@ fn ensure_sorting( let physical_ordering = child.output_ordering(); match (required_ordering, physical_ordering) { (Some(required_ordering), Some(physical_ordering)) => { - let is_ordering_satisfied = ordering_satisfy_concrete( + if !ordering_satisfy_requirement_concrete( physical_ordering, - required_ordering, + &required_ordering, || child.equivalence_properties(), - ); - if !is_ordering_satisfied { + ) { // Make sure we 
preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; - let sort_expr = required_ordering.to_vec(); + let sort_expr = make_sort_exprs_from_requirements(&required_ordering); add_sort_above(child, sort_expr)?; - *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); + if is_sort(child) { + *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); + } else { + *sort_onwards = None; + } } if let Some(tree) = sort_onwards { // For window expressions, we can remove some sorts when we can @@ -497,7 +503,8 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a `SortExec` to the plan. - add_sort_above(child, required.to_vec())?; + let sort_expr = make_sort_exprs_from_requirements(&required); + add_sort_above(child, sort_expr)?; *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); } (None, Some(_)) => { @@ -592,7 +599,6 @@ fn analyze_window_sort_removal( }; let mut first_should_reverse = None; - let mut physical_ordering_common = vec![]; for sort_any in sort_tree.get_leaves() { let sort_output_ordering = sort_any.output_ordering(); // Variable `sort_any` will either be a `SortExec` or a @@ -609,11 +615,6 @@ fn analyze_window_sort_removal( DataFusionError::Plan("A SortExec should have output ordering".to_string()) })?; if let Some(physical_ordering) = physical_ordering { - if physical_ordering_common.is_empty() - || physical_ordering.len() < physical_ordering_common.len() - { - physical_ordering_common = physical_ordering.to_vec(); - } let (can_skip_sorting, should_reverse) = can_skip_sort( window_expr[0].partition_by(), required_ordering, @@ -664,7 +665,6 @@ fn analyze_window_sort_removal( new_child, new_schema, partition_keys.to_vec(), - Some(physical_ordering_common), )?) as _ } else { Arc::new(WindowAggExec::try_new( @@ -672,7 +672,6 @@ fn analyze_window_sort_removal( new_child, new_schema, partition_keys.to_vec(), - Some(physical_ordering_common), )?) as _ }; return Ok(Some(PlanWithCorrespondingSort::new(new_plan))); @@ -1889,7 +1888,6 @@ mod tests { input.clone(), input.schema(), vec![], - Some(sort_exprs), ) .unwrap(), ) diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index de9199ea8297..623bafe0937a 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -55,6 +55,9 @@ use crate::physical_plan::{ }; use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortRequirement, +}; /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. 
@@ -225,8 +228,11 @@ impl ExecutionPlan for SortMergeJoinExec { ] } - fn required_input_ordering(&self) -> Vec> { - vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)] + fn required_input_ordering(&self) -> Vec>> { + vec![ + Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)), + Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)), + ] } fn output_partitioning(&self) -> Partitioning { diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 3af983d8f06a..5a249e433dd2 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -46,6 +46,9 @@ use hashbrown::{raw::RawTable, HashSet}; use datafusion_common::{utils::bisect, ScalarValue}; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval}; +use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortRequirement, +}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; @@ -399,11 +402,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { self.schema.clone() } - fn required_input_ordering(&self) -> Vec> { - vec![ - Some(&self.left_required_sort_exprs), - Some(&self.right_required_sort_exprs), - ] + fn required_input_ordering(&self) -> Vec>> { + let left_required = + make_sort_requirements_from_exprs(&self.left_required_sort_exprs); + let right_required = + make_sort_requirements_from_exprs(&self.right_required_sort_exprs); + vec![Some(left_required), Some(right_required)] } fn unbounded_output(&self, children: &[bool]) -> Result { diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index c59dd0c62ee5..9815d9491e02 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -142,7 +142,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// NOTE that checking `!is_empty()` does **not** check for a /// required input ordering. 
Instead, the correct check is that at /// least one entry must be `Some` - fn required_input_ordering(&self) -> Vec> { + fn required_input_ordering(&self) -> Vec>> { vec![None; self.children().len()] } @@ -591,11 +591,11 @@ impl Distribution { use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::WindowExpr; -use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::{ expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, }; pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; +use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; /// Applies an optional projection to a [`SchemaRef`], returning the /// projected schema diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 51653450a699..4b21a9bd735e 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -577,33 +577,6 @@ impl DefaultPhysicalPlanner { let logical_input_schema = input.schema(); - let physical_sort_keys = if sort_keys.is_empty() { - None - } else { - let physical_input_schema = input_exec.schema(); - let sort_keys = sort_keys - .iter() - .map(|(e, _)| match e { - Expr::Sort(expr::Sort { - expr, - asc, - nulls_first, - }) => create_physical_sort_expr( - expr, - logical_input_schema, - &physical_input_schema, - SortOptions { - descending: !*asc, - nulls_first: *nulls_first, - }, - session_state.execution_props(), - ), - _ => unreachable!(), - }) - .collect::>>()?; - Some(sort_keys) - }; - let physical_input_schema = input_exec.schema(); let window_expr = window_expr .iter() @@ -628,7 +601,6 @@ impl DefaultPhysicalPlanner { input_exec, physical_input_schema, physical_partition_keys, - physical_sort_keys, )?) } else { Arc::new(WindowAggExec::try_new( @@ -636,7 +608,6 @@ impl DefaultPhysicalPlanner { input_exec, physical_input_schema, physical_partition_keys, - physical_sort_keys, )?) 
}) } diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 7ef4d3bf8e86..edacae4052d8 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -46,7 +46,9 @@ use crate::physical_plan::{ Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement, +}; /// Sort preserving merge execution plan /// @@ -125,12 +127,16 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![Distribution::UnspecifiedDistribution] } - fn required_input_ordering(&self) -> Vec> { - vec![Some(&self.expr)] + fn required_input_ordering(&self) -> Vec>> { + vec![Some(make_sort_requirements_from_exprs(&self.expr))] } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - Some(&self.expr) + self.input.output_ordering() + } + + fn maintains_input_order(&self) -> Vec { + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index fa276c423879..4b01d3b4ed38 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -50,11 +50,14 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::physical_plan::windows::calc_requirements; use datafusion_physical_expr::window::{ PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowAggState, WindowState, }; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::{ + EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement, +}; use indexmap::IndexMap; use log::debug; @@ -71,8 +74,6 @@ pub struct BoundedWindowAggExec { input_schema: SchemaRef, /// Partition Keys pub partition_keys: Vec>, - /// Sort Keys - pub sort_keys: Option>, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -84,7 +85,6 @@ impl BoundedWindowAggExec { input: Arc, input_schema: SchemaRef, partition_keys: Vec>, - sort_keys: Option>, ) -> Result { let schema = create_schema(&input_schema, &window_expr)?; let schema = Arc::new(schema); @@ -94,7 +94,6 @@ impl BoundedWindowAggExec { schema, input_schema, partition_keys, - sort_keys, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -123,7 +122,7 @@ impl BoundedWindowAggExec { let mut result = vec![]; // All window exprs have the same partition by, so we just use the first one: let partition_by = self.window_expr()[0].partition_by(); - let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]); + let sort_keys = self.input.output_ordering().unwrap_or(&[]); for item in partition_by { if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) { result.push(a.clone()); @@ -167,9 +166,11 @@ impl ExecutionPlan for BoundedWindowAggExec { self.input().output_ordering() } - fn required_input_ordering(&self) -> Vec> { - let sort_keys = self.sort_keys.as_deref(); - vec![sort_keys] + fn required_input_ordering(&self) -> Vec>> { + let partition_bys = self.window_expr()[0].partition_by(); + let order_keys = self.window_expr()[0].order_by(); + let requirements = 
calc_requirements(partition_bys, order_keys);
+        vec![requirements]
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -177,7 +178,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
             debug!("No partition defined for BoundedWindowAggExec!!!");
             vec![Distribution::SinglePartition]
         } else {
-            //TODO support PartitionCollections if there is no common partition columns in the window_expr
             vec![Distribution::HashPartitioned(self.partition_keys.clone())]
         }
     }
@@ -199,7 +199,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.partition_keys.clone(),
-            self.sort_keys.clone(),
         )?))
     }
 
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index bdb9aa32645f..f7f9bb76b3f4 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -46,6 +46,7 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
+use datafusion_physical_expr::PhysicalSortRequirement;
 pub use window_agg_exec::WindowAggExec;
 
 /// Create a physical expression for window function
@@ -187,6 +188,30 @@ fn create_built_in_window_expr(
     })
 }
 
+pub(crate) fn calc_requirements(
+    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
+    orderby_sort_exprs: &[PhysicalSortExpr],
+) -> Option<Vec<PhysicalSortRequirement>> {
+    let mut sort_reqs = vec![];
+    for partition_by in partition_by_exprs {
+        sort_reqs.push(PhysicalSortRequirement {
+            expr: partition_by.clone(),
+            options: None,
+        });
+    }
+    for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
+        let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
+        if !contains {
+            sort_reqs.push(PhysicalSortRequirement {
+                expr: expr.clone(),
+                options: Some(*options),
+            });
+        }
+    }
+    // Convert empty result to None.
Otherwise wrap result inside Some() + (!sort_reqs.is_empty()).then_some(sort_reqs) +} + #[cfg(test)] mod tests { use super::*; @@ -198,6 +223,7 @@ mod tests { use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending}; use arrow::array::*; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_primitive_array; @@ -210,6 +236,79 @@ mod tests { Ok((csv, schema)) } + fn create_test_schema2() -> Result { + let a = Field::new("a", DataType::Int32, true); + let b = Field::new("b", DataType::Int32, true); + let c = Field::new("c", DataType::Int32, true); + let d = Field::new("d", DataType::Int32, true); + let schema = Arc::new(Schema::new(vec![a, b, c, d])); + Ok(schema) + } + + #[tokio::test] + async fn test_calc_requirements() -> Result<()> { + let schema = create_test_schema2()?; + let test_data = vec![ + // PARTITION BY a, ORDER BY b ASC NULLS FIRST + ( + vec!["a"], + vec![("b", true, true)], + vec![("a", None), ("b", Some((true, true)))], + ), + // PARTITION BY a, ORDER BY a ASC NULLS FIRST + (vec!["a"], vec![("a", true, true)], vec![("a", None)]), + // PARTITION BY a, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST + ( + vec!["a"], + vec![("b", true, true), ("c", false, false)], + vec![ + ("a", None), + ("b", Some((true, true))), + ("c", Some((false, false))), + ], + ), + // PARTITION BY a, c, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST + ( + vec!["a", "c"], + vec![("b", true, true), ("c", false, false)], + vec![("a", None), ("c", None), ("b", Some((true, true)))], + ), + ]; + for (pb_params, ob_params, expected_params) in test_data { + let mut partitionbys = vec![]; + for col_name in pb_params { + partitionbys.push(col(col_name, &schema)?); + } + + let mut orderbys = vec![]; + for (col_name, descending, nulls_first) in ob_params { + let expr = col(col_name, &schema)?; + let options = SortOptions { + descending, + nulls_first, + }; + orderbys.push(PhysicalSortExpr { expr, options }); + } + + let mut expected: Option> = None; + for (col_name, reqs) in expected_params { + let options = reqs.map(|(descending, nulls_first)| SortOptions { + descending, + nulls_first, + }); + let expr = col(col_name, &schema)?; + let res = PhysicalSortRequirement { expr, options }; + if let Some(expected) = &mut expected { + expected.push(res); + } else { + expected = Some(vec![res]); + } + } + assert_eq!(calc_requirements(&partitionbys, &orderbys), expected); + } + Ok(()) + } + #[tokio::test] async fn window_function_with_udaf() -> Result<()> { #[derive(Debug)] @@ -269,7 +368,6 @@ mod tests { input, schema.clone(), vec![], - None, )?); let result: Vec = collect(window_exec, task_ctx).await?; @@ -323,7 +421,6 @@ mod tests { input, schema.clone(), vec![], - None, )?); let result: Vec = collect(window_exec, task_ctx).await?; @@ -371,7 +468,6 @@ mod tests { blocking_exec, schema, vec![], - None, )?); let fut = collect(window_agg_exec, task_ctx); diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index a667f0a3c216..29d5c4dc8f3d 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -24,6 +24,7 @@ use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; +use 
crate::physical_plan::windows::calc_requirements; use crate::physical_plan::{ ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, @@ -39,6 +40,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::DataFusionError; +use datafusion_physical_expr::PhysicalSortRequirement; use futures::stream::Stream; use futures::{ready, StreamExt}; use log::debug; @@ -61,8 +63,6 @@ pub struct WindowAggExec { input_schema: SchemaRef, /// Partition Keys pub partition_keys: Vec>, - /// Sort Keys - pub sort_keys: Option>, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -74,7 +74,6 @@ impl WindowAggExec { input: Arc, input_schema: SchemaRef, partition_keys: Vec>, - sort_keys: Option>, ) -> Result { let schema = create_schema(&input_schema, &window_expr)?; let schema = Arc::new(schema); @@ -85,7 +84,6 @@ impl WindowAggExec { schema, input_schema, partition_keys, - sort_keys, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -114,7 +112,7 @@ impl WindowAggExec { let mut result = vec![]; // All window exprs have the same partition by, so we just use the first one: let partition_by = self.window_expr()[0].partition_by(); - let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]); + let sort_keys = self.input.output_ordering().unwrap_or(&[]); for item in partition_by { if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) { result.push(a.clone()); @@ -172,9 +170,11 @@ impl ExecutionPlan for WindowAggExec { vec![true] } - fn required_input_ordering(&self) -> Vec> { - let sort_keys = self.sort_keys.as_deref(); - vec![sort_keys] + fn required_input_ordering(&self) -> Vec>> { + let partition_bys = self.window_expr()[0].partition_by(); + let order_keys = self.window_expr()[0].order_by(); + let requirements = calc_requirements(partition_bys, order_keys); + vec![requirements] } fn required_input_distribution(&self) -> Vec { @@ -200,7 +200,6 @@ impl ExecutionPlan for WindowAggExec { children[0].clone(), self.input_schema.clone(), self.partition_keys.clone(), - self.sort_keys.clone(), )?)) } diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 28924768489d..b6017f826c76 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -670,7 +670,7 @@ async fn sort_on_window_null_string() -> Result<()> { ]) .unwrap(); - let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(1)); ctx.register_batch("test", batch)?; let sql = @@ -689,7 +689,8 @@ async fn sort_on_window_null_string() -> Result<()> { ]; assert_batches_eq!(expected, &actual); - let sql = "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM test"; + let sql = + "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM test ORDER BY d2 asc"; let actual = execute_to_batches(&ctx, sql).await; // NULLS LAST let expected = vec![ @@ -704,7 +705,7 @@ async fn sort_on_window_null_string() -> Result<()> { assert_batches_eq!(expected, &actual); let sql = - "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as rn1 FROM test"; + "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as rn1 FROM test ORDER BY d2 desc"; let actual = execute_to_batches(&ctx, sql).await; // NULLS FIRST diff --git a/datafusion/core/tests/window_fuzz.rs b/datafusion/core/tests/window_fuzz.rs index a71aab280ea1..179bada53cc6 100644 --- 
a/datafusion/core/tests/window_fuzz.rs +++ b/datafusion/core/tests/window_fuzz.rs @@ -330,7 +330,9 @@ async fn run_window_test( let concat_input_record = concat_batches(&schema, &input1).unwrap(); let exec1 = Arc::new( - MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(), + MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None) + .unwrap() + .with_sort_information(sort_keys.clone()), ); let usual_window_exec = Arc::new( WindowAggExec::try_new( @@ -347,12 +349,14 @@ async fn run_window_test( exec1, schema.clone(), vec![], - Some(sort_keys.clone()), ) .unwrap(), ); - let exec2 = - Arc::new(MemoryExec::try_new(&[input1.clone()], schema.clone(), None).unwrap()); + let exec2 = Arc::new( + MemoryExec::try_new(&[input1.clone()], schema.clone(), None) + .unwrap() + .with_sort_information(sort_keys), + ); let running_window_exec = Arc::new( BoundedWindowAggExec::try_new( vec![create_window_expr( @@ -368,7 +372,6 @@ async fn run_window_test( exec2, schema.clone(), vec![], - Some(sort_keys), ) .unwrap(), ); diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 56ca0824f46b..a1698fe072b4 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -53,7 +53,9 @@ pub use equivalence::EquivalentClass; pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::PhysicalSortExpr; +pub use sort_expr::{ + make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, +}; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema, diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index f8172dabf65a..08bd394e6d11 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -41,14 +41,7 @@ impl PartialEq for PhysicalSortExpr { impl std::fmt::Display for PhysicalSortExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let opts_string = match (self.options.descending, self.options.nulls_first) { - (true, true) => "DESC", - (true, false) => "DESC NULLS LAST", - (false, true) => "ASC", - (false, false) => "ASC NULLS LAST", - }; - - write!(f, "{} {}", self.expr, opts_string) + write!(f, "{} {}", self.expr, to_str(&self.options)) } } @@ -69,4 +62,73 @@ impl PhysicalSortExpr { options: Some(self.options), }) } + + /// Check whether sort expression satisfies `PhysicalSortRequirement`. + // If sort options is Some in `PhysicalSortRequirement`, `expr` and `options` field are compared for equality. + // If sort options is None in `PhysicalSortRequirement`, only `expr` is compared for equality. + pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool { + self.expr.eq(&requirement.expr) + && requirement + .options + .map_or(true, |opts| self.options == opts) + } +} + +/// Represents sort requirement associated with a plan +#[derive(Clone, Debug)] +pub struct PhysicalSortRequirement { + /// Physical expression representing the column to sort + pub expr: Arc, + /// Option to specify how the given column should be sorted. + /// If unspecified, there is no constraint on sort options. 
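+    /// (i.e. any ASC/DESC direction and any NULLS FIRST/LAST placement is acceptable).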
+ pub options: Option, +} + +impl From for PhysicalSortRequirement { + fn from(value: PhysicalSortExpr) -> Self { + Self { + expr: value.expr, + options: Some(value.options), + } + } +} + +impl PartialEq for PhysicalSortRequirement { + fn eq(&self, other: &PhysicalSortRequirement) -> bool { + self.options == other.options && self.expr.eq(&other.expr) + } +} + +impl std::fmt::Display for PhysicalSortRequirement { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let opts_string = self.options.as_ref().map_or("NA", to_str); + write!(f, "{} {}", self.expr, opts_string) + } +} + +impl PhysicalSortRequirement { + /// Returns whether this requirement is equal or more specific than `other`. + pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool { + self.expr.eq(&other.expr) + && other.options.map_or(true, |other_opts| { + self.options.map_or(false, |opts| opts == other_opts) + }) + } +} + +pub fn make_sort_requirements_from_exprs( + ordering: &[PhysicalSortExpr], +) -> Vec { + ordering.iter().map(|e| e.clone().into()).collect() +} + +/// Returns the SQL string representation of the given [SortOptions] object. +#[inline] +fn to_str(options: &SortOptions) -> &str { + match (options.descending, options.nulls_first) { + (true, true) => "DESC", + (true, false) => "DESC NULLS LAST", + (false, true) => "ASC", + (false, false) => "ASC NULLS LAST", + } } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 7c8c94c7d454..cd4ac6ff3d02 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -17,11 +17,14 @@ use crate::equivalence::EquivalentClass; use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; -use crate::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr}; +use crate::{ + EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, +}; use arrow::datatypes::SchemaRef; use datafusion_common::Result; use datafusion_expr::Operator; +use arrow_schema::SortOptions; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRewriter, VisitRecursion, }; @@ -199,6 +202,24 @@ pub fn normalize_sort_expr_with_equivalence_properties( } } +pub fn normalize_sort_requirement_with_equivalence_properties( + sort_requirement: PhysicalSortRequirement, + eq_properties: &[EquivalentClass], +) -> PhysicalSortRequirement { + let normalized_expr = normalize_expr_with_equivalence_properties( + sort_requirement.expr.clone(), + eq_properties, + ); + if sort_requirement.expr.ne(&normalized_expr) { + PhysicalSortRequirement { + expr: normalized_expr, + options: sort_requirement.options, + } + } else { + sort_requirement + } +} + /// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. pub fn ordering_satisfy EquivalenceProperties>( provided: Option<&[PhysicalSortExpr]>, @@ -224,31 +245,102 @@ pub fn ordering_satisfy_concrete EquivalenceProperties>( } else if required .iter() .zip(provided.iter()) - .all(|(order1, order2)| order1.eq(order2)) + .all(|(req, given)| req.eq(given)) { true } else if let eq_classes @ [_, ..] 
= equal_properties().classes() { - let normalized_required_exprs = required + required .iter() .map(|e| { normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) }) - .collect::>(); - let normalized_provided_exprs = provided + .zip(provided.iter().map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + })) + .all(|(req, given)| req.eq(&given)) + } else { + false + } +} + +/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the +/// provided [`PhysicalSortExpr`]s. +pub fn ordering_satisfy_requirement EquivalenceProperties>( + provided: Option<&[PhysicalSortExpr]>, + required: Option<&[PhysicalSortRequirement]>, + equal_properties: F, +) -> bool { + match (provided, required) { + (_, None) => true, + (None, Some(_)) => false, + (Some(provided), Some(required)) => { + ordering_satisfy_requirement_concrete(provided, required, equal_properties) + } + } +} + +/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the +/// provided [`PhysicalSortExpr`]s. +pub fn ordering_satisfy_requirement_concrete EquivalenceProperties>( + provided: &[PhysicalSortExpr], + required: &[PhysicalSortRequirement], + equal_properties: F, +) -> bool { + if required.len() > provided.len() { + false + } else if required + .iter() + .zip(provided.iter()) + .all(|(req, given)| given.satisfy(req)) + { + true + } else if let eq_classes @ [_, ..] = equal_properties().classes() { + required .iter() .map(|e| { - normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + normalize_sort_requirement_with_equivalence_properties( + e.clone(), + eq_classes, + ) }) - .collect::>(); - normalized_required_exprs - .iter() - .zip(normalized_provided_exprs.iter()) - .all(|(order1, order2)| order1.eq(order2)) + .zip(provided.iter().map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + })) + .all(|(req, given)| given.satisfy(&req)) } else { false } } +/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` +/// for each entry in the input. If required ordering is None for an entry +/// default ordering `ASC, NULLS LAST` if given. +pub fn make_sort_exprs_from_requirements( + required: &[PhysicalSortRequirement], +) -> Vec { + required + .iter() + .map(|requirement| { + if let Some(options) = requirement.options { + PhysicalSortExpr { + expr: requirement.expr.clone(), + options, + } + } else { + PhysicalSortExpr { + expr: requirement.expr.clone(), + options: SortOptions { + // By default, create sort key with ASC is true and NULLS LAST to be consistent with + // PostgreSQL's rule: https://www.postgresql.org/docs/current/queries-order.html + descending: false, + nulls_first: false, + }, + } + } + }) + .collect() +} + #[derive(Clone, Debug)] pub struct ExprTreeNode { expr: Arc, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 92986b0b39b2..7711b3c9617a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -334,7 +334,6 @@ impl AsExecutionPlan for PhysicalPlanNode { input, Arc::new((&input_schema).try_into()?), vec![], - None, )?)) } PhysicalPlanType::Aggregate(hash_agg) => {
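
A minimal usage sketch of the relaxed requirement API introduced in this patch (not part of the diff above). It shows how a `PhysicalSortRequirement` with `options: None` is expected to be satisfied by an input sorted in either direction, while a requirement with concrete options still demands an exact match. The schema, the column name "a", and the `main` wrapper are assumptions made for illustration only.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

fn main() -> Result<()> {
    // Hypothetical single-column schema used only for this sketch.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

    // The input is physically sorted by "a" DESC NULLS LAST.
    let provided = PhysicalSortExpr {
        expr: col("a", &schema)?,
        options: SortOptions { descending: true, nulls_first: false },
    };

    // A requirement with `options: None` only pins the expression, so any
    // ASC/DESC and NULLS FIRST/LAST combination on "a" satisfies it.
    let any_direction = PhysicalSortRequirement {
        expr: col("a", &schema)?,
        options: None,
    };
    assert!(provided.satisfy(&any_direction));

    // A requirement with concrete options must match both expression and options.
    let asc_nulls_first = PhysicalSortRequirement {
        expr: col("a", &schema)?,
        options: Some(SortOptions { descending: false, nulls_first: true }),
    };
    assert!(!provided.satisfy(&asc_nulls_first));
    Ok(())
}

This is the mechanism `calc_requirements` relies on: PARTITION BY columns are emitted with `options: None`, so window operators can accept inputs sorted on those columns in any direction, while ORDER BY columns keep their concrete `SortOptions`.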