
Improve parallelism of repartition operator with multiple cores #6310

Merged — 2 commits merged from alamb/more_parallelism into apache:main on May 12, 2023

Conversation

@alamb (Contributor) commented May 9, 2023

Which issue does this PR close?

Close #6290

Rationale for this change

I was testing query performance for #6278 and noticed that only a single core was being used on a query that ran entirely in memory. When I spent some time looking into it, the plan looked correct with repartitioning, but for some reason the data wasn't actually being repartitioned.

What changes are included in this PR?

Don't yield on every batch -- yield only after we have made some decent progress (in this case, at least partition_count batches).
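A minimal sketch of what this kind of heuristic can look like, assuming the futures and tokio crates; the function and parameter names here are hypothetical, and this is a sketch of the idea rather than the actual RepartitionExec code:

```rust
use futures::{Stream, StreamExt};

// Sketch only: yield back to the tokio scheduler once every
// `partition_count` batches instead of after every single batch.
async fn pull_and_distribute<T>(
    mut input: impl Stream<Item = T> + Unpin,
    mut send_to_output: impl FnMut(T),
    partition_count: usize,
) {
    let mut batches_until_yield = partition_count;
    while let Some(batch) = input.next().await {
        send_to_output(batch);

        // An endless input could otherwise starve other tasks on the same
        // tokio worker thread, so we still yield -- just much less often.
        if batches_until_yield == 0 {
            tokio::task::yield_now().await;
            batches_until_yield = partition_count;
        } else {
            batches_until_yield -= 1;
        }
    }
}
```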

Are these changes tested?

I manually tested this -- I will add a benchmark for it shortly

My manual test results are:

On main (Keeps only 1 core busy for most of the time)

4 rows in set. Query took 10.631 seconds.

With this PR (keeps the cores much busier)

4 rows in set. Query took 3.705 seconds.

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) label on May 9, 2023
@alamb (Contributor, author) commented May 9, 2023

cc @crepererum

@alamb alamb changed the title Improve parallelism of repartition operator Improve parallelism of repartition operator with multiple cores May 9, 2023
Inline review thread on the existing yield logic:

// If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
// See https://github.com/apache/arrow-datafusion/issues/5278.
tokio::task::yield_now().await;
@alamb (Contributor, PR author) commented on the diff:

I would love some thoughts from reviewers about better heuristics here -- as the comments say, I am happy with this heuristic for round robin partitioning, but there may be a better way when hash partitioning (like ensuring that all channels have at least one batch 🤔).
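A hedged sketch of what that alternative heuristic could look like (hypothetical type and method names, not something this PR implements): track which output partitions have received a batch since the last yield, and only yield once all of them have.

```rust
// Sketch: one flag per output partition, reset after each yield decision.
struct YieldTracker {
    seen: Vec<bool>,
}

impl YieldTracker {
    fn new(partition_count: usize) -> Self {
        Self { seen: vec![false; partition_count] }
    }

    /// Record that `partition` received a batch; returns true when every
    /// partition has seen at least one batch, i.e. it is time to yield.
    fn record_batch_sent(&mut self, partition: usize) -> bool {
        self.seen[partition] = true;
        if self.seen.iter().all(|s| *s) {
            self.seen.iter_mut().for_each(|s| *s = false);
            true
        } else {
            false
        }
    }
}
```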

Another contributor replied:

You could use https://docs.rs/tokio/latest/tokio/task/fn.consume_budget.html, but I'm honestly a little confused by this. "we may spin forever" would imply an issue with unbounded receivers, not an issue with the repartition operator?

TLDR: I'd vote to not yield at all. I don't agree that this fixes #5278; rather, it just papers over it with a dubious fix.
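For reference, a hedged sketch of what relying on tokio's cooperative budget could look like (hypothetical function names; as noted later in this thread, consume_budget is an unstable tokio feature):

```rust
use futures::{Stream, StreamExt};

// Sketch only: consume_budget charges one unit of the task's cooperative
// budget and only actually yields once that budget is exhausted, rather than
// yielding unconditionally on every batch.
async fn pull_with_budget<T>(
    mut input: impl Stream<Item = T> + Unpin,
    mut send_to_output: impl FnMut(T),
) {
    while let Some(batch) = input.next().await {
        send_to_output(batch);
        tokio::task::consume_budget().await;
    }
}
```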

@alamb (Contributor, PR author) replied:

I think if the tokio executor has only a single thread and the input stream can provide data infinitely, then without a yield it will buffer the entire input, which seems non-ideal.

I agree that #5278 as described seems somewhat more like "when we used blocking IO with a single tokio thread it blocked everything" -- as described in #5278 (comment).

@tustvold (Contributor) commented May 9, 2023:

> it will buffer the entire input which seems non ideal

That seems like a bug in whatever is using unbounded buffers, which I thought we had removed the last of? Basically we shouldn't be relying on yield_now to return control, but on the buffer filling up.
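A hedged illustration of the backpressure being described here (standalone example, not DataFusion code): with a bounded tokio mpsc channel, send suspends the producer once the buffer is full, so control returns to the scheduler without any explicit yield_now calls, even on a single-threaded runtime.

```rust
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(8); // capacity 8 => backpressure

    let producer = tokio::spawn(async move {
        for i in 0u64.. {
            // Once 8 items are queued, this await suspends until the consumer
            // drains the channel; an "endless input" cannot buffer unboundedly.
            if tx.send(i).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    for _ in 0..32 {
        let _ = rx.recv().await;
    }
    drop(rx); // closing the receiver ends the producer loop
    let _ = producer.await;
}
```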

Another contributor replied:

You can call it a bug or a design issue of DF / tokio. But if you run two spawned tasks and one never returns control to tokio, then the other will never run. Unbounded buffers are NOT avoidable in the current DF design, because you cannot predict tokio scheduling and hash outputs. So the fix here is adequate. consume_budget would be the better solution, but it's an unstable tokio feature, so it's not usable.

@alamb (Contributor, author) commented May 10, 2023

FYI @metesynnada I don't know if you have any thoughts on this approach (or the yield in general, as you reported #5278 initially)

@ozankabak (Contributor) commented:

Hi @alamb, @metesynnada is unavailable this month; he will be back at the end of the month. I checked this PR out and it looks better than the status quo to me.

@alamb (Contributor, author) commented May 11, 2023

I plan to leave this open for another day or two for comments and then will merge it in.

@alamb (Contributor, author) commented May 12, 2023

I merged up from main -- and once CI passes I plan to merge this PR

@crepererum (Contributor) commented:

Side note: when I added the yield statement I was wondering if this would be too much overhead, but my assumption was that the batches would hopefully be big enough that it wouldn't matter. Seems that I was wrong 😅

@alamb (Contributor, author) commented May 12, 2023

> Side note: when I added the yield statement I was wondering if this would be too much overhead, but my assumption was that the batches would hopefully be big enough that it wouldn't matter. Seems that I was wrong 😅

I think the fact that the source in this case is a MemoryExec (where all the data is already in memory and can be provided almost instantaneously) hurts us here.

@alamb alamb merged commit d0aadd6 into apache:main May 12, 2023
@alamb alamb deleted the alamb/more_parallelism branch May 12, 2023 15:45
Labels: core (Core DataFusion crate)

Linked issue (closed by this PR): Datafusion not using all cores on a TPCH like query during query with repartition (#6290)

4 participants