Skip to content

Commit

Permalink
Always Prefer SortPreservingMergeExec to the global SortExec
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho committed Dec 14, 2022
1 parent ce8742d commit d0cff54
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
11 changes: 6 additions & 5 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,15 +846,16 @@ fn ensure_distribution_and_ordering(
if plan.children().is_empty() {
return Ok(plan);
}
// If we have a `LIMIT` can run sort/limits in parallel (similar to TopK)
// It's mainly for changing the single node global SortExec to
// the SortPreservingMergeExec with multiple local SortExec.
// What's more, if limit exists, it can also be pushed down to the local sort
let plan = plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
if sort_exec.input().output_partitioning().partition_count() > 1
&& !sort_exec.preserve_partitioning()
&& sort_exec.fetch().is_some()
{
// If it's already preserving the partitioning, it can be regarded as a local sort
// and there's no need for this optimization
if !sort_exec.preserve_partitioning() {
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ async fn explain_analyze_baseline_metrics() {
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
// The number of output rows becomes less after changing the global sort to the local sort with limit push down
assert_metrics!(
&formatted,
"CoalescePartitionsExec",
"metrics=[output_rows=5, elapsed_compute="
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,
Expand Down
16 changes: 8 additions & 8 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2040,8 +2040,8 @@ async fn left_semi_join() -> Result<()> {
let physical_plan = state.create_physical_plan(&logical_plan).await?;
let expected = if repartition_joins {
vec![
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
Expand All @@ -2057,8 +2057,8 @@ async fn left_semi_join() -> Result<()> {
]
} else {
vec![
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
Expand Down Expand Up @@ -2230,8 +2230,8 @@ async fn right_semi_join() -> Result<()> {
let logical_plan = state.optimize(&plan)?;
let physical_plan = state.create_physical_plan(&logical_plan).await?;
let expected = if repartition_joins {
vec![ "SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
Expand All @@ -2246,8 +2246,8 @@ async fn right_semi_join() -> Result<()> {
]
} else {
vec![
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
Expand Down

0 comments on commit d0cff54

Please sign in to comment.