Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow FileSource-specific repartitioning #14754

Merged
merged 5 commits into from
Feb 21, 2025

Conversation

AdamGS
Copy link
Contributor

@AdamGS AdamGS commented Feb 18, 2025

Which issue does this PR close?

I also want to suggest some changes around FileGroupPartitioner/PartitionedFile but I need to do some more thinking there and this change is useful without them IMO.

Rationale for this change

Allows specific file formats to define their own file repartitioning.

What changes are included in this PR?

Move the logic in FileScanConfig::repartitioned down to the FileSource trait as the default interface, allowing external FileSource implementation to control their repartitioning logic.

Are these changes tested?

Are there any user-facing changes?

Introduces a new function with a default implementation to a trait.

@github-actions github-actions bot added the core Core DataFusion crate label Feb 18, 2025
@AdamGS
Copy link
Contributor Author

AdamGS commented Feb 19, 2025

The CI failure doesn't seem related to the change

@mertak-synnada
Copy link
Contributor

While refactoring old sources into the DataSourceExec, I intended to centralize this file repartition logic for all file types but didn't consider a custom file type in that case.
This looks like a better approach for extensibility 👍 Documenting why the function is in FileSource may also help future devs. Thanks 🎉

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @AdamGS -- this looks great to me

Do you think we can remove supports_repartition as well in this PR?

I agree with @mertak-synnada that this is a nice API improvement

@@ -67,4 +69,33 @@ pub trait FileSource: Send + Sync {
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could remove supports_repartition as well:

  1. This API hasn't yet been released so it wouldn't be a breaking change
  2. It would ensure that all places in the code that need to check repartition use a single API (and thus are consistently done)

Copy link
Contributor Author

@AdamGS AdamGS Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah absolutely, give me a second to pull it up and take a look

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the PR here: #14123

output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first one just makes sense to me as it makes it impossible (or at least very hard) to map byte ranges to actual offset in the uncompressed file. The second one is CSV only but as long as its there I think its fine to respect it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in general the new_lines_in_values is pretty CSV specific, but I also think this code is backwards compatible and can be overridden by sub classes now. We can revise the default value in the future if needed

@xudong963 xudong963 mentioned this pull request Feb 19, 2025
20 tasks
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in general the new_lines_in_values is pretty CSV specific, but I also think this code is backwards compatible and can be overridden by sub classes now. We can revise the default value in the future if needed

|| config.new_lines_in_values
|| self.as_any().downcast_ref::<AvroSource>().is_some())

fn repartitioned(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is so much nicer ❤️

@alamb
Copy link
Contributor

alamb commented Feb 21, 2025

THanks again @AdamGS and @xudong963

@alamb alamb merged commit 9ca09cf into apache:main Feb 21, 2025
24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Logically repartition files by row splits
4 participants