From 60961397f3fe7baff82270b5df92cdb9265604be Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 00:22:09 -0400 Subject: [PATCH 01/10] #2061 support "PARTITIONED BY" in CreateExternalTable DDL for datafusion --- datafusion/src/execution/context.rs | 3 +- datafusion/src/logical_plan/plan.rs | 2 + datafusion/src/sql/parser.rs | 66 +++++++++++++++++++++++++++++ datafusion/src/sql/planner.rs | 2 + docs/source/user-guide/sql/ddl.md | 15 +++++++ 5 files changed, 87 insertions(+), 1 deletion(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5aebe99070f5..03857ec7850e 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -210,6 +210,7 @@ impl SessionContext { ref location, ref file_type, ref has_header, + ref table_partition_cols, }) => { let (file_format, file_extension) = match file_type { FileType::CSV => ( @@ -236,7 +237,7 @@ impl SessionContext { collect_stat: false, file_extension: file_extension.to_owned(), target_partitions: self.copied_config().target_partitions, - table_partition_cols: vec![], + table_partition_cols: table_partition_cols.clone(), }; // TODO make schema in CreateExternalTable optional instead of empty diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 3acb69ecc120..3e651e3e3ac1 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -184,6 +184,8 @@ pub struct CreateExternalTable { pub file_type: FileType, /// Whether the CSV file contains a header pub has_header: bool, + // Partition Columns + pub table_partition_cols: Vec, } /// Creates a schema. diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs index 6ad105fc6970..3197647df252 100644 --- a/datafusion/src/sql/parser.rs +++ b/datafusion/src/sql/parser.rs @@ -78,6 +78,8 @@ pub struct CreateExternalTable { pub has_header: bool, /// Path to file pub location: String, + // Partition Columns + pub table_partition_cols: Vec, } /// DataFusion Statement representations. @@ -192,6 +194,35 @@ impl<'a> DFParser<'a> { } } + fn parse_partitions(&mut self) -> Result, ParserError> { + let mut partitions: Vec = vec![]; + if !self.parser.consume_token(&Token::LParen) + || self.parser.consume_token(&Token::RParen) + { + return Ok(partitions); + } + + loop { + if let Token::Word(_) = self.parser.peek_token() { + let identifier = self.parser.parse_identifier()?; + partitions.push(identifier.to_string()); + } else { + return self.expected("partition name", self.parser.peek_token()); + } + let comma = self.parser.consume_token(&Token::Comma); + if self.parser.consume_token(&Token::RParen) { + // allow a trailing comma, even though it's not in standard + break; + } else if !comma { + return self.expected( + "',' or ')' after partition definition", + self.parser.peek_token(), + ); + } + } + Ok(partitions) + } + // This is a copy of the equivalent implementation in sqlparser. 
fn parse_columns( &mut self, @@ -277,6 +308,12 @@ impl<'a> DFParser<'a> { let has_header = self.parse_csv_has_header(); + let has_partition = self.parse_has_partition(); + let mut table_partition_cols: Vec = vec![]; + if has_partition { + table_partition_cols = self.parse_partitions()?; + } + self.parser.expect_keyword(Keyword::LOCATION)?; let location = self.parser.parse_literal_string()?; @@ -286,6 +323,7 @@ impl<'a> DFParser<'a> { file_type, has_header, location, + table_partition_cols, }; Ok(Statement::CreateExternalTable(create)) } @@ -314,6 +352,11 @@ impl<'a> DFParser<'a> { & self.consume_token(&Token::make_keyword("HEADER")) & self.consume_token(&Token::make_keyword("ROW")) } + + fn parse_has_partition(&mut self) -> bool { + self.consume_token(&Token::make_keyword("PARTITIONED")) + & self.consume_token(&Token::make_keyword("BY")) + } } #[cfg(test)] @@ -376,6 +419,20 @@ mod tests { file_type: FileType::CSV, has_header: false, location: "foo.csv".into(), + table_partition_cols: vec![], + }); + expect_parse_ok(sql, expected)?; + + // positive case: partitioned by + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'"; + let display = None; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![make_column_def("c1", DataType::Int(display))], + file_type: FileType::CSV, + has_header: false, + location: "foo.csv".into(), + table_partition_cols: vec!["p1".to_string(), "p2".to_string()], }); expect_parse_ok(sql, expected)?; @@ -391,6 +448,7 @@ mod tests { file_type: FileType::CSV, has_header: true, location: "foo.csv".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; } @@ -403,6 +461,7 @@ mod tests { file_type: FileType::Parquet, has_header: false, location: "foo.parquet".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -414,6 +473,7 @@ mod tests { file_type: FileType::Parquet, has_header: false, location: "foo.parquet".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -425,6 +485,7 @@ mod tests { file_type: FileType::Avro, has_header: false, location: "foo.avro".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -433,6 +494,11 @@ mod tests { "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'"; expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV"); + // Error cases: partition column does not support type + let sql = + "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'"; + expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int"); + Ok(()) } } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 886052156305..cf14127bd9d8 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -308,6 +308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { file_type, has_header, location, + table_partition_cols, } = statement; // semantic checks @@ -333,6 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { location, file_type, has_header, + table_partition_cols, })) } diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md index eb10f40de5a6..75ec0f6cb0fa 100644 --- a/docs/source/user-guide/sql/ddl.md +++ b/docs/source/user-guide/sql/ddl.md @@ -55,6 +55,21 @@ WITH HEADER ROW LOCATION '/path/to/aggregate_test_100.csv'; ``` +If data sources are already partitioned in Hive style, 
`PARTITIONED BY` can be used for partition pruning. + +``` +/mnt/nyctaxi/year=2022/month=01/tripdata.parquet +/mnt/nyctaxi/year=2021/month=12/tripdata.parquet +/mnt/nyctaxi/year=2021/month=11/tripdata.parquet +``` + +```sql +CREATE EXTERNAL TABLE taxi +STORED AS PARQUET +PARTITIONED BY (year, month) +LOCATION '/mnt/nyctaxi'; +``` + ## CREATE MEMORY TABLE Memory table can be created with query. From f50e8000ee4de8432d7b59b63c18a8e0171a072b Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 00:40:34 -0400 Subject: [PATCH 02/10] support table_partition_cols in ballista and add ParquetReadOptions --- .../src/bin/ballista-dataframe.rs | 4 +- ballista/rust/client/src/context.rs | 40 ++++++-- ballista/rust/core/proto/ballista.proto | 1 + .../rust/core/src/serde/logical_plan/mod.rs | 5 + benchmarks/src/bin/tpch.rs | 2 +- datafusion/src/execution/context.rs | 18 +++- datafusion/src/execution/options.rs | 91 +++++++++++++++++-- datafusion/src/logical_plan/builder.rs | 18 +--- datafusion/src/prelude.rs | 5 +- 9 files changed, 142 insertions(+), 42 deletions(-) diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs index 345b6982dd85..03d4ac919945 100644 --- a/ballista-examples/src/bin/ballista-dataframe.rs +++ b/ballista-examples/src/bin/ballista-dataframe.rs @@ -16,7 +16,7 @@ // under the License. use ballista::prelude::*; -use datafusion::prelude::{col, lit}; +use datafusion::prelude::{col, lit, ParquetReadOptions}; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait @@ -33,7 +33,7 @@ async fn main() -> Result<()> { // define the query using the DataFrame trait let df = ctx - .read_parquet(filename) + .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? 
.filter(col("id").gt(lit(1)))?; diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 4a5fe6d30bfe..bd132da1516e 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -34,7 +34,7 @@ use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan}; use datafusion::prelude::{ - AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext, + AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext, }; use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement}; @@ -167,7 +167,11 @@ impl BallistaContext { /// Create a DataFrame representing a Parquet table scan /// TODO fetch schema from scheduler instead of resolving locally - pub async fn read_parquet(&self, path: &str) -> Result> { + pub async fn read_parquet( + &self, + path: &str, + options: ParquetReadOptions<'_>, + ) -> Result> { // convert to absolute path because the executor likely has a different working directory let path = PathBuf::from(path); let path = fs::canonicalize(&path)?; @@ -181,7 +185,7 @@ impl BallistaContext { guard.config(), ) }; - let df = ctx.read_parquet(path.to_str().unwrap()).await?; + let df = ctx.read_parquet(path.to_str().unwrap(), options).await?; Ok(df) } @@ -234,8 +238,13 @@ impl BallistaContext { } } - pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> { - match self.read_parquet(path).await?.to_logical_plan() { + pub async fn register_parquet( + &self, + name: &str, + path: &str, + options: ParquetReadOptions<'_>, + ) -> Result<()> { + match self.read_parquet(path, options).await?.to_logical_plan() { LogicalPlan::TableScan(TableScan { source, .. 
}) => { self.register_table(name, source) } @@ -331,6 +340,7 @@ impl BallistaContext { ref location, ref file_type, ref has_header, + ref table_partition_cols, }) => match file_type { FileType::CSV => { self.register_csv( @@ -338,18 +348,30 @@ impl BallistaContext { location, CsvReadOptions::new() .schema(&schema.as_ref().to_owned().into()) - .has_header(*has_header), + .has_header(*has_header) + .table_partition_cols(table_partition_cols.to_vec()), ) .await?; Ok(Arc::new(DataFrame::new(ctx.state, &plan))) } FileType::Parquet => { - self.register_parquet(name, location).await?; + self.register_parquet( + name, + location, + ParquetReadOptions::default() + .table_partition_cols(table_partition_cols.to_vec()), + ) + .await?; Ok(Arc::new(DataFrame::new(ctx.state, &plan))) } FileType::Avro => { - self.register_avro(name, location, AvroReadOptions::default()) - .await?; + self.register_avro( + name, + location, + AvroReadOptions::default() + .table_partition_cols(table_partition_cols.to_vec()), + ) + .await?; Ok(Arc::new(DataFrame::new(ctx.state, &plan))) } _ => Err(DataFusionError::NotImplemented(format!( diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 5bb12890ccc8..33655f4532cc 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -145,6 +145,7 @@ message CreateExternalTableNode { FileType file_type = 3; bool has_header = 4; datafusion.DfSchema schema = 5; + repeated string table_partition_cols = 6; } message CreateCatalogSchemaNode { diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index d64fac320fcd..ba4836212d70 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -325,6 +325,9 @@ impl AsLogicalPlan for LogicalPlanNode { location: create_extern_table.location.clone(), file_type: pb_file_type.into(), has_header: create_extern_table.has_header, + table_partition_cols: create_extern_table + .table_partition_cols + .clone(), })) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { @@ -752,6 +755,7 @@ impl AsLogicalPlan for LogicalPlanNode { file_type, has_header, schema: df_schema, + table_partition_cols, }) => { use datafusion::sql::parser::FileType; @@ -770,6 +774,7 @@ impl AsLogicalPlan for LogicalPlanNode { file_type: pb_file_type as i32, has_header: *has_header, schema: Some(df_schema.into()), + table_partition_cols: table_partition_cols.clone(), }, )), }) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index aa70500d19e3..3cbae89b6baa 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -544,7 +544,7 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) { } "parquet" => { let path = format!("{}/{}", path, table); - ctx.register_parquet(table, &path) + ctx.register_parquet(table, &path, ParquetReadOptions::default()) .await .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) .unwrap(); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 03857ec7850e..819fb8541634 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -92,7 +92,9 @@ use chrono::{DateTime, Utc}; use parquet::file::properties::WriterProperties; use uuid::Uuid; -use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions}; +use super::options::{ + AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, 
+}; /// The default catalog name - this impacts what SQL queries use if not specified const DEFAULT_CATALOG: &str = "datafusion"; @@ -461,14 +463,20 @@ impl SessionContext { pub async fn read_parquet( &mut self, uri: impl Into, + options: ParquetReadOptions<'_>, ) -> Result> { let uri: String = uri.into(); let (object_store, path) = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; - let logical_plan = - LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions) - .await? - .build()?; + let logical_plan = LogicalPlanBuilder::scan_parquet( + object_store, + path, + options, + None, + target_partitions, + ) + .await? + .build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &logical_plan))) } diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index d9a0304420d1..b868e65ee1d3 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -21,14 +21,18 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; -use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::{ - file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, + file_format::{ + avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, + csv::{CsvFormat, DEFAULT_CSV_EXTENSION}, + json::{JsonFormat, DEFAULT_JSON_EXTENSION}, + parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, + }, listing::ListingOptions, }; /// CSV file read option -#[derive(Copy, Clone)] +#[derive(Clone)] pub struct CsvReadOptions<'a> { /// Does the CSV file have a header? /// @@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> { /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000. pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".csv". + /// Defaults to DEFAULT_CSV_EXTENSION. pub file_extension: &'a str, + // Partition Columns + pub table_partition_cols: Vec, } impl<'a> Default for CsvReadOptions<'a> { @@ -61,7 +67,8 @@ impl<'a> CsvReadOptions<'a> { schema: None, schema_infer_max_records: 1000, delimiter: b',', - file_extension: ".csv", + file_extension: DEFAULT_CSV_EXTENSION, + table_partition_cols: vec![], } } @@ -97,6 +104,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Configure number of max records to read for schema inference pub fn schema_infer_max_records(mut self, max_records: usize) -> Self { self.schema_infer_max_records = max_records; @@ -115,11 +128,51 @@ impl<'a> CsvReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, + table_partition_cols: self.table_partition_cols.clone(), + } + } +} + +/// Parquet read options +#[derive(Clone)] +pub struct ParquetReadOptions<'a> { + /// File extension; only files with this extension are selected for data input. + /// Defaults to ".parquet". 
+ pub file_extension: &'a str, + // Partition Columns + pub table_partition_cols: Vec, +} + +impl<'a> Default for ParquetReadOptions<'a> { + fn default() -> Self { + Self { + file_extension: DEFAULT_PARQUET_EXTENSION, table_partition_cols: vec![], } } } +impl<'a> ParquetReadOptions<'a> { + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + /// Helper to convert these user facing options to `ListingTable` options + pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { + let file_format = ParquetFormat::default(); + + ListingOptions { + format: Arc::new(file_format), + collect_stat: true, + file_extension: self.file_extension.to_owned(), + target_partitions, + table_partition_cols: self.table_partition_cols.clone(), + } + } +} + /// Avro read options #[derive(Clone)] pub struct AvroReadOptions<'a> { @@ -127,20 +180,29 @@ pub struct AvroReadOptions<'a> { pub schema: Option, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".avro". + /// Defaults to DEFAULT_AVRO_EXTENSION. pub file_extension: &'a str, + // Partition Columns + pub table_partition_cols: Vec, } impl<'a> Default for AvroReadOptions<'a> { fn default() -> Self { Self { schema: None, - file_extension: ".avro", + file_extension: DEFAULT_AVRO_EXTENSION, + table_partition_cols: vec![], } } } impl<'a> AvroReadOptions<'a> { + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { let file_format = AvroFormat::default(); @@ -150,7 +212,7 @@ impl<'a> AvroReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, - table_partition_cols: vec![], + table_partition_cols: self.table_partition_cols.clone(), } } } @@ -165,8 +227,10 @@ pub struct NdJsonReadOptions<'a> { pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".json". + /// Defaults to DEFAULT_JSON_EXTENSION. 
pub file_extension: &'a str, + // Partition Columns + pub table_partition_cols: Vec, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -175,11 +239,18 @@ impl<'a> Default for NdJsonReadOptions<'a> { schema: None, schema_infer_max_records: 1000, file_extension: DEFAULT_JSON_EXTENSION, + table_partition_cols: vec![], } } } impl<'a> NdJsonReadOptions<'a> { + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { let file_format = JsonFormat::default(); @@ -188,7 +259,7 @@ impl<'a> NdJsonReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, - table_partition_cols: vec![], + table_partition_cols: self.table_partition_cols.clone(), } } } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 86e9db535e09..cc3e937bf5bd 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -19,8 +19,7 @@ use crate::datasource::{ empty::EmptyTable, - file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, - listing::{ListingOptions, ListingTable, ListingTableConfig}, + listing::{ListingTable, ListingTableConfig}, MemTable, TableProvider, }; use crate::error::{DataFusionError, Result}; @@ -256,6 +255,7 @@ impl LogicalPlanBuilder { pub async fn scan_parquet( object_store: Arc, path: impl Into, + options: ParquetReadOptions<'_>, projection: Option>, target_partitions: usize, ) -> Result { @@ -263,6 +263,7 @@ impl LogicalPlanBuilder { Self::scan_parquet_with_name( object_store, path.clone(), + options, projection, target_partitions, path, @@ -274,21 +275,12 @@ impl LogicalPlanBuilder { pub async fn scan_parquet_with_name( object_store: Arc, path: impl Into, + options: ParquetReadOptions<'_>, projection: Option>, target_partitions: usize, table_name: impl Into, ) -> Result { - // TODO remove hard coded enable_pruning - let file_format = ParquetFormat::default().with_enable_pruning(true); - - let listing_options = ListingOptions { - format: Arc::new(file_format), - collect_stat: true, - file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), - target_partitions, - table_partition_cols: vec![], - }; - + let listing_options = options.to_listing_options(target_partitions); let path: String = path.into(); // with parquet we resolve the schema in all cases diff --git a/datafusion/src/prelude.rs b/datafusion/src/prelude.rs index e40693e71566..cd78209326bd 100644 --- a/datafusion/src/prelude.rs +++ b/datafusion/src/prelude.rs @@ -27,8 +27,9 @@ pub use crate::dataframe::DataFrame; pub use crate::execution::context::{SessionConfig, SessionContext}; -pub use crate::execution::options::AvroReadOptions; -pub use crate::execution::options::{CsvReadOptions, NdJsonReadOptions}; +pub use crate::execution::options::{ + AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, +}; pub use crate::logical_plan::{ approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, in_list, From 8b1a25898f9cfb4126ec8893bd4c17781cb33e21 Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 09:17:14 -0400 Subject: [PATCH 03/10] fix a few usage of read_parquet --- 
datafusion-examples/examples/dataframe.rs | 2 +- datafusion/src/execution/context.rs | 4 +++- datafusion/tests/sql/parquet.rs | 5 ++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 655c568804f9..0cce69865b26 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -31,7 +31,7 @@ async fn main() -> Result<()> { // define the query using the DataFrame trait let df = ctx - .read_parquet(filename) + .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 819fb8541634..dac4fab24558 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -3426,7 +3426,9 @@ mod tests { async fn call_read_parquet(&self) -> Arc { let mut ctx = SessionContext::new(); - ctx.read_parquet("dummy").await.unwrap() + ctx.read_parquet("dummy", ParquetReadOptions::default()) + .await + .unwrap() } } } diff --git a/datafusion/tests/sql/parquet.rs b/datafusion/tests/sql/parquet.rs index 77949938cc7a..92ad35cc5942 100644 --- a/datafusion/tests/sql/parquet.rs +++ b/datafusion/tests/sql/parquet.rs @@ -214,7 +214,10 @@ async fn schema_merge_ignores_metadata() { // (no errors) let mut ctx = SessionContext::new(); let df = ctx - .read_parquet(table_dir.to_str().unwrap().to_string()) + .read_parquet( + table_dir.to_str().unwrap().to_string(), + ParquetReadOptions::default(), + ) .await .unwrap(); let result = df.collect().await.unwrap(); From fce905e0acf90d9d9e86551ebade5b7bbefab27c Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 09:33:33 -0400 Subject: [PATCH 04/10] fix CsvReadOption clone due to removing the copy trait --- ballista/rust/core/src/serde/logical_plan/mod.rs | 1 + datafusion/src/physical_plan/file_format/csv.rs | 2 +- datafusion/src/physical_plan/planner.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index ba4836212d70..4919aea2daf0 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -1093,6 +1093,7 @@ mod roundtrip_tests { location: String::from("employee.csv"), file_type: *file, has_header: true, + table_partition_cols: vec![], }); roundtrip_test!(create_table_node); diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 28466b8aace2..c0f1227e6adb 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -487,7 +487,7 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); - ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option) + ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option.clone()) .await?; ctx.register_csv("allparts", &out_dir, csv_read_option) .await?; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 1189f23d921c..428492e58de5 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1586,7 +1586,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem 
{}), &path, - options, + options.clone(), None, 1, ) From 3a91480548459ef36a9baeb0bc8479e67ef35772 Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 09:44:52 -0400 Subject: [PATCH 05/10] fix CsvReadOption clone due to removing the copy trait --- datafusion/src/physical_plan/file_format/csv.rs | 8 ++++++-- datafusion/src/physical_plan/planner.rs | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index c0f1227e6adb..d17f00377b11 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -487,8 +487,12 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); - ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option.clone()) - .await?; + ctx.register_csv( + "part0", + &format!("{}/part-0.csv", out_dir), + csv_read_option.clone(), + ) + .await?; ctx.register_csv("allparts", &out_dir, csv_read_option) .await?; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 428492e58de5..79abdb1aaef5 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1686,7 +1686,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem {}), &path, - options, + options.clone(), None, 1, ) @@ -1708,7 +1708,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem {}), &path, - options, + options.clone(), None, 1, ) From 8f0a588fc6328f26f1a5ba408520dcccd233be4d Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 10:12:00 -0400 Subject: [PATCH 06/10] fix "missing documentation for a struct field" --- datafusion/src/execution/options.rs | 8 ++++---- datafusion/src/logical_plan/plan.rs | 2 +- datafusion/src/sql/parser.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index b868e65ee1d3..c15f3c675c6b 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -49,7 +49,7 @@ pub struct CsvReadOptions<'a> { /// File extension; only files with this extension are selected for data input. /// Defaults to DEFAULT_CSV_EXTENSION. pub file_extension: &'a str, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } @@ -139,7 +139,7 @@ pub struct ParquetReadOptions<'a> { /// File extension; only files with this extension are selected for data input. /// Defaults to ".parquet". pub file_extension: &'a str, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } @@ -182,7 +182,7 @@ pub struct AvroReadOptions<'a> { /// File extension; only files with this extension are selected for data input. /// Defaults to DEFAULT_AVRO_EXTENSION. pub file_extension: &'a str, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } @@ -229,7 +229,7 @@ pub struct NdJsonReadOptions<'a> { /// File extension; only files with this extension are selected for data input. /// Defaults to DEFAULT_JSON_EXTENSION. 
pub file_extension: &'a str, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 3e651e3e3ac1..eeb06d5599a9 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -184,7 +184,7 @@ pub struct CreateExternalTable { pub file_type: FileType, /// Whether the CSV file contains a header pub has_header: bool, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs index 3197647df252..ef54d7f60183 100644 --- a/datafusion/src/sql/parser.rs +++ b/datafusion/src/sql/parser.rs @@ -78,7 +78,7 @@ pub struct CreateExternalTable { pub has_header: bool, /// Path to file pub location: String, - // Partition Columns + /// Partition Columns pub table_partition_cols: Vec, } From 9492d177dd4cda847dbb6c0880235c4622fb46c9 Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 11:31:18 -0400 Subject: [PATCH 07/10] fix a few usage of register_parquet --- ballista/rust/client/src/context.rs | 6 +++- benchmarks/src/bin/nyctaxi.rs | 7 ++-- datafusion-examples/examples/flight_server.rs | 1 + datafusion-examples/examples/parquet_sql.rs | 1 + datafusion/benches/parquet_query_sql.rs | 8 +++-- datafusion/src/execution/context.rs | 22 +++++------- .../src/physical_plan/file_format/parquet.rs | 35 ++++++++++++++----- datafusion/tests/parquet_pruning.rs | 4 ++- datafusion/tests/sql/mod.rs | 1 + datafusion/tests/sql/parquet.rs | 11 ++++-- 10 files changed, 64 insertions(+), 32 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index bd132da1516e..c0bdbca6c34b 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -516,7 +516,11 @@ mod tests { let testdata = datafusion::test_util::parquet_test_data(); context - .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) + .register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) .await .unwrap(); diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index a0cdb748a31e..e22c71e5e9e3 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -29,7 +29,7 @@ use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::physical_plan::collect; -use datafusion::prelude::CsvReadOptions; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use structopt::StructOpt; #[cfg(feature = "snmalloc")] @@ -82,7 +82,10 @@ async fn main() -> Result<()> { let options = CsvReadOptions::new().schema(&schema).has_header(true); ctx.register_csv("tripdata", path, options).await? } - "parquet" => ctx.register_parquet("tripdata", path).await?, + "parquet" => { + ctx.register_parquet("tripdata", path, ParquetReadOptions::default()) + .await? 
+ } other => { println!("Invalid file format '{}'", other); process::exit(-1); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index d8b0405f19b9..697b56117e04 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .map_err(to_tonic_err)?; diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs index 6ab24ff41ab6..22923c5239af 100644 --- a/datafusion-examples/examples/parquet_sql.rs +++ b/datafusion-examples/examples/parquet_sql.rs @@ -31,6 +31,7 @@ async fn main() -> Result<()> { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/benches/parquet_query_sql.rs b/datafusion/benches/parquet_query_sql.rs index 183f64739069..35fef901867a 100644 --- a/datafusion/benches/parquet_query_sql.rs +++ b/datafusion/benches/parquet_query_sql.rs @@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) { let mut context = SessionContext::new(); let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - rt.block_on(context.register_parquet("t", file_path.as_str())) - .unwrap(); + rt.block_on(context.register_parquet( + "t", + file_path.as_str(), + ParquetReadOptions::default(), + )) + .unwrap(); // We read the queries from a file so they can be changed without recompiling the benchmark let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap(); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index dac4fab24558..9b608da28b6e 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -558,20 +558,14 @@ impl SessionContext { /// Registers a Parquet data source so that it can be referenced from SQL statements /// executed against this context. 
- pub async fn register_parquet(&mut self, name: &str, uri: &str) -> Result<()> { - let (target_partitions, enable_pruning) = { - let conf = self.copied_config(); - (conf.target_partitions, conf.parquet_pruning) - }; - let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning); - - let listing_options = ListingOptions { - format: Arc::new(file_format), - collect_stat: true, - file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), - target_partitions, - table_partition_cols: vec![], - }; + pub async fn register_parquet( + &mut self, + name: &str, + uri: &str, + options: ParquetReadOptions<'_>, + ) -> Result<()> { + let listing_options = + options.to_listing_options(self.copied_config().target_partitions); self.register_listing_table(name, uri, listing_options, None) .await?; diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index c77e7d8a27c0..97dde31c3d72 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -593,7 +593,7 @@ mod tests { use super::*; use crate::execution::options::CsvReadOptions; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -1329,15 +1329,32 @@ mod tests { let mut ctx = SessionContext::new(); // register each partition as well as the top level dir - ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir)) - .await?; - ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir)) - .await?; - ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir)) - .await?; - ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir)) + ctx.register_parquet( + "part0", + &format!("{}/part-0.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part1", + &format!("{}/part-1.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part2", + &format!("{}/part-2.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part3", + &format!("{}/part-3.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default()) .await?; - ctx.register_parquet("allparts", &out_dir).await?; let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?; let allparts = ctx diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 83d51e071aaa..d3b86ab28eed 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -482,7 +482,9 @@ impl ContextWithParquet { // now, setup a the file as a data source and run a query against it let mut ctx = SessionContext::with_config(config); - ctx.register_parquet("t", &parquet_path).await.unwrap(); + ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default()) + .await + .unwrap(); let provider = ctx.deregister_table("t").unwrap().unwrap(); ctx.register_table("t", provider.clone()).unwrap(); diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index 1e8938a06f83..ddceab2c371c 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -627,6 +627,7 @@ async fn register_alltypes_parquet(ctx: &mut SessionContext) { ctx.register_parquet( "alltypes_plain", 
&format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap(); diff --git a/datafusion/tests/sql/parquet.rs b/datafusion/tests/sql/parquet.rs index 92ad35cc5942..bed507a37262 100644 --- a/datafusion/tests/sql/parquet.rs +++ b/datafusion/tests/sql/parquet.rs @@ -52,9 +52,13 @@ async fn parquet_query() { async fn parquet_single_nan_schema() { let mut ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); - ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) - .await - .unwrap(); + ctx.register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) + .await + .unwrap(); let sql = "SELECT mycol FROM single_nan"; let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); @@ -75,6 +79,7 @@ async fn parquet_list_columns() { ctx.register_parquet( "list_columns", &format!("{}/list_columns.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap(); From a3d2591d0e8217bbc90376cdc73efc03fa7d9908 Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 12:15:40 -0400 Subject: [PATCH 08/10] Allow ParquetReadOption to receive parquet_pruning from execution::Context::SessionConfig https://github.com/apache/arrow-datafusion/blob/73ea6e16f5c8f34526c01490a5ec277a68f33791/datafusion/tests/parquet_pruning.rs#L143 --- datafusion/src/execution/context.rs | 9 +++++++-- datafusion/src/execution/options.rs | 12 +++++++++++- datafusion/tests/parquet_pruning.rs | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 9b608da28b6e..5acf63f888ef 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -564,8 +564,13 @@ impl SessionContext { uri: &str, options: ParquetReadOptions<'_>, ) -> Result<()> { - let listing_options = - options.to_listing_options(self.copied_config().target_partitions); + let (target_partitions, parquet_pruning) = { + let conf = self.copied_config(); + (conf.target_partitions, conf.parquet_pruning) + }; + let listing_options = options + .parquet_pruning(parquet_pruning) + .to_listing_options(target_partitions); self.register_listing_table(name, uri, listing_options, None) .await?; diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index c15f3c675c6b..f83b8345cc7c 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -141,6 +141,8 @@ pub struct ParquetReadOptions<'a> { pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec, + /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig + pub parquet_pruning: bool, } impl<'a> Default for ParquetReadOptions<'a> { @@ -148,11 +150,18 @@ impl<'a> Default for ParquetReadOptions<'a> { Self { file_extension: DEFAULT_PARQUET_EXTENSION, table_partition_cols: vec![], + parquet_pruning: ParquetFormat::default().enable_pruning(), } } } impl<'a> ParquetReadOptions<'a> { + /// Specify parquet_pruning + pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self { + self.parquet_pruning = parquet_pruning; + self + } + /// Specify table_partition_cols for partition pruning pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { self.table_partition_cols = table_partition_cols; @@ -161,7 +170,8 @@ impl<'a> ParquetReadOptions<'a> { /// Helper to convert these 
user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { - let file_format = ParquetFormat::default(); + let file_format = + ParquetFormat::default().with_enable_pruning(self.parquet_pruning); ListingOptions { format: Arc::new(file_format), diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index d3b86ab28eed..bf9049a10310 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -37,7 +37,7 @@ use datafusion::{ accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor, }, - prelude::{SessionConfig, SessionContext}, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, scalar::ScalarValue, }; use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; From fd68f728780f5a2bb1adb58ae859f84470b9ef9a Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 17:59:01 -0400 Subject: [PATCH 09/10] fix benches import --- datafusion/benches/parquet_query_sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/benches/parquet_query_sql.rs b/datafusion/benches/parquet_query_sql.rs index 35fef901867a..058f757d186c 100644 --- a/datafusion/benches/parquet_query_sql.rs +++ b/datafusion/benches/parquet_query_sql.rs @@ -24,7 +24,7 @@ use arrow::datatypes::{ }; use arrow::record_batch::RecordBatch; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::prelude::SessionContext; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; use parquet::arrow::ArrowWriter; use parquet::file::properties::{WriterProperties, WriterVersion}; use rand::distributions::uniform::SampleUniform; From e8993ebbdfc383472976a2cb210a7ee14fc4ac3f Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sat, 2 Apr 2022 10:06:51 -0400 Subject: [PATCH 10/10] Apply suggestions from code review (lint) --- datafusion/core/src/execution/options.rs | 3 ++- datafusion/core/src/sql/parser.rs | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index f83b8345cc7c..b790ca3bfae4 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -141,7 +141,8 @@ pub struct ParquetReadOptions<'a> { pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec, - /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig + /// Should DataFusion parquet reader using the predicate to prune data, + /// overridden by value on execution::context::SessionConfig pub parquet_pruning: bool, } diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs index ef54d7f60183..566e91c929b7 100644 --- a/datafusion/core/src/sql/parser.rs +++ b/datafusion/core/src/sql/parser.rs @@ -308,11 +308,11 @@ impl<'a> DFParser<'a> { let has_header = self.parse_csv_has_header(); - let has_partition = self.parse_has_partition(); - let mut table_partition_cols: Vec = vec![]; - if has_partition { - table_partition_cols = self.parse_partitions()?; - } + let table_partition_cols = if self.parse_has_partition() { + self.parse_partitions()? + } else { + vec![] + }; self.parser.expect_keyword(Keyword::LOCATION)?; let location = self.parser.parse_literal_string()?;
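
For reference, below is a minimal usage sketch of the API this patch series introduces; it is not part of the patches themselves. It assumes a Hive-style directory layout like the `/mnt/nyctaxi/year=.../month=...` example added to `ddl.md` above; the table names, path, and partition column names are illustrative only, and a `tokio` runtime is assumed for the async entry point.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = SessionContext::new();

    // Programmatic registration: declare the Hive-style partition columns
    // through the new ParquetReadOptions so they can be used for pruning.
    ctx.register_parquet(
        "taxi",
        "/mnt/nyctaxi",
        ParquetReadOptions::default()
            .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
    )
    .await?;

    // Equivalent registration through the new PARTITIONED BY clause in DDL.
    ctx.sql(
        "CREATE EXTERNAL TABLE taxi_ddl \
         STORED AS PARQUET \
         PARTITIONED BY (year, month) \
         LOCATION '/mnt/nyctaxi'",
    )
    .await?;

    // Partition columns behave like ordinary columns in queries.
    let batches = ctx
        .sql("SELECT month, count(*) FROM taxi WHERE year = '2021' GROUP BY month")
        .await?
        .collect()
        .await?;
    println!("{} result batches", batches.len());
    Ok(())
}
```

Note that `register_parquet` now reads `target_partitions` and `parquet_pruning` from the session config, so the `ParquetReadOptions::parquet_pruning` setting shown in patch 08 is overridden by the context configuration when registering through `SessionContext`.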