diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs index cab8f5645450..a819950f4267 100644 --- a/ballista-examples/src/bin/ballista-dataframe.rs +++ b/ballista-examples/src/bin/ballista-dataframe.rs @@ -16,7 +16,7 @@ // under the License. use ballista::prelude::*; -use datafusion::prelude::{col, lit}; +use datafusion::prelude::{col, lit, ParquetReadOptions}; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait @@ -33,7 +33,7 @@ async fn main() -> Result<()> { // define the query using the DataFrame trait let df = ctx - .read_parquet(filename) + .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index e4233d4f4025..0a002e888416 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -36,7 +36,7 @@ use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan}; use datafusion::prelude::{ - AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext, + AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext, }; use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement}; @@ -221,13 +221,17 @@ impl BallistaContext { /// Create a DataFrame representing a Parquet table scan /// TODO fetch schema from scheduler instead of resolving locally - pub async fn read_parquet(&self, path: &str) -> Result> { + pub async fn read_parquet( + &self, + path: &str, + options: ParquetReadOptions<'_>, + ) -> Result> { // convert to absolute path because the executor likely has a different working directory let path = PathBuf::from(path); let path = fs::canonicalize(&path)?; let ctx = self.context.clone(); - let df = ctx.read_parquet(path.to_str().unwrap()).await?; + let df = ctx.read_parquet(path.to_str().unwrap(), options).await?; Ok(df) } @@ -272,8 +276,13 @@ impl BallistaContext { } } - pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> { - match self.read_parquet(path).await?.to_logical_plan() { + pub async fn register_parquet( + &self, + name: &str, + path: &str, + options: ParquetReadOptions<'_>, + ) -> Result<()> { + match self.read_parquet(path, options).await?.to_logical_plan() { LogicalPlan::TableScan(TableScan { source, .. 
}) => { self.register_table(name, source) } @@ -366,6 +375,7 @@ impl BallistaContext { ref location, ref file_type, ref has_header, + ref table_partition_cols, }) => match file_type { FileType::CSV => { self.register_csv( @@ -373,18 +383,30 @@ impl BallistaContext { location, CsvReadOptions::new() .schema(&schema.as_ref().to_owned().into()) - .has_header(*has_header), + .has_header(*has_header) + .table_partition_cols(table_partition_cols.to_vec()), ) .await?; Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan))) } FileType::Parquet => { - self.register_parquet(name, location).await?; + self.register_parquet( + name, + location, + ParquetReadOptions::default() + .table_partition_cols(table_partition_cols.to_vec()), + ) + .await?; Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan))) } FileType::Avro => { - self.register_avro(name, location, AvroReadOptions::default()) - .await?; + self.register_avro( + name, + location, + AvroReadOptions::default() + .table_partition_cols(table_partition_cols.to_vec()), + ) + .await?; Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan))) } _ => Err(DataFusionError::NotImplemented(format!( @@ -525,7 +547,11 @@ mod tests { let testdata = datafusion::test_util::parquet_test_data(); context - .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) + .register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) .await .unwrap(); diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 015618fe876a..7f96a00ea3e2 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -146,6 +146,7 @@ message CreateExternalTableNode { FileType file_type = 3; bool has_header = 4; datafusion.DfSchema schema = 5; + repeated string table_partition_cols = 6; } message CreateCatalogSchemaNode { diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 05f396608978..3aed2db006a8 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -325,6 +325,9 @@ impl AsLogicalPlan for LogicalPlanNode { location: create_extern_table.location.clone(), file_type: pb_file_type.into(), has_header: create_extern_table.has_header, + table_partition_cols: create_extern_table + .table_partition_cols + .clone(), })) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { @@ -771,6 +774,7 @@ impl AsLogicalPlan for LogicalPlanNode { file_type, has_header, schema: df_schema, + table_partition_cols, }) => { use datafusion::sql::parser::FileType; @@ -789,6 +793,7 @@ impl AsLogicalPlan for LogicalPlanNode { file_type: pb_file_type as i32, has_header: *has_header, schema: Some(df_schema.into()), + table_partition_cols: table_partition_cols.clone(), }, )), }) @@ -1123,6 +1128,7 @@ mod roundtrip_tests { location: String::from("employee.csv"), file_type: *file, has_header: true, + table_partition_cols: vec![], }); roundtrip_test!(create_table_node); diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index a0cdb748a31e..e22c71e5e9e3 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -29,7 +29,7 @@ use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::physical_plan::collect; -use datafusion::prelude::CsvReadOptions; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use structopt::StructOpt; 
#[cfg(feature = "snmalloc")] @@ -82,7 +82,10 @@ async fn main() -> Result<()> { let options = CsvReadOptions::new().schema(&schema).has_header(true); ctx.register_csv("tripdata", path, options).await? } - "parquet" => ctx.register_parquet("tripdata", path).await?, + "parquet" => { + ctx.register_parquet("tripdata", path, ParquetReadOptions::default()) + .await? + } other => { println!("Invalid file format '{}'", other); process::exit(-1); diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index d337857b042e..1060bd2e0f94 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -559,7 +559,7 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) { } "parquet" => { let path = format!("{}/{}", path, table); - ctx.register_parquet(table, &path) + ctx.register_parquet(table, &path, ParquetReadOptions::default()) .await .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) .unwrap(); diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 982f1b4a9cb9..5cdec9b88716 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -31,7 +31,7 @@ async fn main() -> Result<()> { // define the query using the DataFrame trait let df = ctx - .read_parquet(filename) + .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index fa277829f0df..703cb702546f 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .map_err(to_tonic_err)?; diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs index 9d3dd4967507..bcaa05d7e85a 100644 --- a/datafusion-examples/examples/parquet_sql.rs +++ b/datafusion-examples/examples/parquet_sql.rs @@ -31,6 +31,7 @@ async fn main() -> Result<()> { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs index 5416e2c8edb9..08156fad45e2 100644 --- a/datafusion/core/benches/parquet_query_sql.rs +++ b/datafusion/core/benches/parquet_query_sql.rs @@ -24,7 +24,7 @@ use arrow::datatypes::{ }; use arrow::record_batch::RecordBatch; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::prelude::SessionContext; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; use parquet::arrow::ArrowWriter; use parquet::file::properties::{WriterProperties, WriterVersion}; use rand::distributions::uniform::SampleUniform; @@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) { let context = SessionContext::new(); let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - rt.block_on(context.register_parquet("t", file_path.as_str())) - .unwrap(); + rt.block_on(context.register_parquet( + "t", + file_path.as_str(), + ParquetReadOptions::default(), + )) + .unwrap(); // We read the queries from a file so they can be changed without recompiling the benchmark let mut queries_file = 
File::open("benches/parquet_query_sql.sql").unwrap(); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index aa17d57309b8..ab65d28f7942 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -92,7 +92,9 @@ use chrono::{DateTime, Utc}; use parquet::file::properties::WriterProperties; use uuid::Uuid; -use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions}; +use super::options::{ + AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, +}; /// The default catalog name - this impacts what SQL queries use if not specified const DEFAULT_CATALOG: &str = "datafusion"; @@ -215,6 +217,7 @@ impl SessionContext { ref location, ref file_type, ref has_header, + ref table_partition_cols, }) => { let (file_format, file_extension) = match file_type { FileType::CSV => ( @@ -241,7 +244,7 @@ impl SessionContext { collect_stat: false, file_extension: file_extension.to_owned(), target_partitions: self.copied_config().target_partitions, - table_partition_cols: vec![], + table_partition_cols: table_partition_cols.clone(), }; // TODO make schema in CreateExternalTable optional instead of empty @@ -462,14 +465,23 @@ impl SessionContext { } /// Creates a DataFrame for reading a Parquet data source. - pub async fn read_parquet(&self, uri: impl Into) -> Result> { + pub async fn read_parquet( + &self, + uri: impl Into, + options: ParquetReadOptions<'_>, + ) -> Result> { let uri: String = uri.into(); let (object_store, path) = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; - let logical_plan = - LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions) - .await? - .build()?; + let logical_plan = LogicalPlanBuilder::scan_parquet( + object_store, + path, + options, + None, + target_partitions, + ) + .await? + .build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &logical_plan))) } @@ -548,20 +560,19 @@ impl SessionContext { /// Registers a Parquet data source so that it can be referenced from SQL statements /// executed against this context. 
- pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> { - let (target_partitions, enable_pruning) = { + pub async fn register_parquet( + &self, + name: &str, + uri: &str, + options: ParquetReadOptions<'_>, + ) -> Result<()> { + let (target_partitions, parquet_pruning) = { let conf = self.copied_config(); (conf.target_partitions, conf.parquet_pruning) }; - let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning); - - let listing_options = ListingOptions { - format: Arc::new(file_format), - collect_stat: true, - file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), - target_partitions, - table_partition_cols: vec![], - }; + let listing_options = options + .parquet_pruning(parquet_pruning) + .to_listing_options(target_partitions); self.register_listing_table(name, uri, listing_options, None) .await?; @@ -3510,7 +3521,9 @@ mod tests { async fn call_read_parquet(&self) -> Arc { let ctx = SessionContext::new(); - ctx.read_parquet("dummy").await.unwrap() + ctx.read_parquet("dummy", ParquetReadOptions::default()) + .await + .unwrap() } } } diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index d9a0304420d1..b790ca3bfae4 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -21,14 +21,18 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; -use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::{ - file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, + file_format::{ + avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, + csv::{CsvFormat, DEFAULT_CSV_EXTENSION}, + json::{JsonFormat, DEFAULT_JSON_EXTENSION}, + parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, + }, listing::ListingOptions, }; /// CSV file read option -#[derive(Copy, Clone)] +#[derive(Clone)] pub struct CsvReadOptions<'a> { /// Does the CSV file have a header? /// @@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> { /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000. pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".csv". + /// Defaults to DEFAULT_CSV_EXTENSION. pub file_extension: &'a str, + /// Partition Columns + pub table_partition_cols: Vec, } impl<'a> Default for CsvReadOptions<'a> { @@ -61,7 +67,8 @@ impl<'a> CsvReadOptions<'a> { schema: None, schema_infer_max_records: 1000, delimiter: b',', - file_extension: ".csv", + file_extension: DEFAULT_CSV_EXTENSION, + table_partition_cols: vec![], } } @@ -97,6 +104,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Configure number of max records to read for schema inference pub fn schema_infer_max_records(mut self, max_records: usize) -> Self { self.schema_infer_max_records = max_records; @@ -115,7 +128,58 @@ impl<'a> CsvReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, + table_partition_cols: self.table_partition_cols.clone(), + } + } +} + +/// Parquet read options +#[derive(Clone)] +pub struct ParquetReadOptions<'a> { + /// File extension; only files with this extension are selected for data input. + /// Defaults to ".parquet". 
+ pub file_extension: &'a str, + /// Partition Columns + pub table_partition_cols: Vec<String>, + /// Should the DataFusion parquet reader use the predicate to prune data? + /// Overridden by the value in execution::context::SessionConfig + pub parquet_pruning: bool, +} + +impl<'a> Default for ParquetReadOptions<'a> { + fn default() -> Self { + Self { + file_extension: DEFAULT_PARQUET_EXTENSION, + table_partition_cols: vec![], + parquet_pruning: ParquetFormat::default().enable_pruning(), + } + } +} + +impl<'a> ParquetReadOptions<'a> { + /// Specify parquet_pruning + pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self { + self.parquet_pruning = parquet_pruning; + self + } + + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + /// Helper to convert these user facing options to `ListingTable` options + pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { + let file_format = + ParquetFormat::default().with_enable_pruning(self.parquet_pruning); + + ListingOptions { + format: Arc::new(file_format), + collect_stat: true, + file_extension: self.file_extension.to_owned(), + target_partitions, + table_partition_cols: self.table_partition_cols.clone(), + } + } +} @@ -127,20 +191,29 @@ pub struct AvroReadOptions<'a> { pub schema: Option<SchemaRef>, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".avro". + /// Defaults to DEFAULT_AVRO_EXTENSION. pub file_extension: &'a str, + /// Partition Columns + pub table_partition_cols: Vec<String>, } impl<'a> Default for AvroReadOptions<'a> { fn default() -> Self { Self { schema: None, - file_extension: ".avro", + file_extension: DEFAULT_AVRO_EXTENSION, + table_partition_cols: vec![], } } } impl<'a> AvroReadOptions<'a> { + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { let file_format = AvroFormat::default(); @@ -150,7 +223,7 @@ impl<'a> AvroReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, - table_partition_cols: vec![], + table_partition_cols: self.table_partition_cols.clone(), } } } @@ -165,8 +238,10 @@ pub struct NdJsonReadOptions<'a> { pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to ".json". + /// Defaults to DEFAULT_JSON_EXTENSION.
pub file_extension: &'a str, + /// Partition Columns + pub table_partition_cols: Vec, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -175,11 +250,18 @@ impl<'a> Default for NdJsonReadOptions<'a> { schema: None, schema_infer_max_records: 1000, file_extension: DEFAULT_JSON_EXTENSION, + table_partition_cols: vec![], } } } impl<'a> NdJsonReadOptions<'a> { + /// Specify table_partition_cols for partition pruning + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { let file_format = JsonFormat::default(); @@ -188,7 +270,7 @@ impl<'a> NdJsonReadOptions<'a> { collect_stat: false, file_extension: self.file_extension.to_owned(), target_partitions, - table_partition_cols: vec![], + table_partition_cols: self.table_partition_cols.clone(), } } } diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 23a16fc472c1..042588f39cb4 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -19,8 +19,7 @@ use crate::datasource::{ empty::EmptyTable, - file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, - listing::{ListingOptions, ListingTable, ListingTableConfig}, + listing::{ListingTable, ListingTableConfig}, MemTable, TableProvider, }; use crate::error::{DataFusionError, Result}; @@ -256,6 +255,7 @@ impl LogicalPlanBuilder { pub async fn scan_parquet( object_store: Arc, path: impl Into, + options: ParquetReadOptions<'_>, projection: Option>, target_partitions: usize, ) -> Result { @@ -263,6 +263,7 @@ impl LogicalPlanBuilder { Self::scan_parquet_with_name( object_store, path.clone(), + options, projection, target_partitions, path, @@ -274,21 +275,12 @@ impl LogicalPlanBuilder { pub async fn scan_parquet_with_name( object_store: Arc, path: impl Into, + options: ParquetReadOptions<'_>, projection: Option>, target_partitions: usize, table_name: impl Into, ) -> Result { - // TODO remove hard coded enable_pruning - let file_format = ParquetFormat::default().with_enable_pruning(true); - - let listing_options = ListingOptions { - format: Arc::new(file_format), - collect_stat: true, - file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), - target_partitions, - table_partition_cols: vec![], - }; - + let listing_options = options.to_listing_options(target_partitions); let path: String = path.into(); // with parquet we resolve the schema in all cases diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 3acb69ecc120..eeb06d5599a9 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -184,6 +184,8 @@ pub struct CreateExternalTable { pub file_type: FileType, /// Whether the CSV file contains a header pub has_header: bool, + /// Partition Columns + pub table_partition_cols: Vec, } /// Creates a schema. 
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index c3bf7f9b201d..9791b2e00e3f 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -487,8 +487,12 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); - ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option) - .await?; + ctx.register_csv( + "part0", + &format!("{}/part-0.csv", out_dir), + csv_read_option.clone(), + ) + .await?; ctx.register_csv("allparts", &out_dir, csv_read_option) .await?; diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 6b784cd6aa92..b573c6075230 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -593,7 +593,7 @@ mod tests { use super::*; use crate::execution::options::CsvReadOptions; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -1329,15 +1329,32 @@ mod tests { let ctx = SessionContext::new(); // register each partition as well as the top level dir - ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir)) - .await?; - ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir)) - .await?; - ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir)) - .await?; - ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir)) + ctx.register_parquet( + "part0", + &format!("{}/part-0.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part1", + &format!("{}/part-1.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part2", + &format!("{}/part-2.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part3", + &format!("{}/part-3.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default()) .await?; - ctx.register_parquet("allparts", &out_dir).await?; let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?; let allparts = ctx diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 7504dd4385e4..4dbaca203751 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1586,7 +1586,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem {}), &path, - options, + options.clone(), None, 1, ) @@ -1686,7 +1686,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem {}), &path, - options, + options.clone(), None, 1, ) @@ -1708,7 +1708,7 @@ mod tests { let logical_plan = LogicalPlanBuilder::scan_csv( Arc::new(LocalFileSystem {}), &path, - options, + options.clone(), None, 1, ) diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index e40693e71566..cd78209326bd 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -27,8 +27,9 @@ pub use crate::dataframe::DataFrame; pub use crate::execution::context::{SessionConfig, SessionContext}; -pub use 
crate::execution::options::AvroReadOptions; -pub use crate::execution::options::{CsvReadOptions, NdJsonReadOptions}; +pub use crate::execution::options::{ + AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, +}; pub use crate::logical_plan::{ approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, in_list, diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs index 6ad105fc6970..566e91c929b7 100644 --- a/datafusion/core/src/sql/parser.rs +++ b/datafusion/core/src/sql/parser.rs @@ -78,6 +78,8 @@ pub struct CreateExternalTable { pub has_header: bool, /// Path to file pub location: String, + /// Partition Columns + pub table_partition_cols: Vec, } /// DataFusion Statement representations. @@ -192,6 +194,35 @@ impl<'a> DFParser<'a> { } } + fn parse_partitions(&mut self) -> Result, ParserError> { + let mut partitions: Vec = vec![]; + if !self.parser.consume_token(&Token::LParen) + || self.parser.consume_token(&Token::RParen) + { + return Ok(partitions); + } + + loop { + if let Token::Word(_) = self.parser.peek_token() { + let identifier = self.parser.parse_identifier()?; + partitions.push(identifier.to_string()); + } else { + return self.expected("partition name", self.parser.peek_token()); + } + let comma = self.parser.consume_token(&Token::Comma); + if self.parser.consume_token(&Token::RParen) { + // allow a trailing comma, even though it's not in standard + break; + } else if !comma { + return self.expected( + "',' or ')' after partition definition", + self.parser.peek_token(), + ); + } + } + Ok(partitions) + } + // This is a copy of the equivalent implementation in sqlparser. fn parse_columns( &mut self, @@ -277,6 +308,12 @@ impl<'a> DFParser<'a> { let has_header = self.parse_csv_has_header(); + let table_partition_cols = if self.parse_has_partition() { + self.parse_partitions()? 
+ } else { + vec![] + }; + self.parser.expect_keyword(Keyword::LOCATION)?; let location = self.parser.parse_literal_string()?; @@ -286,6 +323,7 @@ impl<'a> DFParser<'a> { file_type, has_header, location, + table_partition_cols, }; Ok(Statement::CreateExternalTable(create)) } @@ -314,6 +352,11 @@ impl<'a> DFParser<'a> { & self.consume_token(&Token::make_keyword("HEADER")) & self.consume_token(&Token::make_keyword("ROW")) } + + fn parse_has_partition(&mut self) -> bool { + self.consume_token(&Token::make_keyword("PARTITIONED")) + & self.consume_token(&Token::make_keyword("BY")) + } } #[cfg(test)] @@ -376,6 +419,20 @@ mod tests { file_type: FileType::CSV, has_header: false, location: "foo.csv".into(), + table_partition_cols: vec![], + }); + expect_parse_ok(sql, expected)?; + + // positive case: partitioned by + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'"; + let display = None; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![make_column_def("c1", DataType::Int(display))], + file_type: FileType::CSV, + has_header: false, + location: "foo.csv".into(), + table_partition_cols: vec!["p1".to_string(), "p2".to_string()], }); expect_parse_ok(sql, expected)?; @@ -391,6 +448,7 @@ mod tests { file_type: FileType::CSV, has_header: true, location: "foo.csv".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; } @@ -403,6 +461,7 @@ mod tests { file_type: FileType::Parquet, has_header: false, location: "foo.parquet".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -414,6 +473,7 @@ mod tests { file_type: FileType::Parquet, has_header: false, location: "foo.parquet".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -425,6 +485,7 @@ mod tests { file_type: FileType::Avro, has_header: false, location: "foo.avro".into(), + table_partition_cols: vec![], }); expect_parse_ok(sql, expected)?; @@ -433,6 +494,11 @@ mod tests { "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'"; expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV"); + // Error cases: partition column does not support type + let sql = + "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'"; + expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int"); + Ok(()) } } diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 886052156305..cf14127bd9d8 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -308,6 +308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { file_type, has_header, location, + table_partition_cols, } = statement; // semantic checks @@ -333,6 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { location, file_type, has_header, + table_partition_cols, })) } diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs index 4301e6556b57..d5392e9dcbff 100644 --- a/datafusion/core/tests/parquet_pruning.rs +++ b/datafusion/core/tests/parquet_pruning.rs @@ -37,7 +37,7 @@ use datafusion::{ accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor, }, - prelude::{SessionConfig, SessionContext}, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, scalar::ScalarValue, }; use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; @@ -482,7 +482,9 @@ impl 
ContextWithParquet { // now, setup a the file as a data source and run a query against it let ctx = SessionContext::with_config(config); - ctx.register_parquet("t", &parquet_path).await.unwrap(); + ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default()) + .await + .unwrap(); let provider = ctx.deregister_table("t").unwrap().unwrap(); ctx.register_table("t", provider.clone()).unwrap(); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 724fffce7bbf..a788316fd8da 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -624,6 +624,7 @@ async fn register_alltypes_parquet(ctx: &SessionContext) { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap(); diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 6dfb37ed3c7e..7c1bb7c436b3 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -52,9 +52,13 @@ async fn parquet_query() { async fn parquet_single_nan_schema() { let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); - ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) - .await - .unwrap(); + ctx.register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) + .await + .unwrap(); let sql = "SELECT mycol FROM single_nan"; let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); @@ -75,6 +79,7 @@ async fn parquet_list_columns() { ctx.register_parquet( "list_columns", &format!("{}/list_columns.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap(); @@ -214,7 +219,10 @@ async fn schema_merge_ignores_metadata() { // (no errors) let ctx = SessionContext::new(); let df = ctx - .read_parquet(table_dir.to_str().unwrap().to_string()) + .read_parquet( + table_dir.to_str().unwrap().to_string(), + ParquetReadOptions::default(), + ) .await .unwrap(); let result = df.collect().await.unwrap(); diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md index eb10f40de5a6..75ec0f6cb0fa 100644 --- a/docs/source/user-guide/sql/ddl.md +++ b/docs/source/user-guide/sql/ddl.md @@ -55,6 +55,21 @@ WITH HEADER ROW LOCATION '/path/to/aggregate_test_100.csv'; ``` +If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning. + +``` +/mnt/nyctaxi/year=2022/month=01/tripdata.parquet +/mnt/nyctaxi/year=2021/month=12/tripdata.parquet +/mnt/nyctaxi/year=2021/month=11/tripdata.parquet +``` + +```sql +CREATE EXTERNAL TABLE taxi +STORED AS PARQUET +PARTITIONED BY (year, month) +LOCATION '/mnt/nyctaxi'; +``` + ## CREATE MEMORY TABLE Memory table can be created with query.
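For reference, the same Hive-partitioned layout can also be registered through the new Rust API this patch introduces. The sketch below is illustrative only and is not part of the diff: it assumes a Tokio runtime and reuses the `/mnt/nyctaxi` path and the `year`/`month` partition column names from the docs example above.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // `table_partition_cols` lets the listing table derive `year`/`month` from
    // the directory names instead of from the Parquet files themselves.
    ctx.register_parquet(
        "taxi",
        "/mnt/nyctaxi",
        ParquetReadOptions::default()
            .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
    )
    .await?;

    // Partition columns behave like ordinary string columns in SQL, so a filter
    // like this can prune whole partitions before the Parquet files are read.
    let df = ctx
        .sql("SELECT count(*) FROM taxi WHERE year = '2021' AND month = '12'")
        .await?;
    df.show().await?;

    Ok(())
}
```

`read_parquet` accepts the same `ParquetReadOptions`, mirroring the existing `CsvReadOptions`/`AvroReadOptions` pattern. Note that `register_parquet` still applies `SessionConfig::parquet_pruning`, which overrides any `parquet_pruning` value set on the options.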