Skip to content

Commit

Permalink
Simplify FileSource::create_file_opener's signature (#14798)
Browse files Browse the repository at this point in the history
* simplify fn signature

* .
  • Loading branch information
AdamGS authored Feb 23, 2025
1 parent 0bd9083 commit 1c54b38
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 41 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> {
.with_batch_size(8192)
.with_projection(&scan_config);

let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;
let opener = config.create_file_opener(object_store, &scan_config, 0);

let mut result = vec![];
let mut stream =
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ pub trait FileSource: Send + Sync {
/// Creates a `dyn FileOpener` based on given parameters
fn create_file_opener(
&self,
object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>>;
) -> Arc<dyn FileOpener>;
/// Any
fn as_any(&self) -> &dyn Any;
/// Initialize new type with batch size configuration
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ pub struct ArrowSource {
impl FileSource for ArrowSource {
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(ArrowOpener {
object_store: object_store?,
) -> Arc<dyn FileOpener> {
Arc::new(ArrowOpener {
object_store,
projection: base_config.file_column_projection_indices(),
}))
})
}

fn as_any(&self) -> &dyn Any {
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,23 @@ impl FileSource for AvroSource {
#[cfg(feature = "avro")]
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(private::AvroOpener {
) -> Arc<dyn FileOpener> {
Arc::new(private::AvroOpener {
config: Arc::new(self.clone()),
object_store: object_store?,
}))
object_store,
})
}

#[cfg(not(feature = "avro"))]
fn create_file_opener(
&self,
_object_store: Result<Arc<dyn ObjectStore>>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
) -> Arc<dyn FileOpener> {
panic!("Avro feature is not enabled in this build")
}

Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,15 @@ impl CsvOpener {
impl FileSource for CsvSource {
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(CsvOpener {
) -> Arc<dyn FileOpener> {
Arc::new(CsvOpener {
config: Arc::new(self.clone()),
file_compression_type: base_config.file_compression_type,
object_store: object_store?,
}))
object_store,
})
}

fn as_any(&self) -> &dyn Any {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context.runtime_env().object_store(&self.object_store_url);
let object_store = context.runtime_env().object_store(&self.object_store_url)?;

let source = self
.source
.with_batch_size(context.session_config().batch_size())
.with_schema(Arc::clone(&self.file_schema))
.with_projection(self);

let opener = source.create_file_opener(object_store, self, partition)?;
let opener = source.create_file_opener(object_store, self, partition);

let stream = FileStream::new(self, partition, opener, source.metrics())?;
Ok(Box::pin(stream))
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,18 @@ impl JsonSource {
impl FileSource for JsonSource {
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(JsonOpener {
) -> Arc<dyn FileOpener> {
Arc::new(JsonOpener {
batch_size: self
.batch_size
.expect("Batch size must set before creating opener"),
projected_schema: base_config.projected_file_schema(),
file_compression_type: base_config.file_compression_type,
object_store: object_store?,
}))
object_store,
})
}

fn as_any(&self) -> &dyn Any {
Expand Down
23 changes: 9 additions & 14 deletions datafusion/core/src/datasource/physical_plan/parquet/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,10 @@ impl ParquetSource {
impl FileSource for ParquetSource {
fn create_file_opener(
&self,
object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
) -> Arc<dyn FileOpener> {
let projection = base_config
.file_column_projection_indices()
.unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect());
Expand All @@ -475,17 +475,12 @@ impl FileSource for ParquetSource {
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

let parquet_file_reader_factory = self
.parquet_file_reader_factory
.as_ref()
.map(|f| Ok(Arc::clone(f)))
.unwrap_or_else(|| {
object_store.map(|store| {
Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
})
})?;

Ok(Arc::new(ParquetOpener {
let parquet_file_reader_factory =
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});

Arc::new(ParquetOpener {
partition_index: partition,
projection: Arc::from(projection),
batch_size: self
Expand All @@ -504,7 +499,7 @@ impl FileSource for ParquetSource {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
}))
})
}

fn as_any(&self) -> &dyn Any {
Expand Down

0 comments on commit 1c54b38

Please sign in to comment.