From 04291e1b1d41ce8e365cbe3514f6c02289318e63 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 26 Jan 2022 15:47:24 +0800 Subject: [PATCH 1/3] fix can not load parquet table form spark --- benchmarks/src/bin/tpch.rs | 6 ++-- .../examples/parquet_sql_multiple_files.rs | 4 +-- datafusion/src/datasource/file_format/avro.rs | 2 ++ datafusion/src/datasource/file_format/csv.rs | 2 ++ datafusion/src/datasource/file_format/json.rs | 2 ++ datafusion/src/datasource/listing/table.rs | 6 ++-- datafusion/src/execution/context.rs | 31 ++++++++++--------- datafusion/src/execution/options.rs | 3 +- 8 files changed, 35 insertions(+), 21 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 0b9fba52140b..33694f4d4b1c 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -55,6 +55,8 @@ use datafusion::{ }; use structopt::StructOpt; +use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; +use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; #[cfg(feature = "snmalloc")] #[global_allocator] @@ -652,13 +654,13 @@ fn get_table( .with_delimiter(b',') .with_has_header(true); - (Arc::new(format), path, ".csv") + (Arc::new(format), path, DEFAULT_CSV_EXTENSION) } "parquet" => { let path = format!("{}/{}", path, table); let format = ParquetFormat::default().with_enable_pruning(true); - (Arc::new(format), path, ".parquet") + (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION) } other => { unimplemented!("Invalid file format '{}'", other); diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index 2e954276083e..caf02a31c38a 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::file_format::parquet::{DEFAULT_PARQUET_EXTENSION, ParquetFormat}; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::prelude::*; @@ -33,7 +33,7 @@ async fn main() -> Result<()> { // Configure listing options let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions { - file_extension: ".parquet".to_owned(), + file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), format: Arc::new(file_format), table_partition_cols: vec![], collect_stat: true, diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index 08eb34386fb2..fa02d1ae2833 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +/// The default file extension of avro files +pub const DEFAULT_AVRO_EXTENSION: &str = ".avro"; /// Avro `FileFormat` implementation. #[derive(Default, Debug)] pub struct AvroFormat; diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index f0a70d9176db..6aa0d21235a4 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +/// The default file extension of csv files +pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; /// Character Separated Value `FileFormat` implementation. #[derive(Debug)] pub struct CsvFormat { diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index d7a278d72a6e..bdd5ef81d559 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +/// The default file extension of json files +pub const DEFAULT_JSON_EXTENSION: &str = ".json"; /// New line delimited JSON `FileFormat` implementation. #[derive(Debug, Default)] pub struct JsonFormat { diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 2f8f70f5ede5..e406959b21f5 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -274,6 +274,8 @@ mod tests { logical_plan::{col, lit}, test::{columns, object_store::TestObjectStore}, }; + use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; + use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; use super::*; @@ -318,7 +320,7 @@ mod tests { let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]); let opt = ListingOptions { - file_extension: ".avro".to_owned(), + file_extension: DEFAULT_AVRO_EXTENSION.to_owned(), format: Arc::new(AvroFormat {}), table_partition_cols: vec![String::from("p1")], target_partitions: 4, @@ -419,7 +421,7 @@ mod tests { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); let opt = ListingOptions { - file_extension: "parquet".to_owned(), + file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), format: Arc::new(ParquetFormat::default()), table_partition_cols: vec![], target_partitions: 2, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ceea83d952e0..0841fea33062 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -24,8 +24,8 @@ use crate::{ datasource::listing::{ListingOptions, ListingTable}, datasource::{ file_format::{ - avro::AvroFormat, - csv::CsvFormat, + avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, + csv::{CsvFormat, DEFAULT_CSV_EXTENSION}, parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, FileFormat, }, @@ -209,17 +209,20 @@ impl ExecutionContext { ref file_type, ref has_header, }) => { - let file_format = match file_type { - FileType::CSV => { - Ok(Arc::new(CsvFormat::default().with_has_header(*has_header)) - as Arc) - } - FileType::Parquet => { - Ok(Arc::new(ParquetFormat::default()) as Arc) - } - FileType::Avro => { - Ok(Arc::new(AvroFormat::default()) as Arc) - } + let (file_format, file_extension) = match file_type { + FileType::CSV => Ok(( + Arc::new(CsvFormat::default().with_has_header(*has_header)) + as Arc, + DEFAULT_CSV_EXTENSION, + )), + FileType::Parquet => Ok(( + Arc::new(ParquetFormat::default()) as Arc, + DEFAULT_PARQUET_EXTENSION, + )), + FileType::Avro => Ok(( + Arc::new(AvroFormat::default()) as Arc, + DEFAULT_AVRO_EXTENSION, + )), _ => Err(DataFusionError::NotImplemented(format!( "Unsupported file type {:?}.", file_type @@ -229,7 +232,7 @@ impl ExecutionContext { let options = ListingOptions { format: file_format, collect_stat: false, - file_extension: String::new(), + file_extension: file_extension.to_owned(), target_partitions: self .state .lock() diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index 219e2fd89700..7ad3ccc99040 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -25,6 +25,7 @@ use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat}, listing::ListingOptions, }; +use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; /// CSV file read option #[derive(Copy, Clone)] @@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> { Self { schema: None, schema_infer_max_records: 1000, - file_extension: ".json", + file_extension: DEFAULT_JSON_EXTENSION, } } } From 8a9cb7741e50dd5c9a74471f091bddc046d0b169 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 26 Jan 2022 16:19:26 +0800 Subject: [PATCH 2/3] add Invalid file in log. --- datafusion/src/physical_plan/file_format/parquet.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index d240fe27c58a..905bb1e28f9a 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec { object_store.as_ref(), file_schema_ref, partition_index, - partition, + &partition, metrics, &projection, &pruning_predicate, @@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec { limit, partition_col_proj, ) { - println!("Parquet reader thread terminated due to error: {:?}", e); + println!( + "Parquet reader thread terminated due to error: {:?} for files: {:?}", + e, partition + ); } }); @@ -445,7 +448,7 @@ fn read_partition( object_store: &dyn ObjectStore, file_schema: SchemaRef, partition_index: usize, - partition: Vec, + partition: &[PartitionedFile], metrics: ExecutionPlanMetricsSet, projection: &[usize], pruning_predicate: &Option, From 4a08ccff6980b8bd4b3ebd591b7f45c4a089f206 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 26 Jan 2022 17:13:30 +0800 Subject: [PATCH 3/3] fix fmt --- benchmarks/src/bin/tpch.rs | 2 +- datafusion-examples/examples/parquet_sql_multiple_files.rs | 4 +++- datafusion/src/datasource/listing/table.rs | 4 ++-- datafusion/src/execution/options.rs | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 33694f4d4b1c..59bb55162a8e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -54,9 +54,9 @@ use datafusion::{ }, }; -use structopt::StructOpt; use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; +use structopt::StructOpt; #[cfg(feature = "snmalloc")] #[global_allocator] diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index caf02a31c38a..7485bc72f193 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use datafusion::datasource::file_format::parquet::{DEFAULT_PARQUET_EXTENSION, ParquetFormat}; +use datafusion::datasource::file_format::parquet::{ + ParquetFormat, DEFAULT_PARQUET_EXTENSION, +}; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::prelude::*; diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index e406959b21f5..1501b8bd7a18 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -266,6 +266,8 @@ impl ListingTable { mod tests { use arrow::datatypes::DataType; + use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; + use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; use crate::{ datasource::{ file_format::{avro::AvroFormat, parquet::ParquetFormat}, @@ -274,8 +276,6 @@ mod tests { logical_plan::{col, lit}, test::{columns, object_store::TestObjectStore}, }; - use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; - use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; use super::*; diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index 7ad3ccc99040..79b07536acb3 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat}, listing::ListingOptions, }; -use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; /// CSV file read option #[derive(Copy, Clone)]