Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR]: update benefits_from_input_partitioning implementation for projection and repartition #7246

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics};

use datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast;
use datafusion_physical_expr::expressions::CastExpr;
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_expr::{
normalize_out_expr_with_columns_map, project_equivalence_properties,
project_ordering_equivalence_properties, OrderingEquivalenceProperties,
Expand Down Expand Up @@ -283,13 +283,13 @@ impl ExecutionPlan for ProjectionExec {
}

fn benefits_from_input_partitioning(&self) -> bool {
let all_column_expr = self
let all_simple_exprs = self
.expr
.iter()
.all(|(e, _)| e.as_any().downcast_ref::<Column>().is_some());
// If expressions are all column_expr, then all computations in this projection are reorder or rename,
.all(|(e, _)| e.as_any().is::<Column>() || e.as_any().is::<Literal>());
// If every expression is either a Column or a Literal, this projection only reorders, renames, or emits constants,
// so it would not benefit from repartitioning; in that case benefits_from_input_partitioning returns false.
!all_column_expr
!all_simple_exprs
}

fn execute(
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ impl RepartitionExec {
&self.partitioning
}

/// Returns the `preserve_order` flag of this `RepartitionExec`.
///
/// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec`.
pub fn preserve_order(&self) -> bool {
self.preserve_order
}

/// Get name of the Executor
pub fn name(&self) -> &str {
if self.preserve_order {
Expand Down Expand Up @@ -432,6 +438,10 @@ impl ExecutionPlan for RepartitionExec {
Ok(children[0])
}

/// Returns `true` only when this operator's own partitioning scheme is
/// `Partitioning::Hash`; every other scheme yields `false`.
fn benefits_from_input_partitioning(&self) -> bool {
matches!(self.partitioning, Partitioning::Hash(_, _))
}

/// Returns a clone of the `partitioning` field held by this operator.
fn output_partitioning(&self) -> Partitioning {
self.partitioning.clone()
}
Expand Down
39 changes: 39 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,45 @@ statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT conta
SELECT * EXCLUDE(a, a)
FROM table1

# run the queries below with multiple target partitions
statement ok
set datafusion.execution.target_partitions = 2;

# since query below contains computation
# inside projection expr, increasing partitions
# is beneficial
query TT
EXPLAIN SELECT a, a+b
FROM annotated_data_finite2
ORDER BY a ASC;
----
logical_plan
Sort: annotated_data_finite2.a ASC NULLS LAST
--Projection: annotated_data_finite2.a, annotated_data_finite2.a + annotated_data_finite2.b
----TableScan: annotated_data_finite2 projection=[a, b]
physical_plan
SortPreservingMergeExec: [a@0 ASC NULLS LAST]
--ProjectionExec: expr=[a@0 as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b]
----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true

# since query below doesn't contain computation
# inside projection expr, increasing partitions
# is not beneficial. Hence plan doesn't contain
# RepartitionExec
query TT
EXPLAIN SELECT a, b, 2
FROM annotated_data_finite2
ORDER BY a ASC;
----
logical_plan
Sort: annotated_data_finite2.a ASC NULLS LAST
--Projection: annotated_data_finite2.a, annotated_data_finite2.b, Int64(2)
----TableScan: annotated_data_finite2 projection=[a, b]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true

statement ok
drop table annotated_data_finite2;

Expand Down