Improve parallelism of repartition operator with multiple cores #6310

Merged: 2 commits, May 12, 2023
34 changes: 31 additions & 3 deletions datafusion/core/src/physical_plan/repartition/mod.rs
@@ -220,6 +220,14 @@ impl BatchPartitioner {

Ok(it)
}

/// Return the number of output partitions
fn num_partitions(&self) -> usize {
match self.state {
BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions,
BatchPartitionerState::Hash { num_partitions, .. } => num_partitions,
}
}
}

/// The repartition operator maps N input partitions to M output partitions based on a
@@ -502,6 +510,7 @@ impl RepartitionExec {

// While there are still outputs to send to, keep
// pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
while !txs.is_empty() {
// fetch the next batch
let timer = r_metrics.fetch_time.timer();
@@ -532,9 +541,28 @@
timer.done();
}

// If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
// See https://github.com/apache/arrow-datafusion/issues/5278.
tokio::task::yield_now().await;
// If the input stream is endless, we may spin forever and
Contributor Author:
I would love some thoughts from reviewers about better heuristics here -- as the comments say, I am happy with this heuristic for round robin partitioning, but there may be a better way for hash partitioning (like ensuring that all channels have at least one batch 🤔).
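For illustration only, here is a rough sketch of what that "every channel has at least one batch" heuristic could look like; the `YieldTracker` name and its API are hypothetical and not part of this PR:

```rust
/// Hypothetical helper (not in this PR): yield only once every output
/// partition has received at least one batch since the last yield.
struct YieldTracker {
    /// seen[i] is true once output partition i has received a batch
    /// since the last yield
    seen: Vec<bool>,
}

impl YieldTracker {
    fn new(num_partitions: usize) -> Self {
        Self {
            seen: vec![false; num_partitions],
        }
    }

    /// Record that `partition` received a batch. Returns true when every
    /// partition has received at least one batch, i.e. it is time to
    /// yield; the tracker then resets itself for the next round.
    fn record_batch_sent(&mut self, partition: usize) -> bool {
        self.seen[partition] = true;
        if self.seen.iter().all(|s| *s) {
            self.seen.iter_mut().for_each(|s| *s = false);
            true
        } else {
            false
        }
    }
}
```

One trade-off: under a skewed hash distribution this yields less often than the fixed num_partitions counter (and not at all if some partition never receives a batch), which may or may not be what we want.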

Contributor:
You could use https://docs.rs/tokio/latest/tokio/task/fn.consume_budget.html, but I'm honestly a little confused by this. "we may spin forever" would imply an issue with unbounded receivers, not an issue with the repartition operator?

TL;DR: I'd vote to not yield at all; I don't agree that this fixes #5278, it just papers over it with a dubious fix.
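For reference, a minimal standalone sketch of the consume_budget() approach mentioned above, assuming a tokio build where it is available (per a later comment in this thread it is gated as an unstable feature); this is illustrative, not code from this PR:

```rust
use tokio::task;

#[tokio::main]
async fn main() {
    let producer = tokio::spawn(async {
        for i in 0..1_000_000u64 {
            // Stand-in for "pull one input batch and send it downstream".
            std::hint::black_box(i);

            // Charge one unit of tokio's cooperative budget. Once the
            // budget is exhausted the task yields back to the scheduler,
            // so no hand-rolled yield counter is needed.
            task::consume_budget().await;
        }
    });
    producer.await.unwrap();
}
```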

Contributor Author:
I think if the tokio executor has only a single thread and the input stream can provide data infinitely, then without a yield it will buffer the entire input, which seems non-ideal.

I agree that #5278 as described reads more like "when we used blocking IO with a single tokio thread it blocked everything" -- as discussed on #5278 (comment).
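To make the single-thread concern concrete, here is a minimal standalone example (not DataFusion code): on a current_thread runtime, a spawned task that never hits an await point starves its siblings, and yielding every N iterations, as this PR does every num_partitions batches, lets them run:

```rust
use tokio::task;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let n = 8; // stands in for partitioner.num_partitions()

    let producer = tokio::spawn(async move {
        let mut batches_until_yield = n;
        for _ in 0..10_000u64 {
            // Stand-in for "produce and send one batch".
            if batches_until_yield == 0 {
                // Without this periodic yield, the watcher task below
                // would not be polled until this loop finished.
                task::yield_now().await;
                batches_until_yield = n;
            } else {
                batches_until_yield -= 1;
            }
        }
    });

    let watcher = tokio::spawn(async {
        println!("watcher task got a chance to run");
    });

    let _ = tokio::join!(producer, watcher);
}
```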

Contributor (@tustvold), May 9, 2023:
> it will buffer the entire input which seems non ideal

That seems like a bug in whatever is still using unbounded buffers, which I thought we had removed the last of? Basically we shouldn't be relying on yield_now to return control, but on the buffer filling up.

Contributor:
You can call it a bug or a design issue of DF / tokio, but if you run two spawned tasks and one never returns control to Tokio, the other will never run. Unbounded buffers are NOT avoidable in the current DF design, because you cannot predict tokio scheduling or hash outputs. So the fix here is adequate. consume_budget would be the better solution, but it's an unstable tokio feature, so it's not usable here.

// never yield back to tokio. See
// https://github.com/apache/arrow-datafusion/issues/5278.
//
// However, yielding on every batch causes a bottleneck
// when running with multiple cores. See
// https://github.com/apache/arrow-datafusion/issues/6290
//
// Thus, heuristically yield after producing num_partitions
// batches
//
// In round robin this is ideal as each input will get a
// new batch. In hash partitioning it may yield too often
// on uneven distributions even if some partition cannot
// make progress, but parallelism is going to be limited
// in that case anyway
if batches_until_yield == 0 {
tokio::task::yield_now().await;
batches_until_yield = partitioner.num_partitions();
} else {
batches_until_yield -= 1;
}
}

Ok(())