bench: Replace RoundRobinBatch with OnDemandRepartition #60

Draft: wants to merge 20 commits into base branch apache_main.
Showing changes from 12 commits.
2 changes: 2 additions & 0 deletions benchmarks/src/tpch/run.rs
@@ -121,6 +121,8 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().optimizer.enable_on_demand_repartition = true;
config.options_mut().optimizer.enable_round_robin_repartition = false;
let ctx = SessionContext::new_with_config(config);

// register tables
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
@@ -532,6 +532,8 @@ config_namespace! {
/// repartitioning to increase parallelism to leverage more CPU cores
pub enable_round_robin_repartition: bool, default = true

/// When set to true, the physical plan optimizer will add OnDemandRepartition
/// to increase parallelism to leverage more CPU cores
pub enable_on_demand_repartition: bool, default = true

/// When set to true, the optimizer will attempt to perform limit operations
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true
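For reference, the sketch below (not part of this diff) shows how a session could opt into the new behavior once this option exists; the SQL equivalent would presumably be SET datafusion.optimizer.enable_on_demand_repartition = true.

use datafusion::prelude::{SessionConfig, SessionContext};

fn on_demand_ctx() -> SessionContext {
    // Assumes the `enable_on_demand_repartition` option added in this PR.
    let mut config = SessionConfig::new();
    config.options_mut().optimizer.enable_round_robin_repartition = false;
    config.options_mut().optimizer.enable_on_demand_repartition = true;
    SessionContext::new_with_config(config)
}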
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -45,6 +45,7 @@ use datafusion_expr::SortExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
@@ -165,9 +166,14 @@ impl MemTable {
let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;

if let Some(num_partitions) = output_partitions {
let exec = RepartitionExec::try_new(
// TODO: replaced with OnDemandRepartitionExec
// let exec = RepartitionExec::try_new(
// Arc::new(exec),
// Partitioning::RoundRobinBatch(num_partitions),
// )?;
let exec = OnDemandRepartitionExec::try_new(
Arc::new(exec),
Partitioning::RoundRobinBatch(num_partitions),
Partitioning::OnDemand(num_partitions),
)?;

// execute and collect results
59 changes: 59 additions & 0 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -54,6 +54,7 @@ use datafusion_physical_expr::{
};
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

@@ -404,6 +405,10 @@ fn adjust_input_keys_ordering(
requirements.data.clear();
}
} else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan
.as_any()
.downcast_ref::<OnDemandRepartitionExec>()
.is_some()
|| plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
@@ -857,6 +862,32 @@ fn add_roundrobin_on_top(
}
}

fn add_on_demand_repartition_on_top(
input: DistributionContext,
n_target: usize,
) -> Result<DistributionContext> {
// Adding repartition is helpful:
if input.plan.output_partitioning().partition_count() < n_target {
// When there is an existing ordering, we preserve it during repartitioning.
// This will be undone later if any of the following conditions is true:
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.prefer_existing_sort`)
let partitioning = Partitioning::OnDemand(n_target);
let repartition =
OnDemandRepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
.with_preserve_order();

let new_plan = Arc::new(repartition) as _;

Ok(DistributionContext::new(new_plan, true, vec![input]))
} else {
// Repartitioning is not helpful; we already have the desired number of partitions.
Ok(input)
}
}

/// Adds a hash repartition operator:
/// - to increase parallelism, and/or
/// - to satisfy requirements of the subsequent operators.
@@ -1035,6 +1066,18 @@ fn replace_order_preserving_variants(
)?);
return Ok(context);
}
} else if let Some(repartition) = context
.plan
.as_any()
.downcast_ref::<OnDemandRepartitionExec>()
{
if repartition.preserve_order() {
context.plan = Arc::new(OnDemandRepartitionExec::try_new(
Arc::clone(&context.children[0].plan),
repartition.partitioning().clone(),
)?);
return Ok(context);
}
}

context.update_plan_from_children()
@@ -1156,6 +1199,7 @@ fn ensure_distribution(
let target_partitions = config.execution.target_partitions;
// When `false`, round robin repartition will not be added to increase parallelism
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let enable_on_demand_repartition = config.optimizer.enable_on_demand_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let batch_size = config.execution.batch_size;
let should_use_estimates = config
@@ -1251,13 +1295,21 @@
child =
add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
}
if enable_on_demand_repartition {
child =
add_on_demand_repartition_on_top(child, target_partitions)?;
}
}
Distribution::UnspecifiedDistribution => {
if add_roundrobin {
// Add round-robin repartitioning on top of the operator
// to increase parallelism.
child = add_roundrobin_on_top(child, target_partitions)?;
}
if enable_on_demand_repartition {
child =
add_on_demand_repartition_on_top(child, target_partitions)?;
}
}
};

@@ -1365,6 +1417,13 @@ fn update_children(mut dist_context: DistributionContext) -> Result<Distribution
repartition.partitioning(),
Partitioning::UnknownPartitioning(_)
)
} else if let Some(repartition) =
child_plan_any.downcast_ref::<OnDemandRepartitionExec>()
{
!matches!(
repartition.partitioning(),
Partitioning::UnknownPartitioning(_)
)
} else {
child_plan_any.is::<SortPreservingMergeExec>()
|| child_plan_any.is::<CoalescePartitionsExec>()
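As a usage note, the helper added above only wraps a child in an OnDemandRepartitionExec when doing so raises the partition count. A condensed, illustrative sketch of that pattern (not code from this PR):

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Only repartition when it actually increases parallelism, mirroring
// `add_on_demand_repartition_on_top` above.
fn repartition_on_demand(
    input: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    if input.output_partitioning().partition_count() >= target_partitions {
        return Ok(input);
    }
    let exec =
        OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(target_partitions))?;
    Ok(Arc::new(exec))
}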
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -548,6 +548,11 @@ fn remove_bottleneck_in_subplan(
if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}
// TODO: replaced with OnDemand
if let Partitioning::OnDemand(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}

if can_remove {
new_reqs = new_reqs.children.swap_remove(0)
}
12 changes: 10 additions & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
@@ -323,12 +323,20 @@ pub fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan
}

pub fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
// TODO: replace with OnDemand
Arc::new(RepartitionExec::try_new(input, Partitioning::OnDemand(10)).unwrap())
// Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
}

pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemand
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
// .unwrap()
// .with_preserve_order(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
RepartitionExec::try_new(input, Partitioning::OnDemand(10))
.unwrap()
.with_preserve_order(),
)
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -788,8 +788,10 @@ impl DefaultPhysicalPlanner {
let physical_input = children.one()?;
let input_dfschema = input.as_ref().schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Partitioning::RoundRobinBatch(*n)
LogicalPartitioning::RoundRobinBatch(n)
| LogicalPartitioning::OnDemand(n) => {
// TODO: replaced by OnDemand
Partitioning::OnDemand(*n)
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
@@ -43,6 +43,7 @@ mod sp_repartition_fuzz_tests {
expressions::{col, Column},
ConstExpr, PhysicalExpr, PhysicalSortExpr,
};
use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec;
use test_utils::add_empty_batches;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
@@ -390,8 +391,14 @@
fn sort_preserving_repartition_exec_round_robin(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemandRepartitionExec
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
// .unwrap()
// .with_preserve_order(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2))
.unwrap()
.with_preserve_order(),
)
@@ -400,8 +407,12 @@
fn repartition_exec_round_robin(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
// TODO: replaced with OnDemandRepartitionExec
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2)).unwrap(),
// )
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2)).unwrap(),
OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)).unwrap(),
)
}

7 changes: 7 additions & 0 deletions datafusion/expr/src/logical_plan/display.rs
@@ -516,6 +516,13 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Partition Count": n
})
}
Partitioning::OnDemand(n) => {
json!({
"Node Type": "Repartition",
"Partitioning Scheme": "OnDemand",
"Partition Count": n
})
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
12 changes: 12 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -835,6 +835,14 @@ impl LogicalPlan {
input: Arc::new(input),
}))
}
Partitioning::OnDemand(n) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::OnDemand(*n),
input: Arc::new(input),
}))
}
Partitioning::Hash(_, n) => {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Repartition(Repartition {
@@ -1913,6 +1921,9 @@ impl LogicalPlan {
Partitioning::RoundRobinBatch(n) => {
write!(f, "Repartition: RoundRobinBatch partition_count={n}")
}
Partitioning::OnDemand(n) => {
write!(f, "Repartition: OnDemand partition_count={n}")
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{e}")).collect();
@@ -3391,6 +3402,7 @@ pub enum Partitioning {
Hash(Vec<Expr>, usize),
/// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
DistributeBy(Vec<Expr>),
/// On-demand partitioning, where the partitioning is determined at runtime
OnDemand(usize),
}

/// Represent the unnesting operation on a list column, such as the recursion depth and
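A small illustrative sketch (not part of this diff) of constructing the new logical variant; it assumes the Repartition node's fields stay public and relies on the display arm added above.

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder, Partitioning, Repartition};

fn main() -> Result<()> {
    let input = LogicalPlanBuilder::empty(false).build()?;
    // Wrap the input in an on-demand repartition with 4 partitions.
    let plan = LogicalPlan::Repartition(Repartition {
        partitioning_scheme: Partitioning::OnDemand(4),
        input: Arc::new(input),
    });
    // With the display arm added above, this should render roughly as
    // "Repartition: OnDemand partition_count=4".
    println!("{}", plan.display());
    Ok(())
}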
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/tree_node.rs
@@ -415,7 +415,9 @@ impl LogicalPlan {
Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
expr.apply_elements(f)
}
Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
Partitioning::RoundRobinBatch(_) | Partitioning::OnDemand(_) => {
Ok(TreeNodeRecursion::Continue)
}
},
LogicalPlan::Window(Window { window_expr, .. }) => {
window_expr.apply_elements(f)
@@ -527,7 +529,9 @@
Partitioning::DistributeBy(expr) => expr
.map_elements(f)?
.update_data(Partitioning::DistributeBy),
Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
Partitioning::RoundRobinBatch(_) | Partitioning::OnDemand(_) => {
Transformed::no(partitioning_scheme)
}
}
.update_data(|partitioning_scheme| {
LogicalPlan::Repartition(Repartition {
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/partitioning.rs
@@ -119,6 +119,8 @@ pub enum Partitioning {
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
/// On Demand partitioning, where the partitioning is determined at runtime
OnDemand(usize),
}

impl Display for Partitioning {
@@ -136,6 +138,7 @@ impl Display for Partitioning {
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
Partitioning::OnDemand(size) => write!(f, "OnDemand({size})"),
}
}
}
@@ -144,7 +147,7 @@ impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) | OnDemand(n) => *n,
}
}

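On the physical side, the new variant plugs into the existing helpers shown above; a tiny illustrative check:

use datafusion_physical_expr::Partitioning;

fn main() {
    let p = Partitioning::OnDemand(8);
    // `partition_count` and `Display` now cover the new variant.
    assert_eq!(p.partition_count(), 8);
    println!("{p}"); // OnDemand(8)
}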
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
@@ -61,6 +61,7 @@ indexmap = { workspace = true }
itertools = { workspace = true, features = ["use_std"] }
log = { workspace = true }
once_cell = "1.18.0"
async-channel = "2.3.1"
parking_lot = { workspace = true }
pin-project-lite = "^0.2.7"
rand = { workspace = true }
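The new async-channel dependency is presumably what backs the pull-based hand-off that gives OnDemandRepartition its name: consumers request the next batch instead of receiving a fixed round-robin assignment. A self-contained sketch of that pattern (illustrative only, not the PR's implementation), using only async-channel and futures:

use async_channel::{bounded, Receiver};
use futures::executor::block_on;
use futures::join;

fn main() {
    block_on(async {
        // A bounded channel: the producer awaits when it is full, so batches
        // are handed out at the pace consumers ask for them (pull-based).
        let (tx, rx) = bounded::<u64>(2);
        let rx2 = rx.clone();

        let producer = async move {
            for batch_id in 0..6u64 {
                tx.send(batch_id).await.unwrap();
            }
            // Dropping `tx` here closes the channel and ends both consumers.
        };

        let consumer = |name: &'static str, rx: Receiver<u64>| async move {
            while let Ok(batch_id) = rx.recv().await {
                println!("consumer {name} pulled batch {batch_id}");
            }
        };

        join!(producer, consumer("a", rx), consumer("b", rx2));
    });
}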
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/memory.rs
@@ -17,6 +17,7 @@

//! Execution plan for reading in-memory batches of data

use log::debug;
use parking_lot::RwLock;
use std::any::Any;
use std::fmt;
@@ -338,6 +339,8 @@ impl Stream for MemoryStream {
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// TODO: To be removed
debug!("Memory Stream poll");
Poll::Ready(if self.index < self.data.len() {
self.index += 1;
let batch = &self.data[self.index - 1];
@@ -350,6 +353,8 @@

Some(Ok(batch))
} else {
// TODO: To be removed
debug!("Memory stream exhausted!");
None
})
}