From 3aeca0914e19b267dfebe4f396717df70d739105 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 9 May 2023 11:04:12 -0400 Subject: [PATCH] Improve parallelism of repartition operator --- .../core/src/physical_plan/repartition/mod.rs | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 67fc63d23522..32605cd977ea 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -220,6 +220,14 @@ impl BatchPartitioner { Ok(it) } + + // return the number of output partitions + fn num_partitions(&self) -> usize { + match self.state { + BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions, + BatchPartitionerState::Hash { num_partitions, .. } => num_partitions, + } + } } /// The repartition operator maps N input partitions to M output partitions based on a @@ -502,6 +510,7 @@ impl RepartitionExec { // While there are still outputs to send to, keep // pulling inputs + let mut batches_until_yield = partitioner.num_partitions(); while !txs.is_empty() { // fetch the next batch let timer = r_metrics.fetch_time.timer(); @@ -532,9 +541,28 @@ impl RepartitionExec { timer.done(); } - // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield. - // See https://github.com/apache/arrow-datafusion/issues/5278. - tokio::task::yield_now().await; + // If the input stream is endless, we may spin forever and + // never yield back to tokio. See + // https://github.com/apache/arrow-datafusion/issues/5278. + // + // However, yielding on every batch causes a bottleneck + // when running with multiple cores. See + // https://github.com/apache/arrow-datafusion/issues/6290 + // + // Thus, heuristically yield after producing num_partition + // batches + // + // In round robin this is ideal as each input will get a + // new batch. In hash partitioning it may yield too often + // on uneven distributions even if some partition can not + // make progress, but parallelism is going to be limited + // in that case anyways + if batches_until_yield == 0 { + tokio::task::yield_now().await; + batches_until_yield = partitioner.num_partitions(); + } else { + batches_until_yield -= 1; + } } Ok(())