Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Jan 17, 2025
1 parent 7dbd8e3 commit d0409d5
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 328 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl ConvertOpt {
// optionally, repartition the file
let partitions = self.partitions;
if partitions > 1 {
csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
csv = csv.repartition(Partitioning::OnDemand(partitions))?
}

// create the physical plan
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ impl RunOpt {
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().optimizer.enable_on_demand_repartition = true;
config.options_mut().optimizer.enable_round_robin_repartition = false;
config
.options_mut()
.optimizer
.enable_round_robin_repartition = false;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down
281 changes: 149 additions & 132 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

99 changes: 59 additions & 40 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_physical_expr::{
utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;

Expand Down Expand Up @@ -130,6 +131,10 @@ pub fn remove_unnecessary_projections(
)?
} else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
try_swapping_with_repartition(projection, repartition)?
} else if let Some(on_demand_repartition) =
input.downcast_ref::<OnDemandRepartitionExec>()
{
try_swapping_with_on_demand_repartition(projection, on_demand_repartition)?
} else if let Some(sort) = input.downcast_ref::<SortExec>() {
try_swapping_with_sort(projection, sort)?
} else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
Expand Down Expand Up @@ -413,6 +418,34 @@ fn try_swapping_with_filter(
.map(|e| Some(Arc::new(e) as _))
}

/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
/// it returns the new swapped version having the [`RepartitionExec`] as the top plan.
/// Otherwise, it returns None.
fn try_swapping_with_on_demand_repartition(
projection: &ProjectionExec,
repartition: &OnDemandRepartitionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// If the projection does not narrow the schema, we should not try to push it down.
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}

// If pushdown is not beneficial or applicable, break it.
if projection.benefits_from_input_partitioning()[0] || !all_columns(projection.expr())
{
return Ok(None);
}

let new_projection = make_with_child(projection, repartition.input())?;

let new_partitioning = repartition.partitioning().clone();

Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
new_projection,
new_partitioning,
)?)))
}

/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
/// it returns the new swapped version having the [`RepartitionExec`] as the top plan.
/// Otherwise, it returns None.
Expand Down

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ mod tests {
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::repartition::RepartitionExec;

fn create_test_schema() -> SchemaRef {
Expand Down Expand Up @@ -504,7 +505,7 @@ mod tests {
limit.as_ref(),
vec![
"GlobalLimitExec: skip=0, fetch=100",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[0]",
],
);
Expand Down Expand Up @@ -635,9 +636,13 @@ mod tests {
let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)];
let left = sort_exec(sort_exprs1, source1);
let right = sort_exec(sort_exprs2, source2);
let right = Arc::new(RepartitionExec::try_new(
// let right = Arc::new(RepartitionExec::try_new(
// right,
// Partitioning::RoundRobinBatch(10),
// )?);
let right = Arc::new(OnDemandRepartitionExec::try_new(
right,
Partitioning::RoundRobinBatch(10),
Partitioning::OnDemand(10),
)?);
let left_jcol = col("c9", &left.schema()).unwrap();
let right_jcol = col("a", &right.schema()).unwrap();
Expand All @@ -659,7 +664,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1",
" SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=1",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
],
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::HashJoinExec;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;

/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
Expand Down Expand Up @@ -271,6 +272,7 @@ fn pushdown_requirement_to_children(
} else if maintains_input_order.is_empty()
|| !maintains_input_order.iter().any(|o| *o)
|| plan.as_any().is::<RepartitionExec>()
|| plan.as_any().is::<OnDemandRepartitionExec>()
|| plan.as_any().is::<FilterExec>()
// TODO: Add support for Projection push down
|| plan.as_any().is::<ProjectionExec>()
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::{
displayable, DisplayAs, DisplayFormatType, PlanProperties,
Expand Down Expand Up @@ -324,7 +325,7 @@ pub fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan

pub fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
// TODO: replace with OnDemand
Arc::new(RepartitionExec::try_new(input, Partitioning::OnDemand(10)).unwrap())
Arc::new(OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(10)).unwrap())
// Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
}

Expand All @@ -336,7 +337,7 @@ pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionP
// .with_preserve_order(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::OnDemand(10))
OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(10))
.unwrap()
.with_preserve_order(),
)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::tree_node::PlanContext;

/// This utility function adds a `SortExec` above an operator according to the
Expand Down Expand Up @@ -108,3 +109,8 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

/// Checks whether the given operator is a [`OnDemandRepartitionExec`].
pub fn is_on_demand_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<OnDemandRepartitionExec>()
}
7 changes: 6 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;

Expand Down Expand Up @@ -791,7 +792,11 @@ impl DefaultPhysicalPlanner {
LogicalPartitioning::RoundRobinBatch(n)
| LogicalPartitioning::OnDemand(n) => {
// TODO: replaced by OnDemand
Partitioning::OnDemand(*n)

return Ok(Arc::new(OnDemandRepartitionExec::try_new(
physical_input,
Partitioning::OnDemand(*n),
)?));
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -122,7 +123,8 @@ fn final_aggregate_exec(
}

fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
// Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
Arc::new(OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(10)).unwrap())
}

// Return appropriate expr depending if COUNT is for col or table (*)
Expand Down Expand Up @@ -157,7 +159,7 @@ fn aggregations_not_combined() -> datafusion_common::Result<()> {
// should not combine the Partial/Final AggregateExecs
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
Expand Down
21 changes: 13 additions & 8 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
Expand Down Expand Up @@ -121,7 +122,7 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
" LocalLimitExec: fetch=5",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(initial, expected_initial);
Expand All @@ -134,7 +135,7 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192, fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down Expand Up @@ -227,7 +228,7 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> datafusion_common::Result<
"GlobalLimitExec: skip=0, fetch=5",
" SortPreservingMergeExec: [c1@0 ASC]",
" SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
Expand All @@ -241,7 +242,7 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> datafusion_common::Result<
let expected = [
"SortPreservingMergeExec: [c1@0 ASC], fetch=5",
" SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
Expand All @@ -266,7 +267,7 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(initial, expected_initial);
Expand All @@ -278,7 +279,7 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down Expand Up @@ -479,9 +480,13 @@ fn coalesce_partitions_exec(
fn repartition_exec(
streaming_table: Arc<dyn ExecutionPlan>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(RepartitionExec::try_new(
// Ok(Arc::new(RepartitionExec::try_new(
// streaming_table,
// Partitioning::RoundRobinBatch(8),
// )?))
Ok(Arc::new(OnDemandRepartitionExec::try_new(
streaming_table,
Partitioning::RoundRobinBatch(8),
Partitioning::OnDemand(8),
)?))
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ async fn test_physical_plan_display_indent() {
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
];

Expand Down Expand Up @@ -656,11 +656,11 @@ async fn test_physical_plan_display_indent_multi_children() {
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" OnDemandRepartitionExec: partitioning=OnDemand(9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c2]",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
];
Expand Down
Loading

0 comments on commit d0409d5

Please sign in to comment.