Skip to content

Commit

Permalink
remove prefer_existing_sort effects
Browse files Browse the repository at this point in the history
remove prefer_existing_sort based test cases
  • Loading branch information
mertak-synnada committed Feb 12, 2025
1 parent 3c76bdc commit 265e00e
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ macro_rules! plans_matches_expected {
/// * `FIRST_ENFORCE_DIST` -
/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting)
/// false: else runs (EnforceSorting, EnforceDistribution, EnforceDistribution)
/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ macro_rules! assert_optimized {
plan_with_pipeline_fixer,
false,
true,
&config,
)
})
.data()
Expand Down Expand Up @@ -1476,7 +1475,6 @@ macro_rules! assert_optimized {
plan_with_pipeline_fixer,
false,
true,
&config,
)
})
.data()
Expand Down

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,8 +853,7 @@ fn add_roundrobin_on_top(
// during repartition. This will be un-done in the future
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.prefer_existing_sort`)
// - Required input ordering is a hard requirement
let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition =
RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
Expand Down Expand Up @@ -912,8 +911,7 @@ fn add_hash_on_top(
// following conditions is true:
// - Preserving ordering is not helpful in terms of satisfying ordering
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
// - Required input ordering is a hard requirement
let partitioning = dist.create_partitioning(n_target);
let repartition =
RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
Expand Down
11 changes: 3 additions & 8 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ impl PhysicalOptimizerRule for EnforceSorting {
plan_with_pipeline_fixer,
false,
true,
config,
)
})
.data()?;
Expand Down Expand Up @@ -400,7 +399,7 @@ fn analyze_immediate_sort_removal(
mut node: PlanWithCorrespondingSort,
) -> Transformed<PlanWithCorrespondingSort> {
let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() else {
return Transformed::no(node)
return Transformed::no(node);
};
let sort_input = sort_exec.input();
// Check if the sort is unnecessary
Expand All @@ -410,7 +409,7 @@ fn analyze_immediate_sort_removal(
.output_ordering()
.unwrap_or(LexOrdering::empty()),
) {
return Transformed::no(node)
return Transformed::no(node);
};
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
Expand All @@ -432,11 +431,7 @@ fn analyze_immediate_sort_removal(
.partition_count()
== 1
{
Arc::new(GlobalLimitExec::new(
Arc::clone(sort_input),
0,
Some(fetch),
))
Arc::new(GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch)))
} else {
Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ use crate::utils::{
is_coalesce_partitions, is_repartition, is_sort, is_sort_preserving_merge,
};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::execution_plan::{EmissionType, RequiredInputOrdering};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
Expand Down Expand Up @@ -238,16 +237,17 @@ pub fn replace_with_order_preserving_variants(
// `SortExec` from the plan. If this flag is `false`, this replacement
// should only be made to fix the pipeline (streaming).
is_spm_better: bool,
config: &ConfigOptions,
) -> Result<Transformed<OrderPreservationContext>> {
update_children(&mut requirements);
if !(is_sort(&requirements.plan) && requirements.children[0].data) {
return Ok(Transformed::no(requirements));
}
let requirement = requirements.plan.required_input_ordering()[0].clone();

// For unbounded cases, we replace with the order-preserving variant in any
// case, as doing so helps fix the pipeline. Also replace if config allows.
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
let use_order_preserving_variant = (requirement.is_some()
&& matches!(requirement.unwrap(), RequiredInputOrdering::Hard(_)))
|| (requirements.plan.boundedness().is_unbounded()
&& requirements.plan.pipeline_behavior() == EmissionType::Final);

Expand Down
13 changes: 0 additions & 13 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3220,9 +3220,6 @@ WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');

statement ok
set datafusion.optimizer.prefer_existing_sort = true;

# sort merge join should propagate ordering equivalence of the left side
# for inner join. Hence final requirement rn1 ASC is already satisfied at
# the end of SortMergeJoinExec.
Expand Down Expand Up @@ -3291,9 +3288,6 @@ physical_plan
11)------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
12)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true

statement ok
set datafusion.optimizer.prefer_existing_sort = false;

# SortMergeJoin should add ordering equivalences of
# right table as lexicographical append to the global ordering
# below query shouldn't add any SortExec for order by clause.
Expand Down Expand Up @@ -3475,10 +3469,6 @@ physical_plan
statement ok
set datafusion.execution.target_partitions = 2;

# use bounded variants
statement ok
set datafusion.optimizer.prefer_existing_sort = true;

query TT
EXPLAIN SELECT l.a, LAST_VALUE(r.b ORDER BY r.a ASC NULLS FIRST) as last_col1
FROM annotated_data as l
Expand Down Expand Up @@ -3564,9 +3554,6 @@ set datafusion.optimizer.prefer_hash_join = true;
statement ok
set datafusion.execution.target_partitions = 2;

statement ok
set datafusion.optimizer.prefer_existing_sort = false;

statement ok
drop table annotated_data;

Expand Down
3 changes: 0 additions & 3 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5476,9 +5476,6 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
WITH ORDER (c1)
OPTIONS ('format.has_header' 'true');

statement ok
set datafusion.optimizer.prefer_existing_sort = true;

query TT
EXPLAIN SELECT c1, SUM(c9) OVER(PARTITION BY c1) as sum_c9 FROM aggregate_test_100_ordered ORDER BY c1, sum_c9;
----
Expand Down

0 comments on commit 265e00e

Please sign in to comment.