Introduce (default) number of partitions option, use it in DataFusion/Ballista #683

Closed
wants to merge 5 commits into from

Dandandan
Contributor

@Dandandan Dandandan commented Jul 5, 2021

Which issue does this PR close?

Closes apache/datafusion-ballista#20

Rationale for this change

Currently the number of partitions is tied to the concurrency setting (concurrency=partitions). That makes good use of parallelism when the results of a single partition fit in memory, but it is not what you want in a distributed / big data setting.

What changes are included in this PR?

This PR adds a new option to DataFusion's ExecutionConfig, partitions (set via with_partitions), which is used as the default number of partitions in a Hash or RoundRobin repartitioning, or a shuffle in Ballista.
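To make the role of the partition count concrete, here is a minimal sketch of how a single number can drive both round-robin and hash repartitioning. It is not DataFusion's or Ballista's implementation: the Scheme enum and the repartition function are hypothetical names introduced only for illustration, and rows are reduced to plain u64 keys.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the two repartitioning schemes named above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Scheme {
    RoundRobin,
    Hash,
}

/// Route each key to one of `partitions` output partitions.
/// Returns, per output partition, the keys that were sent to it.
pub fn repartition(keys: &[u64], scheme: Scheme, partitions: usize) -> Vec<Vec<u64>> {
    assert!(partitions > 0, "need at least one output partition");
    let mut out = vec![Vec::new(); partitions];
    for (i, key) in keys.iter().enumerate() {
        let idx = match scheme {
            // Round-robin ignores the key and cycles through the partitions.
            Scheme::RoundRobin => i % partitions,
            // Hash partitioning sends equal keys to the same partition.
            Scheme::Hash => {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                (hasher.finish() % partitions as u64) as usize
            }
        };
        out[idx].push(*key);
    }
    out
}
```

Calling repartition(&keys, Scheme::Hash, 4) always yields four output partitions regardless of how many threads execute the query, which is the decoupling this option is meant to provide.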

I experimented with setting the number of partitions in Ballista somewhat higher, but writing and reading small partitions seems to carry quite a bit of overhead, and picking up the tasks in the executors may add more (?). For a small dataset this is very slow.
I also tried this with a couple of executors.

Are there any user-facing changes?

Added members to ExecutionConfig.

As ExecutionConfig is public and a partitions field has been added, this is technically an API change.

@github-actions github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels Jul 5, 2021
@Dandandan
Contributor Author

FYI @andygrove

@Dandandan Dandandan changed the title from "Introduce (default) number of partitions option, use it in DataFusion" to "Introduce (default) number of partitions option, use it in DataFusion/Ballista" Jul 5, 2021
- let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
+ let config = ExecutionConfig::new()
+     .with_concurrency(1)
+     .with_partitions(4); // TODO: make it easier to configure from Ballista

@@ -681,6 +684,14 @@ impl ExecutionConfig {
        self
    }

    /// Customize default number of partitions being used in repartitioning
    pub fn with_partitions(mut self, n: usize) -> Self {
Member


perhaps with_default_partitions would be better?

@@ -639,6 +641,7 @@ impl Default for ExecutionConfig {
     fn default() -> Self {
         Self {
             concurrency: num_cpus::get(),
+            partitions: num_cpus::get(),
Contributor


For backwards compatibility, what would you think about defaulting partitions to None and, if it is not set, using the value of concurrency instead? Otherwise users may have to tune two separate knobs when they may just want one big hammer: "how many cores should DataFusion try to keep busy".
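The fallback suggested here could look roughly like the sketch below. It uses a trimmed-down, hypothetical ExecutionConfig rather than the real DataFusion struct; effective_partitions is an invented helper name, and std::thread::available_parallelism stands in for the num_cpus::get() call used in the PR so that the example has no dependencies.

```rust
/// Hypothetical, trimmed-down config; not the real DataFusion struct.
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    pub concurrency: usize,
    /// `None` means "not set": fall back to `concurrency`.
    pub partitions: Option<usize>,
}

impl ExecutionConfig {
    pub fn new() -> Self {
        // The PR defaults to num_cpus::get(); the standard library call
        // below is an assumption made to keep the sketch dependency-free.
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        Self { concurrency: cores, partitions: None }
    }

    /// Set the single "big hammer" knob.
    pub fn with_concurrency(mut self, n: usize) -> Self {
        assert!(n > 0, "concurrency must be positive");
        self.concurrency = n;
        self
    }

    /// Optionally override the default partition count.
    pub fn with_partitions(mut self, n: usize) -> Self {
        assert!(n > 0, "partitions must be positive");
        self.partitions = Some(n);
        self
    }

    /// Partition count used for repartitioning: the explicit value if one
    /// was set, otherwise the concurrency value.
    pub fn effective_partitions(&self) -> usize {
        self.partitions.unwrap_or(self.concurrency)
    }
}
```

With this shape, ExecutionConfig::new().with_concurrency(8) keeps today's behaviour (8 partitions), while adding .with_partitions(4) overrides only the partition count.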

@andygrove
Member

I spent some more time reviewing the codebase this morning, and it appears that we no longer use the concurrency config to determine concurrency in the sense of number of threads; we only use it to determine partition counts when repartitioning. I think we should just rename this field rather than introduce a new config. Tokio manages the concurrency level automatically.

Member

@andygrove andygrove left a comment


Please see the comment I added. I don't think we want two configs for concurrency vs min partitions.

@andygrove
Member

Please take a look at my proposal in #706

@alamb
Contributor

alamb commented Aug 20, 2021

Closing as this seems to be superseded by #706 -- please reopen if that is not correct

@alamb alamb closed this Aug 20, 2021
Labels
datafusion Changes in the datafusion crate
Development

Successfully merging this pull request may close these issues.

Ballista: Fix hacks around concurrency=2 to force hash-partitioned joins
3 participants