Change representation of partition in FileScanConfig #4295

Open
Cheappie opened this issue Nov 20, 2022 · 3 comments
Labels
enhancement New feature or request

Comments

@Cheappie
Contributor

Cheappie commented Nov 20, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Please correct me if I am wrong, but from what I understand, each partition in FileScanConfig (a file_group) is executed sequentially. That means that if the work is heavily skewed (e.g. partition A with 10 files of 10 MB, partition B with 10 files of 10 GB), the query takes as long as the largest partition needs to finish.
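
To make the skew concrete, here is a rough sketch that compares the finish time of statically assigned file groups with the best case of perfectly balanced work. It measures work in bytes only, and the function names are hypothetical helpers, not part of DataFusion. The balanced figure also assumes work can be divided freely among workers, which real files may not allow.

```rust
/// Total bytes each file group has to scan when its files run sequentially.
fn partition_bytes(file_groups: &[Vec<u64>]) -> Vec<u64> {
    file_groups.iter().map(|group| group.iter().sum()).collect()
}

/// With static assignment, the scan is done only when the largest group is done.
fn static_makespan(file_groups: &[Vec<u64>]) -> u64 {
    partition_bytes(file_groups).into_iter().max().unwrap_or(0)
}

/// Best case if the same bytes could be spread evenly over `workers`
/// (assumption: work is freely divisible).
fn balanced_lower_bound(file_groups: &[Vec<u64>], workers: u64) -> u64 {
    assert!(workers > 0, "need at least one worker");
    let total: u64 = partition_bytes(file_groups).into_iter().sum();
    (total + workers - 1) / workers // ceiling division
}
```

Reading the sizes in the example as per-file sizes in MB, `static_makespan(&[vec![10; 10], vec![10_000; 10]])` is 100,000 MB, while `balanced_lower_bound` for two workers is 50,050 MB, so one worker sits idle for roughly half of the scan.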

Describe the solution you'd like
I would like to implement work stealing, e.g. by sharing an emitter of PartitionedFile among FileStreams.

Possible implementations:

  • Migrate FileScanConfig from { file_groups: Vec<Vec<PartitionedFile>> } to { file_groups: Vec<Box<dyn Partition>> }. That way the interface stays very similar to what we have now, and I would be able to create n virtual partitions that internally point to a single partition.
  • Alternatively, migrate FileScanConfig from { file_groups: Vec<Vec<PartitionedFile>> } to a queue/stream of files that can be shared among n workers (FileStreams; heads up, naming collision).
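
The second option could look roughly like the sketch below: a single queue of files that every worker drains, so a worker that finishes early simply claims the next file. This is only an illustration using std threads and a mutex. `FileToScan`, `SharedFileQueue`, and `scan_with_shared_queue` are hypothetical names, `FileToScan` stands in for PartitionedFile, and none of this is existing DataFusion API.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for DataFusion's `PartitionedFile`.
#[derive(Debug, Clone)]
struct FileToScan {
    path: String,
    size_bytes: u64,
}

/// One queue of files shared by all workers (cloning shares the same queue).
#[derive(Clone)]
struct SharedFileQueue {
    inner: Arc<Mutex<VecDeque<FileToScan>>>,
}

impl SharedFileQueue {
    fn new(files: Vec<FileToScan>) -> Self {
        Self { inner: Arc::new(Mutex::new(VecDeque::from(files))) }
    }

    /// Hands out the next unclaimed file, or `None` once the queue is drained.
    fn next_file(&self) -> Option<FileToScan> {
        self.inner.lock().unwrap().pop_front()
    }
}

/// Spawns `workers` threads that pull from the same queue and returns
/// the files each worker ended up claiming.
fn scan_with_shared_queue(files: Vec<FileToScan>, workers: usize) -> Vec<Vec<FileToScan>> {
    let queue = SharedFileQueue::new(files);
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = queue.clone();
            thread::spawn(move || {
                let mut claimed = Vec::new();
                while let Some(file) = queue.next_file() {
                    // A real worker would open and decode the file here.
                    claimed.push(file);
                }
                claimed
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Which worker claims which file is not deterministic, but every file is claimed exactly once. The first option (n virtual partitions pointing at one real partition) could wrap the same queue behind each virtual partition.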
Cheappie added the "enhancement" (New feature or request) label on Nov 20, 2022.
Cheappie changed the title from "Change representation of partition in datafusion" to "Change representation of partition in FileScanConfig" on Nov 20, 2022.
@tustvold
Contributor

There is likely some overlap with #2293

I personally don't think we should have the concept of a partition at all, and should instead have a smarter work scheduler, but I haven't been able to work on that recently

@Cheappie
Contributor Author

Cheappie commented Nov 25, 2022

I personally don't think we should have the concept of a partition at all, and should instead have a smarter work scheduler, but I haven't been able to work on that recently

Yep, having partitions seems to be a limiting factor right now.

There are two things on my plate right now:

  1. I would like to ensure that input data is well balanced among workers.
  2. I would like to implement a prefetcher.

For both of these, replacing partitions with a single queue would help me. But I understand that it might not be a priority, or a good enough solution, for the project right now. Anyway, the concept of a partition seems to sit pretty deep in the codebase; I saw that it is passed through the hierarchy of ExecutionPlan's execute(...).

I wonder what kind of scheduler you have in mind?

  • Any changes with regard to the existing pull model?
  • Will the scheduler contain a DAG that replaces the hierarchy based on children() from ExecutionPlan?
  • How will morsel parallelism be implemented in DataFusion? I wonder how fair sharing of resources would be approached, because from what I have heard, HyperDB processes a single query at a time, which achieves ideal fairness with morsels. In concurrent systems, queries from various users won't create equal morsels, e.g. one user might select more columns in a projection, and different operators will have different costs. In my opinion it would be interesting to create morsels by splitting the dataset into fixed-size chunks (e.g. a 1 MB RecordBatch) instead of by number of tuples, as was done in the paper.
  • Rayon should deal pretty well with work stealing, but is it sufficient to tackle fair sharing of resources (e.g. CPU)? Do you plan to rely on the OS to time-slice the CPU, or to follow the approach taken in the morsel-driven parallelism paper, pinning cores and managing them?
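
As a rough illustration of the fixed-size idea, the sketch below cuts each file into byte ranges of at most `morsel_bytes`. It is a simplification: it splits raw file bytes rather than decoded RecordBatches, and a real format such as Parquet would likely need splits aligned to row groups. `Morsel` and `split_into_morsels` are hypothetical names, not DataFusion types.

```rust
/// A contiguous byte range of one input file (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Morsel {
    file_index: usize,
    start: u64,
    end: u64, // exclusive
}

/// Splits every file into ranges of at most `morsel_bytes` bytes.
/// Only the last range of a file may be shorter; empty files yield nothing.
fn split_into_morsels(file_sizes: &[u64], morsel_bytes: u64) -> Vec<Morsel> {
    assert!(morsel_bytes > 0, "morsel size must be positive");
    let mut morsels = Vec::new();
    for (file_index, &size) in file_sizes.iter().enumerate() {
        let mut start = 0;
        while start < size {
            let end = (start + morsel_bytes).min(size);
            morsels.push(Morsel { file_index, start, end });
            start = end;
        }
    }
    morsels
}
```

With byte-sized morsels, a query that projects more columns still gets chunks of bounded size, although the CPU cost per chunk can still differ between operators.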

@tustvold
Contributor

Anyway, the concept of a partition seems to sit pretty deep in the codebase; I saw that it is passed through the hierarchy of ExecutionPlan's execute(...).

The scheduler I started work on preserved the concept of partitions, but did not rely on them for work distribution, or at least wouldn't have if I had actually finished it 😅

Any changes with regard to the existing pull model?

Yes, the hope was to gradually change to a push model for operators where it is possible
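
For readers unfamiliar with the distinction, here is a minimal sketch of the two styles. In the pull style the consumer asks an operator for its next batch; in the push style whoever drives the pipeline hands batches to the operator. The traits and types below are hypothetical and use `Vec<i64>` as a stand-in for a RecordBatch. They are not the interfaces of the DataFusion scheduler.

```rust
/// Pull model: the consumer asks the operator for the next batch.
trait PullOperator {
    fn next_batch(&mut self) -> Option<Vec<i64>>;
}

/// Push model: the driver hands batches to the operator and signals the end.
trait PushOperator {
    fn push(&mut self, batch: Vec<i64>);
    fn finish(&mut self);
}

/// A source that yields pre-built batches when pulled.
struct VecSource {
    batches: std::vec::IntoIter<Vec<i64>>,
}

impl VecSource {
    fn new(batches: Vec<Vec<i64>>) -> Self {
        Self { batches: batches.into_iter() }
    }
}

impl PullOperator for VecSource {
    fn next_batch(&mut self) -> Option<Vec<i64>> {
        self.batches.next()
    }
}

/// A sink that sums every value pushed into it.
#[derive(Default)]
struct SumSink {
    total: i64,
    finished: bool,
}

impl PushOperator for SumSink {
    fn push(&mut self, batch: Vec<i64>) {
        self.total += batch.iter().sum::<i64>();
    }

    fn finish(&mut self) {
        self.finished = true;
    }
}

/// A tiny driver loop: it owns the control flow, pulling from the
/// source and pushing into the sink until the source is exhausted.
fn drive(source: &mut dyn PullOperator, sink: &mut dyn PushOperator) {
    while let Some(batch) = source.next_batch() {
        sink.push(batch);
    }
    sink.finish();
}
```

The point of the push side is that the driver, rather than the operator tree, decides when and where each batch is processed.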

Will the scheduler contain a DAG that replaces the hierarchy based on children() from ExecutionPlan?

See https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/scheduler/pipeline/mod.rs#L27

I wonder how fair sharing of resources would be approached, because from what I have heard, HyperDB processes a single query at a time, which achieves ideal fairness with morsels.

IMO fairness is better handled at a higher level, e.g. with separate query pools or even separate query processes. The scheduler should focus on throughput at the expense of fairness; if nothing else, fairly multiplexing queries is a recipe for blowing your memory budget.
