Skip to content

Commit

Permalink
Change required input ordering physical plan API to allow any NULLS F…
Browse files Browse the repository at this point in the history
…IRST / LAST and ASC / DESC (#5772)

* Change required input ordering to format to not absolutely require direction.

* remove unnecessary code
  • Loading branch information
mustafasrepo authored Mar 30, 2023
1 parent 8e125d2 commit c9bf3f3
Show file tree
Hide file tree
Showing 16 changed files with 356 additions and 113 deletions.
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ fn init() {
mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

use super::*;
use crate::datasource::listing::PartitionedFile;
Expand Down Expand Up @@ -1131,8 +1134,10 @@ mod tests {
}

// model that it requires the output ordering of its input
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![self.input.output_ordering()]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(make_sort_requirements_from_exprs)]
}

fn with_new_children(
Expand Down
32 changes: 15 additions & 17 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use crate::physical_plan::{with_new_children_if_necessary, Distribution, Executi
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy,
ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::{concat, izip};
use std::iter::zip;
Expand Down Expand Up @@ -471,17 +474,20 @@ fn ensure_sorting(
let physical_ordering = child.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
let is_ordering_satisfied = ordering_satisfy_concrete(
if !ordering_satisfy_requirement_concrete(
physical_ordering,
required_ordering,
&required_ordering,
|| child.equivalence_properties(),
);
if !is_ordering_satisfied {
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr = required_ordering.to_vec();
let sort_expr = make_sort_exprs_from_requirements(&required_ordering);
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
} else {
*sort_onwards = None;
}
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when we can
Expand All @@ -497,7 +503,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
add_sort_above(child, required.to_vec())?;
let sort_expr = make_sort_exprs_from_requirements(&required);
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
Expand Down Expand Up @@ -592,7 +599,6 @@ fn analyze_window_sort_removal(
};

let mut first_should_reverse = None;
let mut physical_ordering_common = vec![];
for sort_any in sort_tree.get_leaves() {
let sort_output_ordering = sort_any.output_ordering();
// Variable `sort_any` will either be a `SortExec` or a
Expand All @@ -609,11 +615,6 @@ fn analyze_window_sort_removal(
DataFusionError::Plan("A SortExec should have output ordering".to_string())
})?;
if let Some(physical_ordering) = physical_ordering {
if physical_ordering_common.is_empty()
|| physical_ordering.len() < physical_ordering_common.len()
{
physical_ordering_common = physical_ordering.to_vec();
}
let (can_skip_sorting, should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
Expand Down Expand Up @@ -664,15 +665,13 @@ fn analyze_window_sort_removal(
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering_common),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering_common),
)?) as _
};
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
Expand Down Expand Up @@ -1889,7 +1888,6 @@ mod tests {
input.clone(),
input.schema(),
vec![],
Some(sort_exprs),
)
.unwrap(),
)
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ use crate::physical_plan::{
};

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
Expand Down Expand Up @@ -225,8 +228,11 @@ impl ExecutionPlan for SortMergeJoinExec {
]
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
]
}

fn output_partitioning(&self) -> Partitioning {
Expand Down
14 changes: 9 additions & 5 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ use hashbrown::{raw::RawTable, HashSet};

use datafusion_common::{utils::bisect, ScalarValue};
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
Expand Down Expand Up @@ -399,11 +402,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.schema.clone()
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![
Some(&self.left_required_sort_exprs),
Some(&self.right_required_sort_exprs),
]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let left_required =
make_sort_requirements_from_exprs(&self.left_required_sort_exprs);
let right_required =
make_sort_requirements_from_exprs(&self.right_required_sort_exprs);
vec![Some(left_required), Some(right_required)]
}

fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// NOTE that checking `!is_empty()` does **not** check for a
/// required input ordering. Instead, the correct check is that at
/// least one entry must be `Some`
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![None; self.children().len()]
}

Expand Down Expand Up @@ -591,11 +591,11 @@ impl Distribution {

use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
};
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
Expand Down
29 changes: 0 additions & 29 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,33 +577,6 @@ impl DefaultPhysicalPlanner {

let logical_input_schema = input.schema();

let physical_sort_keys = if sort_keys.is_empty() {
None
} else {
let physical_input_schema = input_exec.schema();
let sort_keys = sort_keys
.iter()
.map(|(e, _)| match e {
Expr::Sort(expr::Sort {
expr,
asc,
nulls_first,
}) => create_physical_sort_expr(
expr,
logical_input_schema,
&physical_input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
session_state.execution_props(),
),
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
Some(sort_keys)
};

let physical_input_schema = input_exec.schema();
let window_expr = window_expr
.iter()
Expand All @@ -628,15 +601,13 @@ impl DefaultPhysicalPlanner {
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
})
}
Expand Down
14 changes: 10 additions & 4 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ use crate::physical_plan::{
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement,
};

/// Sort preserving merge execution plan
///
Expand Down Expand Up @@ -125,12 +127,16 @@ impl ExecutionPlan for SortPreservingMergeExec {
vec![Distribution::UnspecifiedDistribution]
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.expr)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![Some(make_sort_requirements_from_exprs(&self.expr))]
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Some(&self.expr)
self.input.output_ordering()
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}

fn equivalence_properties(&self) -> EquivalenceProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::physical_plan::windows::calc_requirements;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
WindowAggState, WindowState,
};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion_physical_expr::{
EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
};
use indexmap::IndexMap;
use log::debug;

Expand All @@ -71,8 +74,6 @@ pub struct BoundedWindowAggExec {
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Sort Keys
pub sort_keys: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
Expand All @@ -84,7 +85,6 @@ impl BoundedWindowAggExec {
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
sort_keys: Option<Vec<PhysicalSortExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
Expand All @@ -94,7 +94,6 @@ impl BoundedWindowAggExec {
schema,
input_schema,
partition_keys,
sort_keys,
metrics: ExecutionPlanMetricsSet::new(),
})
}
Expand Down Expand Up @@ -123,7 +122,7 @@ impl BoundedWindowAggExec {
let mut result = vec![];
// All window exprs have the same partition by, so we just use the first one:
let partition_by = self.window_expr()[0].partition_by();
let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
let sort_keys = self.input.output_ordering().unwrap_or(&[]);
for item in partition_by {
if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
result.push(a.clone());
Expand Down Expand Up @@ -167,17 +166,18 @@ impl ExecutionPlan for BoundedWindowAggExec {
self.input().output_ordering()
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
let sort_keys = self.sort_keys.as_deref();
vec![sort_keys]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let partition_bys = self.window_expr()[0].partition_by();
let order_keys = self.window_expr()[0].order_by();
let requirements = calc_requirements(partition_bys, order_keys);
vec![requirements]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
//TODO support PartitionCollections if there is no common partition columns in the window_expr
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
}
}
Expand All @@ -199,7 +199,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
self.sort_keys.clone(),
)?))
}

Expand Down
Loading

0 comments on commit c9bf3f3

Please sign in to comment.