Allow FileSource-specific repartitioning #14754

Merged on Feb 21, 2025 (5 commits)
34 changes: 30 additions & 4 deletions datafusion/core/src/datasource/data_source.rs
@@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener, FileScanConfig};

use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_datasource::file_groups::FileGroupPartitioner;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

@@ -62,9 +64,33 @@ pub trait FileSource: Send + Sync {
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
}
/// Return true if the file format supports repartition
///
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;

/// If supported by the [`FileSource`], redistribute files across partitions according to their size.
/// Allows custom file formats to implement their own repartitioning logic.
///
/// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
Contributor Author: the first one just makes sense to me, since compression makes it impossible (or at least very hard) to map byte ranges to actual offsets in the uncompressed file. The second one is CSV-only, but as long as it's there I think it's fine to respect it here.

Contributor: I agree that in general new_lines_in_values is pretty CSV-specific, but this code is backwards compatible and can be overridden by subclasses now. We can revise the default behavior in the future if needed.

return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = config.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(source));
}
Ok(None)
}
}
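The default implementation above covers the common byte-range splitting, and a custom file format can now override repartitioned to apply its own policy instead of just answering yes/no as supports_repartition did. Below is a rough sketch of what such an override might look like, assuming a hypothetical MyFormatSource with a made-up files_are_block_aligned flag; the remaining FileSource methods and exact import paths are elided and may differ by DataFusion version.

use datafusion_common::Result;
use datafusion_datasource::file_groups::FileGroupPartitioner;
use datafusion_physical_expr::LexOrdering;

// Hypothetical out-of-tree source; `FileSource` / `FileScanConfig` import
// paths here mirror this file and may differ for an external crate.
struct MyFormatSource {
    // Assumed format-specific knowledge: whether files can be split on
    // block boundaries at all.
    files_are_block_aligned: bool,
}

impl FileSource for MyFormatSource {
    // ... the other required FileSource methods are omitted from this sketch ...

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        // Format-specific guard instead of the generic compression check.
        if !self.files_are_block_aligned {
            return Ok(None);
        }

        // Reuse the stock byte-range splitting for the splittable case.
        let repartitioned = FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(repartition_file_min_size)
            .with_preserve_order_within_groups(output_ordering.is_some())
            .repartition_file_groups(&config.file_groups);

        Ok(repartitioned.map(|file_groups| {
            let mut config = config.clone();
            config.file_groups = file_groups;
            config
        }))
    }
}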
4 changes: 0 additions & 4 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -256,10 +256,6 @@ impl FileSource for ArrowSource {
fn file_type(&self) -> &str {
"arrow"
}

fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

/// The struct arrow that implements `[FileOpener]` trait
13 changes: 9 additions & 4 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -255,10 +255,15 @@ impl FileSource for AvroSource {
fn file_type(&self) -> &str {
"avro"
}
fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed()
|| config.new_lines_in_values
|| self.as_any().downcast_ref::<AvroSource>().is_some())
}

fn repartitioned(
Contributor: this is so much nicer ❤️

&self,
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
_config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
Ok(None)
}
}

3 changes: 0 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -618,9 +618,6 @@ impl FileSource for CsvSource {
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, ", has_header={}", self.has_header)
}
fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

impl FileOpener for CsvOpener {
31 changes: 11 additions & 20 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -19,8 +19,8 @@
//! file sources.

use super::{
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
FileGroupsDisplay, FileStream,
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupsDisplay,
FileStream,
};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
@@ -203,30 +203,21 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}

/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
/// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
if !self.source.supports_repartition(self) {
return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&self.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = self.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(Arc::new(source)));
}
Ok(None)
let source = self.source.repartitioned(
target_partitions,
repartition_file_min_size,
output_ordering,
self,
)?;

Ok(source.map(|s| Arc::new(s) as _))
}

fn output_partitioning(&self) -> Partitioning {
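With the splitting logic moved into the trait, FileScanConfig::repartitioned becomes a thin delegation, so existing callers (such as the repartitioning optimizer rule) see the same entry point as before. A hedged sketch of the call site follows; the helper name try_repartition_scan is illustrative and not part of this PR, and FileScanConfig and the DataSource trait are assumed to be in scope as in file_scan_config.rs.

use std::sync::Arc;
use datafusion_common::Result;
use datafusion_physical_expr::LexOrdering;

// Illustrative helper (not part of this PR) showing how a caller drives the
// delegation added above.
fn try_repartition_scan(
    config: &FileScanConfig,
    target_partitions: usize,
    repartition_file_min_size: usize,
    output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
    // The decision now lives in `config.source`: AvroSource returns None,
    // while CSV/JSON/Parquet/Arrow fall through to the default byte-range
    // splitting shown in data_source.rs above.
    config.repartitioned(target_partitions, repartition_file_min_size, output_ordering)
}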
4 changes: 0 additions & 4 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -313,10 +313,6 @@ impl FileSource for JsonSource {
fn file_type(&self) -> &str {
"json"
}

fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

impl FileOpener for JsonOpener {
@@ -586,7 +586,4 @@ impl FileSource for ParquetSource {
}
}
}
fn supports_repartition(&self, _config: &FileScanConfig) -> bool {
true
}
}