Skip to content

Commit

Permalink
bench: replace all RoundRobin with OnDemand Repartition
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Jan 16, 2025
1 parent f51176e commit 2c95ca3
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 34 deletions.
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use datafusion_expr::SortExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
Expand Down Expand Up @@ -165,9 +166,14 @@ impl MemTable {
let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;

if let Some(num_partitions) = output_partitions {
let exec = RepartitionExec::try_new(
// TODO: replaced with OnDemandRepartitionExec
// let exec = RepartitionExec::try_new(
// Arc::new(exec),
// Partitioning::RoundRobinBatch(num_partitions),
// )?;
let exec = OnDemandRepartitionExec::try_new(
Arc::new(exec),
Partitioning::RoundRobinBatch(num_partitions),
Partitioning::OnDemand(num_partitions),
)?;

// execute and collect results
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ fn remove_bottleneck_in_subplan(
if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}
// TODO: replaced with OnDemand
if let Partitioning::OnDemand(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}

if can_remove {
new_reqs = new_reqs.children.swap_remove(0)
}
Expand Down
12 changes: 10 additions & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,20 @@ pub fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan
}

pub fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
// TODO: replace with OnDemand
Arc::new(RepartitionExec::try_new(input, Partitioning::OnDemand(10)).unwrap())
// Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
}

pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemand
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
// .unwrap()
// .with_preserve_order(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
RepartitionExec::try_new(input, Partitioning::OnDemand(10))
.unwrap()
.with_preserve_order(),
)
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,10 @@ impl DefaultPhysicalPlanner {
let physical_input = children.one()?;
let input_dfschema = input.as_ref().schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Partitioning::RoundRobinBatch(*n)
LogicalPartitioning::RoundRobinBatch(n)
| LogicalPartitioning::OnDemand(n) => {
// TODO: replaced by OnDemand
Partitioning::OnDemand(*n)
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod sp_repartition_fuzz_tests {
expressions::{col, Column},
ConstExpr, PhysicalExpr, PhysicalSortExpr,
};
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use test_utils::add_empty_batches;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand Down Expand Up @@ -390,8 +391,14 @@ mod sp_repartition_fuzz_tests {
fn sort_preserving_repartition_exec_round_robin(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemandRepartitionExec
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
// .unwrap()
// .with_preserve_order(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2))
.unwrap()
.with_preserve_order(),
)
Expand All @@ -400,8 +407,12 @@ mod sp_repartition_fuzz_tests {
fn repartition_exec_round_robin(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemandRepartitionExec
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2)).unwrap(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2)).unwrap(),
OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)).unwrap(),
)
}

Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,13 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Partition Count": n
})
}
Partitioning::OnDemand(n) => {
json!({
"Node Type": "Repartition",
"Partitioning Scheme": "OnDemand",
"Partition Count": n
})
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
Expand Down
12 changes: 12 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,14 @@ impl LogicalPlan {
input: Arc::new(input),
}))
}
Partitioning::OnDemand(n) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::OnDemand(*n),
input: Arc::new(input),
}))
}
Partitioning::Hash(_, n) => {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Repartition(Repartition {
Expand Down Expand Up @@ -1913,6 +1921,9 @@ impl LogicalPlan {
Partitioning::RoundRobinBatch(n) => {
write!(f, "Repartition: RoundRobinBatch partition_count={n}")
}
Partitioning::OnDemand(n) => {
write!(f, "Repartition: OnDemand partition_count={n}")
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
Expand Down Expand Up @@ -3391,6 +3402,7 @@ pub enum Partitioning {
Hash(Vec<Expr>, usize),
/// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
DistributeBy(Vec<Expr>),
OnDemand(usize),
}

/// Represent the unnesting operation on a list column, such as the recursion depth and
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ impl LogicalPlan {
Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
expr.apply_elements(f)
}
Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
Partitioning::RoundRobinBatch(_) | Partitioning::OnDemand(_) => {
Ok(TreeNodeRecursion::Continue)
}
},
LogicalPlan::Window(Window { window_expr, .. }) => {
window_expr.apply_elements(f)
Expand Down Expand Up @@ -527,7 +529,9 @@ impl LogicalPlan {
Partitioning::DistributeBy(expr) => expr
.map_elements(f)?
.update_data(Partitioning::DistributeBy),
Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
Partitioning::RoundRobinBatch(_) | Partitioning::OnDemand(_) => {
Transformed::no(partitioning_scheme)
}
}
.update_data(|partitioning_scheme| {
LogicalPlan::Repartition(Repartition {
Expand Down
16 changes: 14 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,18 @@ impl ExecutionPlan for RepartitionExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// TODO: make sure that this is only called for hash partitioning
// match self.partitioning() {
// Partitioning::Hash(_, _) => {}
// _ => {
// panic!(
// "RepartitionExec::execute should never be called directly. \
// Partition type: {:?}",
// self.partitioning()
// );
// }
// }

trace!(
"Start {}::execute for partition: {}",
self.name(),
Expand Down Expand Up @@ -1772,7 +1784,8 @@ mod tests {
MemoryExec::try_new(&partitions, Arc::clone(&schema), None).unwrap(),
);
let exec =
OnDemandRepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2)).unwrap();
OnDemandRepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap();

let coalesce_exec =
CoalescePartitionsExec::new(Arc::new(exec) as Arc<dyn ExecutionPlan>);
Expand Down Expand Up @@ -1816,7 +1829,6 @@ mod tests {
Ok(())
}


fn sort_exprs(schema: &Schema) -> LexOrdering {
let options = SortOptions::default();
LexOrdering::new(vec![PhysicalSortExpr {
Expand Down
Loading

0 comments on commit 2c95ca3

Please sign in to comment.