diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5c4b66114328..36d4ecd1f4c6 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -42,7 +42,7 @@ use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics}; use datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast; -use datafusion_physical_expr::expressions::CastExpr; +use datafusion_physical_expr::expressions::{CastExpr, Literal}; use datafusion_physical_expr::{ normalize_out_expr_with_columns_map, project_equivalence_properties, project_ordering_equivalence_properties, OrderingEquivalenceProperties, @@ -283,13 +283,13 @@ impl ExecutionPlan for ProjectionExec { } fn benefits_from_input_partitioning(&self) -> bool { - let all_column_expr = self + let all_simple_exprs = self .expr .iter() - .all(|(e, _)| e.as_any().downcast_ref::().is_some()); - // If expressions are all column_expr, then all computations in this projection are reorder or rename, + .all(|(e, _)| e.as_any().is::() || e.as_any().is::()); + // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename, // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false. 
- !all_column_expr + !all_simple_exprs } fn execute( diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 99b72a1b4064..4362bc509809 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -370,6 +370,12 @@ impl RepartitionExec { &self.partitioning } + /// Get preserve_order flag of the RepartitionExecutor + /// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec` + pub fn preserve_order(&self) -> bool { + self.preserve_order + } + /// Get name of the Executor pub fn name(&self) -> &str { if self.preserve_order { @@ -432,6 +438,10 @@ impl ExecutionPlan for RepartitionExec { Ok(children[0]) } + fn benefits_from_input_partitioning(&self) -> bool { + matches!(self.partitioning, Partitioning::Hash(_, _)) + } + fn output_partitioning(&self) -> Partitioning { self.partitioning.clone() } diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt index cc8828ef879c..84f7c3be526b 100644 --- a/datafusion/core/tests/sqllogictests/test_files/select.slt +++ b/datafusion/core/tests/sqllogictests/test_files/select.slt @@ -843,6 +843,45 @@ statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT conta SELECT * EXCLUDE(a, a) FROM table1 +# run below query in multi partitions +statement ok +set datafusion.execution.target_partitions = 2; + +# since query below contains computation +# inside projection expr, increasing partitions +# is beneficial +query TT +EXPLAIN SELECT a, a+b +FROM annotated_data_finite2 +ORDER BY a ASC; +---- +logical_plan +Sort: annotated_data_finite2.a ASC NULLS LAST +--Projection: annotated_data_finite2.a, annotated_data_finite2.a + annotated_data_finite2.b +----TableScan: annotated_data_finite2 projection=[a, b] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST] +--ProjectionExec: expr=[a@0 
as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b] +----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +# since query below doesn't contain computation +# inside projection expr, increasing partitions +# is not beneficial. Hence plan doesn't contain +# RepartitionExec +query TT +EXPLAIN SELECT a, b, 2 +FROM annotated_data_finite2 +ORDER BY a ASC; +---- +logical_plan +Sort: annotated_data_finite2.a ASC NULLS LAST +--Projection: annotated_data_finite2.a, annotated_data_finite2.b, Int64(2) +----TableScan: annotated_data_finite2 projection=[a, b] +physical_plan +ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + statement ok drop table annotated_data_finite2;