Skip to content

Commit

Permalink
Allow FileSource-specific repartitioning (#14754)
Browse files Browse the repository at this point in the history
* FileSource specific repartitioning

* fix doc typo

* remove

* Avro doesn't support repartitioning
  • Loading branch information
AdamGS authored Feb 21, 2025
1 parent faace2c commit 9ca09cf
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 42 deletions.
34 changes: 30 additions & 4 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener, FileScanConfig};

use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_datasource::file_groups::FileGroupPartitioner;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

Expand Down Expand Up @@ -62,9 +64,33 @@ pub trait FileSource: Send + Sync {
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
}
/// Return true if the file format supports repartition

/// If supported by the [`FileSource`], redistribute files across partitions according to their size.
/// Allows custom file formats to implement their own repartitioning logic.
///
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;
/// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = config.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(source));
}
Ok(None)
}
}
4 changes: 0 additions & 4 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,6 @@ impl FileSource for ArrowSource {
fn file_type(&self) -> &str {
"arrow"
}

fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

/// The struct arrow that implements `[FileOpener]` trait
Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,15 @@ impl FileSource for AvroSource {
fn file_type(&self) -> &str {
"avro"
}
fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed()
|| config.new_lines_in_values
|| self.as_any().downcast_ref::<AvroSource>().is_some())

fn repartitioned(
&self,
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
_config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
Ok(None)
}
}

Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,6 @@ impl FileSource for CsvSource {
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, ", has_header={}", self.has_header)
}
fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

impl FileOpener for CsvOpener {
Expand Down
31 changes: 11 additions & 20 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! file sources.
use super::{
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
FileGroupsDisplay, FileStream,
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupsDisplay,
FileStream,
};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
Expand Down Expand Up @@ -203,30 +203,21 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}

/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
/// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
if !self.source.supports_repartition(self) {
return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&self.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = self.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(Arc::new(source)));
}
Ok(None)
let source = self.source.repartitioned(
target_partitions,
repartition_file_min_size,
output_ordering,
self,
)?;

Ok(source.map(|s| Arc::new(s) as _))
}

fn output_partitioning(&self) -> Partitioning {
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ impl FileSource for JsonSource {
fn file_type(&self) -> &str {
"json"
}

fn supports_repartition(&self, config: &FileScanConfig) -> bool {
!(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
}

impl FileOpener for JsonOpener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,4 @@ impl FileSource for ParquetSource {
}
}
}
fn supports_repartition(&self, _config: &FileScanConfig) -> bool {
true
}
}

0 comments on commit 9ca09cf

Please sign in to comment.