bench: Replace RoundRobinBatch with OnDemandRepartition #60

Draft · wants to merge 20 commits into apache_main
Changes from 15 commits
6 changes: 6 additions & 0 deletions benchmarks/bench.sh
@@ -156,6 +156,9 @@ main() {
         tpch10)
             data_tpch "10"
             ;;
+        tpch50)
+            data_tpch "50"
+            ;;
         tpch_mem10)
             # same data as for tpch10
             data_tpch "10"
@@ -233,6 +236,9 @@ main() {
         tpch10)
             run_tpch "10"
             ;;
+        tpch50)
+            run_tpch "50"
+            ;;
         tpch_mem10)
             run_tpch_mem "10"
             ;;
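
With matching cases in both the data and run sections, the scale-factor-50 target should work like the existing tpch10 one. The invocation below is inferred from bench.sh's usual subcommands:

    ./benchmarks/bench.sh data tpch50
    ./benchmarks/bench.sh run tpch50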
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
@@ -95,7 +95,7 @@ impl ConvertOpt {
         // optionally, repartition the file
         let partitions = self.partitions;
         if partitions > 1 {
-            csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
+            csv = csv.repartition(Partitioning::OnDemand(partitions))?
         }

         // create the physical plan
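
For context, here is the changed call in isolation as a minimal, self-contained sketch. It assumes the Partitioning::OnDemand variant that this PR series adds alongside RoundRobinBatch in the logical Partitioning enum; the file name and partition count are placeholders:

    use datafusion::error::Result;
    use datafusion::logical_expr::Partitioning;
    use datafusion::prelude::*;

    async fn convert(ctx: &SessionContext) -> Result<()> {
        let mut csv = ctx.read_csv("lineitem.csv", CsvReadOptions::new()).await?;
        // RoundRobinBatch(n) dealt batches out to n output partitions in a
        // fixed cycle; OnDemand(n) lets each output partition pull its next
        // batch whenever it is ready for more input.
        csv = csv.repartition(Partitioning::OnDemand(8))?;
        csv.show().await?;
        Ok(())
    }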
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
@@ -532,6 +532,8 @@ config_namespace! {
         /// repartitioning to increase parallelism to leverage more CPU cores
         pub enable_round_robin_repartition: bool, default = true

+        pub enable_on_demand_repartition: bool, default = true
+
         /// When set to true, the optimizer will attempt to perform limit operations
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
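
The new flag is added without a doc comment; judging by its neighbors it belongs to the optimizer namespace, so the derived key should be datafusion.optimizer.enable_on_demand_repartition. A sketch of disabling it under that assumption:

    use datafusion::prelude::*;

    fn context_without_on_demand_repartition() -> SessionContext {
        // assumed key, derived from the field's position in the optimizer
        // options of config_namespace!
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.enable_on_demand_repartition", false);
        SessionContext::new_with_config(config)
    }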
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/memory.rs
 use datafusion_expr::SortExpr;
 use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
+use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
 use futures::StreamExt;
 use log::debug;
 use parking_lot::Mutex;
@@ -165,9 +166,14 @@ impl MemTable {
         let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;

         if let Some(num_partitions) = output_partitions {
-            let exec = RepartitionExec::try_new(
+            // TODO: replaced with OnDemandRepartitionExec
+            // let exec = RepartitionExec::try_new(
+            //     Arc::new(exec),
+            //     Partitioning::RoundRobinBatch(num_partitions),
+            // )?;
+            let exec = OnDemandRepartitionExec::try_new(
                 Arc::new(exec),
-                Partitioning::RoundRobinBatch(num_partitions),
+                Partitioning::OnDemand(num_partitions),
             )?;

             // execute and collect results
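
The commented-out block keeps the old construction for reference, and OnDemandRepartitionExec::try_new mirrors RepartitionExec::try_new's (input, partitioning) signature, so callers of MemTable::load are unaffected. A usage sketch, assuming a table named source_table is already registered:

    use std::sync::Arc;

    use datafusion::datasource::MemTable;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn cache_in_memory(ctx: &SessionContext) -> Result<()> {
        let source = ctx.table_provider("source_table").await?;
        // Some(8) repartitions while materializing; after this change the
        // eight output partitions pull batches on demand instead of having
        // them dealt round-robin.
        let mem = MemTable::load(source, Some(8), &ctx.state()).await?;
        ctx.register_table("cached_table", Arc::new(mem))?;
        Ok(())
    }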
273 changes: 157 additions & 116 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

(Large diff not rendered.)

100 changes: 62 additions & 38 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs

(Large diff not rendered.)

33 changes: 33 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -51,6 +51,7 @@ use datafusion_physical_expr::{
     utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
     PhysicalSortExpr, PhysicalSortRequirement,
 };
+use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
 use datafusion_physical_plan::streaming::StreamingTableExec;
 use datafusion_physical_plan::union::UnionExec;
@@ -130,6 +131,10 @@ pub fn remove_unnecessary_projections(
         )?
     } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
         try_swapping_with_repartition(projection, repartition)?
+    } else if let Some(on_demand_repartition) =
+        input.downcast_ref::<OnDemandRepartitionExec>()
+    {
+        try_swapping_with_on_demand_repartition(projection, on_demand_repartition)?
     } else if let Some(sort) = input.downcast_ref::<SortExec>() {
         try_swapping_with_sort(projection, sort)?
     } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
@@ -413,6 +418,34 @@ fn try_swapping_with_filter(
         .map(|e| Some(Arc::new(e) as _))
 }

+/// Tries to swap the projection with its input [`OnDemandRepartitionExec`].
+/// If it can be done, it returns the new swapped version having the
+/// [`OnDemandRepartitionExec`] as the top plan. Otherwise, it returns None.
+fn try_swapping_with_on_demand_repartition(
+    projection: &ProjectionExec,
+    repartition: &OnDemandRepartitionExec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // If the projection does not narrow the schema, we should not try to push it down.
+    if projection.expr().len() >= projection.input().schema().fields().len() {
+        return Ok(None);
+    }
+
+    // If pushdown is not beneficial or applicable, do not attempt the swap.
+    if projection.benefits_from_input_partitioning()[0] || !all_columns(projection.expr())
+    {
+        return Ok(None);
+    }
+
+    let new_projection = make_with_child(projection, repartition.input())?;
+
+    let new_partitioning = repartition.partitioning().clone();
+
+    Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+        new_projection,
+        new_partitioning,
+    )?)))
+}
+
 /// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
 /// it returns the new swapped version having the [`RepartitionExec`] as the top plan.
 /// Otherwise, it returns None.
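
This mirrors the existing try_swapping_with_repartition rule one-for-one, rebuilding an OnDemandRepartitionExec on top of the pushed-down projection. Schematically (hypothetical plans, not exact EXPLAIN output):

    before:  ProjectionExec(expr=[a@0, b@1])
               OnDemandRepartitionExec(partitioning=OnDemand(8))
                 MemoryExec
    after:   OnDemandRepartitionExec(partitioning=OnDemand(8))
               ProjectionExec(expr=[a@0, b@1])
                 MemoryExec

Moving the projection below the repartition means only the narrowed set of columns is buffered and handed between partitions, which is why the rule bails out unless the projection strictly shrinks the schema and consists purely of column references.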