diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2b7a5f6fe268..bf5bd94a5243 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::file_format::parquet::{
-    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
-};
+use datafusion::datasource::file_format::file_type::{FileType, GetExt};
+use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
@@ -35,7 +34,7 @@ async fn main() -> Result<()> {
     // Configure listing options
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions {
-        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
+        file_extension: FileType::PARQUET.get_ext(),
         format: Arc::new(file_format),
         table_partition_cols: vec![],
         collect_stat: true,
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 9157e04a2d21..c2d3e389cae5 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -318,6 +318,12 @@ impl Display for DataFusionError {
 
 impl error::Error for DataFusionError {}
 
+impl From<DataFusionError> for io::Error {
+    fn from(e: DataFusionError) -> Self {
+        io::Error::new(io::ErrorKind::Other, e)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use crate::error::DataFusionError;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 097ee3158aee..b43e0da25cdf 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -57,8 +57,10 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 apache-avro = { version = "0.14", optional = true }
 arrow = { version = "24.0.0", features = ["prettyprint"] }
+async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] }
 async-trait = "0.1.41"
 bytes = "1.1"
+bzip2 = "0.4.3"
 chrono = { version = "0.4", default-features = false }
 datafusion-common = { path = "../common", version = "13.0.0", features = ["parquet", "object_store"] }
 datafusion-expr = { path = "../expr", version = "13.0.0" }
@@ -67,6 +69,7 @@ datafusion-optimizer = { path = "../optimizer", version = "13.0.0" }
 datafusion-physical-expr = { path = "../physical-expr", version = "13.0.0" }
 datafusion-row = { path = "../row", version = "13.0.0" }
 datafusion-sql = { path = "../sql", version = "13.0.0" }
+flate2 = "1.0.24"
 futures = "0.3"
 glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
@@ -89,6 +92,7 @@ sqlparser = "0.25"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
 tokio-stream = "0.1"
+tokio-util = { version = "0.7.4", features = ["io"] }
 url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 
@@ -101,6 +105,7 @@ ctor = "0.1.22"
 doc-comment = "0.3"
 env_logger = "0.9"
 fuzz-utils = { path = "fuzz-utils" }
+rstest = "0.15.0"
 
 [[bench]]
 harness = false
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 2e6994f4b618..6a99e35b812c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -18,16 +18,21 @@
 //! 
CSV format abstractions use std::any::Any; + use std::sync::Arc; use arrow::datatypes::Schema; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; +use bytes::Buf; + use datafusion_common::DataFusionError; + use futures::TryFutureExt; use object_store::{ObjectMeta, ObjectStore}; use super::FileFormat; +use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::error::Result; use crate::logical_plan::Expr; @@ -43,6 +48,7 @@ pub struct CsvFormat { has_header: bool, delimiter: u8, schema_infer_max_rec: Option, + file_compression_type: FileCompressionType, } impl Default for CsvFormat { @@ -51,6 +57,7 @@ impl Default for CsvFormat { schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD), has_header: true, delimiter: b',', + file_compression_type: FileCompressionType::UNCOMPRESSED, } } } @@ -82,6 +89,16 @@ impl CsvFormat { self } + /// Set a `FileCompressionType` of CSV + /// - defaults to `FileCompressionType::UNCOMPRESSED` + pub fn with_file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = file_compression_type; + self + } + /// The delimiter character. pub fn delimiter(&self) -> u8 { self.delimiter @@ -110,8 +127,9 @@ impl FileFormat for CsvFormat { .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + let decoder = self.file_compression_type.convert_read(data.reader()); let (schema, records_read) = arrow::csv::reader::infer_reader_schema( - &mut data.as_ref(), + decoder, self.delimiter, Some(records_to_read), self.has_header, @@ -144,7 +162,12 @@ impl FileFormat for CsvFormat { conf: FileScanConfig, _filters: &[Expr], ) -> Result> { - let exec = CsvExec::new(conf, self.has_header, self.delimiter); + let exec = CsvExec::new( + conf, + self.has_header, + self.delimiter, + self.file_compression_type.to_owned(), + ); Ok(Arc::new(exec)) } } diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs new file mode 100644 index 000000000000..f08a21ca1c26 --- /dev/null +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
File type abstraction
+
+use crate::error::{DataFusionError, Result};
+use std::io::Error;
+
+use async_compression::tokio::bufread::{
+    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
+};
+use bzip2::read::BzDecoder;
+
+use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use bytes::Bytes;
+use flate2::read::GzDecoder;
+use futures::{Stream, TryStreamExt};
+use std::str::FromStr;
+use tokio_util::io::{ReaderStream, StreamReader};
+
+/// Define each `FileType`/`FileCompressionType`'s extension
+pub trait GetExt {
+    /// File extension getter
+    fn get_ext(&self) -> String;
+}
+
+/// Readable file compression type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileCompressionType {
+    /// Gzip-ed file
+    GZIP,
+    /// Bzip2-ed file
+    BZIP2,
+    /// Uncompressed file
+    UNCOMPRESSED,
+}
+
+impl GetExt for FileCompressionType {
+    fn get_ext(&self) -> String {
+        match self {
+            FileCompressionType::GZIP => ".gz".to_owned(),
+            FileCompressionType::BZIP2 => ".bz2".to_owned(),
+            FileCompressionType::UNCOMPRESSED => "".to_owned(),
+        }
+    }
+}
+
+impl FromStr for FileCompressionType {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        let s = s.to_uppercase();
+        match s.as_str() {
+            "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
+            "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
+            "" => Ok(FileCompressionType::UNCOMPRESSED),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unknown FileCompressionType: {}",
+                s
+            ))),
+        }
+    }
+}
+
+/// `FileCompressionType` implementation
+impl FileCompressionType {
+    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
+    pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
+        &self,
+        s: T,
+    ) -> Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin> {
+        let err_converter = |e: Error| match e
+            .get_ref()
+            .and_then(|e| e.downcast_ref::<DataFusionError>())
+        {
+            Some(_) => {
+                *(e.into_inner()
+                    .unwrap()
+                    .downcast::<DataFusionError>()
+                    .unwrap())
+            }
+            None => Into::<DataFusionError>::into(e),
+        };
+
+        match self {
+            FileCompressionType::GZIP => Box::new(
+                ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),
+            ),
+            FileCompressionType::BZIP2 => Box::new(
+                ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),
+            ),
+            FileCompressionType::UNCOMPRESSED => Box::new(s),
+        }
+    }
+
+    /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`.
+    pub fn convert_read<T: std::io::Read + Send + 'static>(
+        &self,
+        r: T,
+    ) -> Box<dyn std::io::Read + Send> {
+        match self {
+            FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+            FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+            FileCompressionType::UNCOMPRESSED => Box::new(r),
+        }
+    }
+}
+
+/// Readable file type
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FileType {
+    /// Apache Avro file
+    AVRO,
+    /// Apache Parquet file
+    PARQUET,
+    /// CSV file
+    CSV,
+    /// JSON file
+    JSON,
+}
+
+impl GetExt for FileType {
+    fn get_ext(&self) -> String {
+        match self {
+            FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
+            FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
+            FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
+            FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
+        }
+    }
+}
+
+impl FromStr for FileType {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        let s = s.to_uppercase();
+        match s.as_str() {
+            "AVRO" => Ok(FileType::AVRO),
+            "PARQUET" => Ok(FileType::PARQUET),
+            "CSV" => Ok(FileType::CSV),
+            "JSON" => Ok(FileType::JSON),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unknown FileType: {}",
+                s
+            ))),
+        }
+    }
+}
+
+impl FileType {
+    /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
+    pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
+        let ext = self.get_ext();
+
+        match self {
+            FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
+            FileType::PARQUET | FileType::AVRO => match c {
+                FileCompressionType::UNCOMPRESSED => Ok(ext),
+                _ => Err(DataFusionError::Internal(
+                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
+                )),
+            },
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
+    use crate::error::DataFusionError;
+    use std::str::FromStr;
+
+    #[test]
+    fn get_ext_with_compression() {
+        let file_type = FileType::CSV;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".csv"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::GZIP)
+                .unwrap(),
+            ".csv.gz"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::BZIP2)
+                .unwrap(),
+            ".csv.bz2"
+        );
+
+        let file_type = FileType::JSON;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".json"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::GZIP)
+                .unwrap(),
+            ".json.gz"
+        );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::BZIP2)
+                .unwrap(),
+            ".json.bz2"
+        );
+
+        let file_type = FileType::AVRO;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".avro"
+        );
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::GZIP),
+            Err(DataFusionError::Internal(_))
+        ));
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+            Err(DataFusionError::Internal(_))
+        ));
+
+        let file_type = FileType::PARQUET;
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
+                .unwrap(),
+            ".parquet"
+        );
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::GZIP),
+            Err(DataFusionError::Internal(_))
+        ));
+        assert!(matches!(
+            file_type.get_ext_with_compression(FileCompressionType::BZIP2),
+            Err(DataFusionError::Internal(_))
+        ));
+    }
+
+    #[test]
+    fn from_str() {
+        
assert_eq!(FileType::from_str("csv").unwrap(), FileType::CSV); + assert_eq!(FileType::from_str("CSV").unwrap(), FileType::CSV); + + assert_eq!(FileType::from_str("json").unwrap(), FileType::JSON); + assert_eq!(FileType::from_str("JSON").unwrap(), FileType::JSON); + + assert_eq!(FileType::from_str("avro").unwrap(), FileType::AVRO); + assert_eq!(FileType::from_str("AVRO").unwrap(), FileType::AVRO); + + assert_eq!(FileType::from_str("parquet").unwrap(), FileType::PARQUET); + assert_eq!(FileType::from_str("PARQUET").unwrap(), FileType::PARQUET); + + assert!(matches!( + FileType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + + assert_eq!( + FileCompressionType::from_str("gz").unwrap(), + FileCompressionType::GZIP + ); + assert_eq!( + FileCompressionType::from_str("GZ").unwrap(), + FileCompressionType::GZIP + ); + assert_eq!( + FileCompressionType::from_str("gzip").unwrap(), + FileCompressionType::GZIP + ); + assert_eq!( + FileCompressionType::from_str("GZIP").unwrap(), + FileCompressionType::GZIP + ); + + assert_eq!( + FileCompressionType::from_str("bz2").unwrap(), + FileCompressionType::BZIP2 + ); + assert_eq!( + FileCompressionType::from_str("BZ2").unwrap(), + FileCompressionType::BZIP2 + ); + assert_eq!( + FileCompressionType::from_str("bzip2").unwrap(), + FileCompressionType::BZIP2 + ); + assert_eq!( + FileCompressionType::from_str("BZIP2").unwrap(), + FileCompressionType::BZIP2 + ); + + assert_eq!( + FileCompressionType::from_str("").unwrap(), + FileCompressionType::UNCOMPRESSED + ); + + assert!(matches!( + FileCompressionType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + } +} diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 9a889ab4c0f5..02a684e85244 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -18,6 +18,7 @@ //! 
Line delimited JSON format abstractions use std::any::Any; + use std::io::BufReader; use std::sync::Arc; @@ -26,10 +27,13 @@ use arrow::datatypes::SchemaRef; use arrow::json::reader::infer_json_schema_from_iterator; use arrow::json::reader::ValueIter; use async_trait::async_trait; +use bytes::Buf; + use object_store::{GetResult, ObjectMeta, ObjectStore}; use super::FileFormat; use super::FileScanConfig; +use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::error::Result; use crate::logical_plan::Expr; @@ -43,12 +47,14 @@ pub const DEFAULT_JSON_EXTENSION: &str = ".json"; #[derive(Debug)] pub struct JsonFormat { schema_infer_max_rec: Option, + file_compression_type: FileCompressionType, } impl Default for JsonFormat { fn default() -> Self { Self { schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD), + file_compression_type: FileCompressionType::UNCOMPRESSED, } } } @@ -60,6 +66,16 @@ impl JsonFormat { self.schema_infer_max_rec = max_rec; self } + + /// Set a `FileCompressionType` of JSON + /// - defaults to `FileCompressionType::UNCOMPRESSED` + pub fn with_file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = file_compression_type; + self + } } #[async_trait] @@ -75,6 +91,7 @@ impl FileFormat for JsonFormat { ) -> Result { let mut schemas = Vec::new(); let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX); + let file_compression_type = self.file_compression_type.to_owned(); for object in objects { let mut take_while = || { let should_take = records_to_read > 0; @@ -86,13 +103,15 @@ impl FileFormat for JsonFormat { let schema = match store.get(&object.location).await? { GetResult::File(file, _) => { - let mut reader = BufReader::new(file); + let decoder = file_compression_type.convert_read(file); + let mut reader = BufReader::new(decoder); let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? } r @ GetResult::Stream(_) => { let data = r.bytes().await?; - let mut reader = BufReader::new(data.as_ref()); + let decoder = file_compression_type.convert_read(data.reader()); + let mut reader = BufReader::new(decoder); let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? } @@ -122,7 +141,7 @@ impl FileFormat for JsonFormat { conf: FileScanConfig, _filters: &[Expr], ) -> Result> { - let exec = NdJsonExec::new(conf); + let exec = NdJsonExec::new(conf, self.file_compression_type.to_owned()); Ok(Arc::new(exec)) } } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index b16525c7a4f6..7b9421bc7d71 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -22,6 +22,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; pub mod avro; pub mod csv; +pub mod file_type; pub mod json; pub mod parquet; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 0f93b610fd8f..70c8af120a04 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -18,6 +18,7 @@ //! The table implementation. 
use ahash::HashMap; +use std::str::FromStr; use std::{any::Any, sync::Arc}; use arrow::datatypes::{Field, Schema, SchemaRef}; @@ -27,6 +28,7 @@ use object_store::path::Path; use object_store::ObjectMeta; use parking_lot::RwLock; +use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::{ file_format::{ avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, @@ -102,17 +104,39 @@ impl ListingTableConfig { } } - fn infer_format(suffix: &str) -> Result> { - match suffix { - "avro" => Ok(Arc::new(AvroFormat::default())), - "csv" => Ok(Arc::new(CsvFormat::default())), - "json" => Ok(Arc::new(JsonFormat::default())), - "parquet" => Ok(Arc::new(ParquetFormat::default())), - _ => Err(DataFusionError::Internal(format!( - "Unable to infer file type from suffix {}", - suffix - ))), + fn infer_format(path: &str) -> Result<(Arc, String)> { + let err_msg = format!("Unable to infer file type from path: {}", path); + + let mut exts = path.rsplit('.'); + + let mut splitted = exts.next().unwrap_or(""); + + let file_compression_type = FileCompressionType::from_str(splitted) + .unwrap_or(FileCompressionType::UNCOMPRESSED); + + if file_compression_type != FileCompressionType::UNCOMPRESSED { + splitted = exts.next().unwrap_or(""); } + + let file_type = FileType::from_str(splitted) + .map_err(|_| DataFusionError::Internal(err_msg.to_owned()))?; + + let ext = file_type + .get_ext_with_compression(file_compression_type.to_owned()) + .map_err(|_| DataFusionError::Internal(err_msg))?; + + let file_format: Arc = match file_type { + FileType::AVRO => Arc::new(AvroFormat::default()), + FileType::CSV => Arc::new( + CsvFormat::default().with_file_compression_type(file_compression_type), + ), + FileType::JSON => Arc::new( + JsonFormat::default().with_file_compression_type(file_compression_type), + ), + FileType::PARQUET => Arc::new(ParquetFormat::default()), + }; + + Ok((file_format, ext)) } /// Infer `ListingOptions` based on `table_path` suffix. 
@@ -130,16 +154,13 @@ impl ListingTableConfig { .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let file_type = file.location.as_ref().rsplit('.').next().ok_or_else(|| { - DataFusionError::Internal("Unable to infer file suffix".into()) - })?; - - let format = ListingTableConfig::infer_format(file_type)?; + let (format, file_extension) = + ListingTableConfig::infer_format(file.location.as_ref())?; let listing_options = ListingOptions { format, collect_stat: true, - file_extension: file_type.to_string(), + file_extension, target_partitions: ctx.config.target_partitions, table_partition_cols: vec![], }; @@ -474,7 +495,7 @@ impl ListingTable { #[cfg(test)] mod tests { - use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; + use crate::datasource::file_format::file_type::GetExt; use crate::prelude::SessionContext; use crate::{ datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat}, @@ -537,7 +558,7 @@ mod tests { register_test_store(&ctx, &[(&path, 100)]); let opt = ListingOptions { - file_extension: DEFAULT_AVRO_EXTENSION.to_owned(), + file_extension: FileType::AVRO.get_ext(), format: Arc::new(AvroFormat {}), table_partition_cols: vec![String::from("p1")], target_partitions: 4, diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index d0baa035edb1..35670f21fcee 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -24,10 +24,7 @@ use crate::{ datasource::listing::{ListingOptions, ListingTable}, datasource::{ file_format::{ - avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, - csv::{CsvFormat, DEFAULT_CSV_EXTENSION}, - json::{JsonFormat, DEFAULT_JSON_EXTENSION}, - parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, + avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, FileFormat, }, MemTable, ViewTable, @@ -42,6 +39,7 @@ use crate::{ pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; use parking_lot::RwLock; +use std::str::FromStr; use std::sync::Arc; use std::{ any::{Any, TypeId}, @@ -81,6 +79,7 @@ use crate::config::{ OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::datasource::datasource::TableProviderFactory; +use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::execution::runtime_env::RuntimeEnv; use crate::logical_expr::Explain; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; @@ -100,6 +99,7 @@ use datafusion_sql::{ planner::{ContextProvider, SqlToRel}, }; use parquet::file::properties::WriterProperties; + use uuid::Uuid; use super::options::{ @@ -448,31 +448,38 @@ impl SessionContext { &self, cmd: &CreateExternalTable, ) -> Result> { - let (file_format, file_extension) = match cmd.file_type.as_str() { - "CSV" => ( - Arc::new( - CsvFormat::default() - .with_has_header(cmd.has_header) - .with_delimiter(cmd.delimiter as u8), - ) as Arc, - DEFAULT_CSV_EXTENSION, - ), - "PARQUET" => ( - Arc::new(ParquetFormat::default()) as Arc, - DEFAULT_PARQUET_EXTENSION, - ), - "AVRO" => ( - Arc::new(AvroFormat::default()) as Arc, - DEFAULT_AVRO_EXTENSION, - ), - "JSON" => ( - Arc::new(JsonFormat::default()) as Arc, - DEFAULT_JSON_EXTENSION, - ), - _ => Err(DataFusionError::Execution( + let file_compression_type = + match FileCompressionType::from_str(cmd.file_compression_type.as_str()) { + Ok(t) => t, + Err(_) => Err(DataFusionError::Execution( + "Only 
known FileCompressionTypes can be ListingTables!".to_string(), + ))?, + }; + + let file_type = match FileType::from_str(cmd.file_type.as_str()) { + Ok(t) => t, + Err(_) => Err(DataFusionError::Execution( "Only known FileTypes can be ListingTables!".to_string(), ))?, }; + + let file_extension = + file_type.get_ext_with_compression(file_compression_type.to_owned())?; + + let file_format: Arc = match file_type { + FileType::CSV => Arc::new( + CsvFormat::default() + .with_has_header(cmd.has_header) + .with_delimiter(cmd.delimiter as u8) + .with_file_compression_type(file_compression_type), + ), + FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::AVRO => Arc::new(AvroFormat::default()), + FileType::JSON => Arc::new( + JsonFormat::default().with_file_compression_type(file_compression_type), + ), + }; + let table = self.table(cmd.name.as_str()); match (cmd.if_not_exists, table) { (true, Ok(_)) => self.return_empty_dataframe(), diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index e296d18b6f60..9ddd3f1d686c 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -21,13 +21,15 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; +use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; +use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; +use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::{ file_format::{ - avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, - csv::{CsvFormat, DEFAULT_CSV_EXTENSION}, - json::{JsonFormat, DEFAULT_JSON_EXTENSION}, - parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, + avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, }, listing::ListingOptions, }; @@ -48,10 +50,13 @@ pub struct CsvReadOptions<'a> { /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to DEFAULT_CSV_EXTENSION. + /// Defaults to `FileType::CSV.get_ext().as_str()`. 
pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec, + + /// File compression type + pub file_compression_type: FileCompressionType, } impl<'a> Default for CsvReadOptions<'a> { @@ -70,6 +75,7 @@ impl<'a> CsvReadOptions<'a> { delimiter: b',', file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], + file_compression_type: FileCompressionType::UNCOMPRESSED, } } @@ -117,12 +123,22 @@ impl<'a> CsvReadOptions<'a> { self } + /// Configure file compression type + pub fn file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = file_compression_type; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { let file_format = CsvFormat::default() .with_has_header(self.has_header) .with_delimiter(self.delimiter) - .with_schema_infer_max_rec(Some(self.schema_infer_max_records)); + .with_schema_infer_max_rec(Some(self.schema_infer_max_records)) + .with_file_compression_type(self.file_compression_type.to_owned()); ListingOptions { format: Arc::new(file_format), @@ -154,6 +170,7 @@ pub struct ParquetReadOptions<'a> { impl<'a> Default for ParquetReadOptions<'a> { fn default() -> Self { let format_default = ParquetFormat::default(); + Self { file_extension: DEFAULT_PARQUET_EXTENSION, table_partition_cols: vec![], @@ -207,7 +224,7 @@ pub struct AvroReadOptions<'a> { pub schema: Option, /// File extension; only files with this extension are selected for data input. - /// Defaults to DEFAULT_AVRO_EXTENSION. + /// Defaults to `FileType::AVRO.get_ext().as_str()`. pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec, @@ -254,10 +271,13 @@ pub struct NdJsonReadOptions<'a> { pub schema_infer_max_records: usize, /// File extension; only files with this extension are selected for data input. - /// Defaults to DEFAULT_JSON_EXTENSION. + /// Defaults to `FileType::JSON.get_ext().as_str()`. 
pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec, + + /// File compression type + pub file_compression_type: FileCompressionType, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -267,6 +287,7 @@ impl<'a> Default for NdJsonReadOptions<'a> { schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD, file_extension: DEFAULT_JSON_EXTENSION, table_partition_cols: vec![], + file_compression_type: FileCompressionType::UNCOMPRESSED, } } } @@ -278,9 +299,25 @@ impl<'a> NdJsonReadOptions<'a> { self } + /// Specify file_extension + pub fn file_extension(mut self, file_extension: &'a str) -> Self { + self.file_extension = file_extension; + self + } + + /// Specify file_compression_type + pub fn file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = file_compression_type; + self + } + /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { - let file_format = JsonFormat::default(); + let file_format = JsonFormat::default() + .with_file_compression_type(self.file_compression_type.to_owned()); ListingOptions { format: Arc::new(file_format), collect_stat: false, diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 885bea870bd8..03e1e8059471 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -17,6 +17,7 @@ //! Execution plan for reading CSV files +use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; @@ -31,7 +32,9 @@ use crate::physical_plan::{ }; use arrow::csv; use arrow::datatypes::SchemaRef; + use bytes::Buf; + use futures::{StreamExt, TryStreamExt}; use object_store::{GetResult, ObjectStore}; use std::any::Any; @@ -52,11 +55,17 @@ pub struct CsvExec { delimiter: u8, /// Execution metrics metrics: ExecutionPlanMetricsSet, + file_compression_type: FileCompressionType, } impl CsvExec { /// Create a new CSV reader execution plan provided base and specific configurations - pub fn new(base_config: FileScanConfig, has_header: bool, delimiter: u8) -> Self { + pub fn new( + base_config: FileScanConfig, + has_header: bool, + delimiter: u8, + file_compression_type: FileCompressionType, + ) -> Self { let (projected_schema, projected_statistics) = base_config.project(); Self { @@ -66,6 +75,7 @@ impl CsvExec { has_header, delimiter, metrics: ExecutionPlanMetricsSet::new(), + file_compression_type, } } @@ -132,7 +142,10 @@ impl ExecutionPlan for CsvExec { delimiter: self.delimiter, }); - let opener = CsvOpener { config }; + let opener = CsvOpener { + config, + file_compression_type: self.file_compression_type.to_owned(), + }; let stream = FileStream::new( &self.base_config, partition, @@ -194,6 +207,7 @@ impl CsvConfig { struct CsvOpener { config: Arc, + file_compression_type: FileCompressionType, } impl FileOpener for CsvOpener { @@ -203,14 +217,18 @@ impl FileOpener for CsvOpener { file_meta: FileMeta, ) -> Result { let config = self.config.clone(); + let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { - Ok(futures::stream::iter(config.open(file, true)).boxed()) + let decoder = file_compression_type.convert_read(file); + Ok(futures::stream::iter(config.open(decoder, true)).boxed()) } GetResult::Stream(s) => { let mut first_chunk = true; - Ok(newline_delimited_stream(s.map_err(Into::into)) + let s = s.map_err(Into::::into); + let decoder = file_compression_type.convert_stream(s); + Ok(newline_delimited_stream(decoder) .map_ok(move |bytes| { let reader = config.open(bytes.reader(), first_chunk); first_chunk = false; @@ -265,28 +283,48 @@ pub async fn plan_to_csv( #[cfg(test)] mod tests { use super::*; + use crate::datasource::file_format::file_type::FileType; use crate::physical_plan::file_format::chunked_store::ChunkedStore; use crate::prelude::*; - use crate::test::partitioned_csv_config; - use crate::test_util::aggr_test_schema_with_missing_col; + use crate::test::{partitioned_csv_config, partitioned_file_groups}; + use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data}; use crate::{scalar::ScalarValue, test_util::aggr_test_schema}; use arrow::datatypes::*; use futures::StreamExt; use object_store::local::LocalFileSystem; + use rstest::*; use std::fs::File; use std::io::Write; use tempfile::TempDir; + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn csv_exec_with_projection() -> Result<()> { + async fn csv_exec_with_projection( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); + let path = format!("{}/csv", arrow_test_data()); let filename = "aggregate_test_100.csv"; - let mut config = partitioned_csv_config(filename, file_schema, 1)?; + + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + 1, + FileType::CSV, + file_compression_type.to_owned(), + )?; + + let mut config = partitioned_csv_config(file_schema, file_groups)?; config.projection = Some(vec![0, 2, 4]); - let csv = CsvExec::new(config, true, b','); + let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned()); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(3, csv.projected_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); @@ -313,16 +351,34 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn csv_exec_with_limit() -> Result<()> { + async fn csv_exec_with_limit( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); + let path = format!("{}/csv", arrow_test_data()); let filename = "aggregate_test_100.csv"; - let mut config = partitioned_csv_config(filename, file_schema, 1)?; + + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + 1, + FileType::CSV, + file_compression_type.to_owned(), + )?; + + let mut config = partitioned_csv_config(file_schema, file_groups)?; config.limit = Some(5); - let csv = CsvExec::new(config, true, b','); + let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned()); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, 
csv.schema().fields().len()); @@ -349,16 +405,34 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn csv_exec_with_missing_column() -> Result<()> { + async fn csv_exec_with_missing_column( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema_with_missing_col(); + let path = format!("{}/csv", arrow_test_data()); let filename = "aggregate_test_100.csv"; - let mut config = partitioned_csv_config(filename, file_schema, 1)?; + + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + 1, + FileType::CSV, + file_compression_type.to_owned(), + )?; + + let mut config = partitioned_csv_config(file_schema, file_groups)?; config.limit = Some(5); - let csv = CsvExec::new(config, true, b','); + let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned()); assert_eq!(14, csv.base_config.file_schema.fields().len()); assert_eq!(14, csv.projected_schema.fields().len()); assert_eq!(14, csv.schema().fields().len()); @@ -385,13 +459,31 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn csv_exec_with_partition() -> Result<()> { + async fn csv_exec_with_partition( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); + let path = format!("{}/csv", arrow_test_data()); let filename = "aggregate_test_100.csv"; - let mut config = partitioned_csv_config(filename, file_schema.clone(), 1)?; + + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + 1, + FileType::CSV, + file_compression_type.to_owned(), + )?; + + let mut config = partitioned_csv_config(file_schema, file_groups)?; // Add partition columns config.table_partition_cols = vec!["date".to_owned()]; @@ -400,11 +492,11 @@ mod tests { // We should be able to project on the partition column // Which is supposed to be after the file fields - config.projection = Some(vec![0, file_schema.fields().len()]); + config.projection = Some(vec![0, config.file_schema.fields().len()]); // we don't have `/date=xx/` in the path but that is ok because // partitions are resolved during scan anyway - let csv = CsvExec::new(config, true, b','); + let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned()); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(2, csv.projected_schema.fields().len()); assert_eq!(2, csv.schema().fields().len()); @@ -459,8 +551,14 @@ mod tests { Ok(schema) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn test_chunked() { + async fn test_chunked(file_compression_type: FileCompressionType) { let ctx = SessionContext::new(); let chunk_sizes = [10, 20, 30, 40]; @@ -476,11 +574,21 @@ mod tests { let task_ctx = ctx.task_ctx(); - let filename = "aggregate_test_100.csv"; let file_schema = aggr_test_schema(); - let config = - partitioned_csv_config(filename, file_schema.clone(), 1).unwrap(); - let csv = CsvExec::new(config, true, b','); + let path = format!("{}/csv", arrow_test_data()); + 
let filename = "aggregate_test_100.csv"; + + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + 1, + FileType::CSV, + file_compression_type.to_owned(), + ) + .unwrap(); + + let config = partitioned_csv_config(file_schema, file_groups).unwrap(); + let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned()); let it = csv.execute(0, task_ctx).unwrap(); let batches: Vec<_> = it.try_collect().await.unwrap(); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 10f148ad060f..9475be15644b 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -16,6 +16,7 @@ // under the License. //! Execution plan for reading line-delimited JSON files +use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::execution::context::TaskContext; @@ -31,7 +32,9 @@ use crate::physical_plan::{ }; use arrow::json::reader::DecoderOptions; use arrow::{datatypes::SchemaRef, json}; + use bytes::Buf; + use futures::{StreamExt, TryStreamExt}; use object_store::{GetResult, ObjectStore}; use std::any::Any; @@ -50,11 +53,15 @@ pub struct NdJsonExec { projected_schema: SchemaRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, + file_compression_type: FileCompressionType, } impl NdJsonExec { /// Create a new JSON reader execution plan provided base configurations - pub fn new(base_config: FileScanConfig) -> Self { + pub fn new( + base_config: FileScanConfig, + file_compression_type: FileCompressionType, + ) -> Self { let (projected_schema, projected_statistics) = base_config.project(); Self { @@ -62,6 +69,7 @@ impl NdJsonExec { projected_schema, projected_statistics, metrics: ExecutionPlanMetricsSet::new(), + file_compression_type, } } } @@ -118,6 +126,7 @@ impl ExecutionPlan for NdJsonExec { let opener = JsonOpener { file_schema, options, + file_compression_type: self.file_compression_type.to_owned(), }; let stream = FileStream::new( @@ -156,6 +165,7 @@ impl ExecutionPlan for NdJsonExec { struct JsonOpener { options: DecoderOptions, file_schema: SchemaRef, + file_compression_type: FileCompressionType, } impl FileOpener for JsonOpener { @@ -166,14 +176,19 @@ impl FileOpener for JsonOpener { ) -> Result { let options = self.options.clone(); let schema = self.file_schema.clone(); + let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { - let reader = json::Reader::new(file, schema.clone(), options); + let decoder = file_compression_type.convert_read(file); + let reader = json::Reader::new(decoder, schema.clone(), options); Ok(futures::stream::iter(reader).boxed()) } GetResult::Stream(s) => { - Ok(newline_delimited_stream(s.map_err(Into::into)) + let s = s.map_err(Into::into); + let decoder = file_compression_type.convert_stream(s); + + Ok(newline_delimited_stream(decoder) .map_ok(move |bytes| { let reader = json::Reader::new( bytes.reader(), @@ -236,14 +251,17 @@ mod tests { use object_store::local::LocalFileSystem; use crate::assert_batches_eq; + use crate::datasource::file_format::file_type::FileType; use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::file_format::chunked_store::ChunkedStore; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; - use crate::test::object_store::local_unpartitioned_file; + use crate::test::partitioned_file_groups; + use rstest::*; use tempfile::TempDir; + use url::Url; use super::*; @@ -251,38 +269,65 @@ mod tests { async fn prepare_store( ctx: &SessionContext, + file_compression_type: FileCompressionType, ) -> (ObjectStoreUrl, Vec>, SchemaRef) { let store_url = ObjectStoreUrl::local_filesystem(); let store = ctx.runtime_env().object_store(&store_url).unwrap(); - let path = format!("{}/1.json", TEST_DATA_BASE); - let meta = local_unpartitioned_file(path); + let filename = "1.json"; + let file_groups = partitioned_file_groups( + TEST_DATA_BASE, + filename, + 1, + FileType::JSON, + file_compression_type.to_owned(), + ) + .unwrap(); + let meta = file_groups + .get(0) + .unwrap() + .get(0) + .unwrap() + .clone() + .object_meta; let schema = JsonFormat::default() + .with_file_compression_type(file_compression_type.to_owned()) .infer_schema(&store, &[meta.clone()]) .await .unwrap(); - (store_url, vec![vec![meta.into()]], schema) + (store_url, file_groups, schema) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn nd_json_exec_file_without_projection() -> Result<()> { + async fn nd_json_exec_file_without_projection( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; let (object_store_url, file_groups, file_schema) = - prepare_store(&session_ctx).await; - - let exec = NdJsonExec::new(FileScanConfig { - object_store_url, - file_groups, - file_schema, - statistics: Statistics::default(), - projection: None, - limit: Some(3), - table_partition_cols: vec![], - }); + prepare_store(&session_ctx, file_compression_type.to_owned()).await; + + let exec = NdJsonExec::new( + FileScanConfig { + object_store_url, + file_groups, + file_schema, + statistics: Statistics::default(), + projection: None, + limit: Some(3), + table_partition_cols: vec![], + }, + file_compression_type.to_owned(), + ); // TODO: this is not where schema inference should be tested @@ -324,13 +369,21 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn nd_json_exec_file_with_missing_column() -> Result<()> { + async fn 
nd_json_exec_file_with_missing_column( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; let (object_store_url, file_groups, actual_schema) = - prepare_store(&session_ctx).await; + prepare_store(&session_ctx, file_compression_type.to_owned()).await; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -338,15 +391,18 @@ mod tests { let file_schema = Arc::new(Schema::new(fields)); - let exec = NdJsonExec::new(FileScanConfig { - object_store_url, - file_groups, - file_schema, - statistics: Statistics::default(), - projection: None, - limit: Some(3), - table_partition_cols: vec![], - }); + let exec = NdJsonExec::new( + FileScanConfig { + object_store_url, + file_groups, + file_schema, + statistics: Statistics::default(), + projection: None, + limit: Some(3), + table_partition_cols: vec![], + }, + file_compression_type.to_owned(), + ); let mut it = exec.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; @@ -365,22 +421,33 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn nd_json_exec_file_projection() -> Result<()> { + async fn nd_json_exec_file_projection( + file_compression_type: FileCompressionType, + ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let (object_store_url, file_groups, file_schema) = - prepare_store(&session_ctx).await; - - let exec = NdJsonExec::new(FileScanConfig { - object_store_url, - file_groups, - file_schema, - statistics: Statistics::default(), - projection: Some(vec![0, 2]), - limit: None, - table_partition_cols: vec![], - }); + prepare_store(&session_ctx, file_compression_type.to_owned()).await; + + let exec = NdJsonExec::new( + FileScanConfig { + object_store_url, + file_groups, + file_schema, + statistics: Statistics::default(), + projection: Some(vec![0, 2]), + limit: None, + table_partition_cols: vec![], + }, + file_compression_type.to_owned(), + ); let inferred_schema = exec.schema(); assert_eq!(inferred_schema.fields().len(), 2); @@ -452,8 +519,14 @@ mod tests { Ok(()) } + #[rstest( + file_compression_type, + case(FileCompressionType::UNCOMPRESSED), + case(FileCompressionType::GZIP), + case(FileCompressionType::BZIP2) + )] #[tokio::test] - async fn test_chunked() { + async fn test_chunked(file_compression_type: FileCompressionType) { let mut ctx = SessionContext::new(); for chunk_size in [10, 20, 30, 40] { @@ -466,8 +539,37 @@ mod tests { )), ); - let path = format!("{}/1.json", TEST_DATA_BASE); - let frame = ctx.read_json(path, Default::default()).await.unwrap(); + let filename = "1.json"; + let file_groups = partitioned_file_groups( + TEST_DATA_BASE, + filename, + 1, + FileType::JSON, + file_compression_type.to_owned(), + ) + .unwrap(); + let path = file_groups + .get(0) + .unwrap() + .get(0) + .unwrap() + .object_meta + .location + .as_ref(); + + let store_url = ObjectStoreUrl::local_filesystem(); + let url: &Url = store_url.as_ref(); + let path_buf = Path::new(url.path()).join(path); + let path = path_buf.to_str().unwrap(); + + let ext = FileType::JSON + .get_ext_with_compression(file_compression_type.to_owned()) + .unwrap(); + + let read_options = NdJsonReadOptions::default() + .file_extension(ext.as_str()) + 
.file_compression_type(file_compression_type.to_owned()); + let frame = ctx.read_json(path, read_options).await.unwrap(); let results = frame.collect().await.unwrap(); assert_batches_eq!( diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 3b3b09567f5a..dfc6d8edca7d 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -18,6 +18,8 @@ //! Common unit test utility methods use crate::arrow::array::UInt32Array; +use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; +use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; @@ -25,11 +27,15 @@ use crate::from_slice::FromSlice; use crate::logical_plan::LogicalPlan; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::test::object_store::local_unpartitioned_file; -use crate::test_util::aggr_test_schema; +use crate::test_util::{aggr_test_schema, arrow_test_data}; use array::ArrayRef; use arrow::array::{self, Array, Decimal128Builder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use bzip2::write::BzEncoder; +use bzip2::Compression as BzCompression; +use flate2::write::GzEncoder; +use flate2::Compression as GzCompression; use futures::{Future, FutureExt}; use std::fs::File; use std::io::prelude::*; @@ -58,63 +64,99 @@ pub fn create_table_dual() -> Arc { /// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with `partitions` partitions pub fn scan_partitioned_csv(partitions: usize) -> Result> { let schema = aggr_test_schema(); - let config = partitioned_csv_config("aggregate_test_100.csv", schema, partitions)?; - Ok(Arc::new(CsvExec::new(config, true, b','))) + let filename = "aggregate_test_100.csv"; + let path = format!("{}/csv", arrow_test_data()); + let file_groups = partitioned_file_groups( + path.as_str(), + filename, + partitions, + FileType::CSV, + FileCompressionType::UNCOMPRESSED, + )?; + let config = partitioned_csv_config(schema, file_groups)?; + Ok(Arc::new(CsvExec::new( + config, + true, + b',', + FileCompressionType::UNCOMPRESSED, + ))) } -/// Returns a [`FileScanConfig`] for scanning `partitions` partitions of `filename` -pub fn partitioned_csv_config( +/// Returns file groups [`Vec>`] for scanning `partitions` of `filename` +pub fn partitioned_file_groups( + path: &str, filename: &str, - schema: SchemaRef, partitions: usize, -) -> Result { - let testdata = crate::test_util::arrow_test_data(); - let path = format!("{}/csv/{}", testdata, filename); + file_type: FileType, + file_compression_type: FileCompressionType, +) -> Result>> { + let path = format!("{}/{}", path, filename); - let file_groups = if partitions > 1 { - let tmp_dir = TempDir::new()?.into_path(); + let tmp_dir = TempDir::new()?.into_path(); - let mut writers = vec![]; - let mut files = vec![]; - for i in 0..partitions { - let filename = format!("partition-{}.csv", i); - let filename = tmp_dir.join(&filename); + let mut writers = vec![]; + let mut files = vec![]; + for i in 0..partitions { + let filename = format!( + "partition-{}{}", + i, + file_type + .to_owned() + .get_ext_with_compression(file_compression_type.to_owned()) + .unwrap() + ); + let filename = tmp_dir.join(&filename); - let writer = BufWriter::new(File::create(&filename).unwrap()); - writers.push(writer); - files.push(filename); - } + let file = File::create(&filename).unwrap(); - let f = 
File::open(&path)?; - let f = BufReader::new(f); - for (i, line) in f.lines().enumerate() { - let line = line.unwrap(); - - if i == 0 { - // write header to all partitions - for w in writers.iter_mut() { - w.write_all(line.as_bytes()).unwrap(); - w.write_all(b"\n").unwrap(); - } - } else { - // write data line to single partition - let partition = i % partitions; - writers[partition].write_all(line.as_bytes()).unwrap(); - writers[partition].write_all(b"\n").unwrap(); + let encoder: Box = match file_compression_type.to_owned() { + FileCompressionType::UNCOMPRESSED => Box::new(file), + FileCompressionType::GZIP => { + Box::new(GzEncoder::new(file, GzCompression::default())) } + FileCompressionType::BZIP2 => { + Box::new(BzEncoder::new(file, BzCompression::default())) + } + }; + + let writer = BufWriter::new(encoder); + writers.push(writer); + files.push(filename); + } + + let f = File::open(&path)?; + let f = BufReader::new(f); + for (i, line) in f.lines().enumerate() { + let line = line.unwrap(); + + if i == 0 && file_type == FileType::CSV { + // write header to all partitions + for w in writers.iter_mut() { + w.write_all(line.as_bytes()).unwrap(); + w.write_all(b"\n").unwrap(); + } + } else { + // write data line to single partition + let partition = i % partitions; + writers[partition].write_all(line.as_bytes()).unwrap(); + writers[partition].write_all(b"\n").unwrap(); } - for w in writers.iter_mut() { - w.flush().unwrap(); - } + } + for w in writers.iter_mut() { + w.flush().unwrap(); + } - files - .into_iter() - .map(|f| vec![local_unpartitioned_file(f).into()]) - .collect::>() - } else { - vec![vec![local_unpartitioned_file(path).into()]] - }; + Ok(files + .into_iter() + .map(|f| vec![local_unpartitioned_file(f).into()]) + .collect::>()) +} +/// Returns a [`FileScanConfig`] for given `file_groups` +pub fn partitioned_csv_config( + schema: SchemaRef, + file_groups: Vec>, +) -> Result { Ok(FileScanConfig { object_store_url: ObjectStoreUrl::local_filesystem(), file_schema: schema, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 7f5c95ab1436..fd828021732e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1268,6 +1268,8 @@ pub struct CreateExternalTable { pub if_not_exists: bool, /// SQL used to create the table, if available pub definition: Option, + /// File compression type (GZIP, BZIP2) + pub file_compression_type: String, } /// Produces a relation with string representations of diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 50545b581954..9f2cc2d07839 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -156,6 +156,7 @@ message CreateExternalTableNode { bool if_not_exists = 7; string delimiter = 8; string definition = 9; + string file_compression_type = 10; } message CreateCatalogSchemaNode { diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 8f9f8a9c4661..a5ddccdb6224 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -492,6 +492,7 @@ impl AsLogicalPlan for LogicalPlanNode { .table_partition_cols .clone(), if_not_exists: create_extern_table.if_not_exists, + file_compression_type: create_extern_table.file_compression_type.to_string(), definition, })) } @@ -1042,6 +1043,7 @@ impl AsLogicalPlan for LogicalPlanNode { table_partition_cols, if_not_exists, definition, + file_compression_type, }) 
=> Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::CreateExternalTable( protobuf::CreateExternalTableNode { @@ -1054,6 +1056,7 @@ impl AsLogicalPlan for LogicalPlanNode { if_not_exists: *if_not_exists, delimiter: String::from(*delimiter), definition: definition.clone().unwrap_or_else(|| "".to_string()), + file_compression_type: file_compression_type.to_string(), }, )), }), diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index b9254159ba52..cc9078e2a7fc 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -38,6 +38,10 @@ fn parse_file_type(s: &str) -> Result { Ok(s.to_uppercase()) } +fn parse_file_compression_type(s: &str) -> Result { + Ok(s.to_uppercase()) +} + /// DataFusion extension DDL for `CREATE EXTERNAL TABLE` #[derive(Debug, Clone, PartialEq, Eq)] pub struct CreateExternalTable { @@ -57,6 +61,8 @@ pub struct CreateExternalTable { pub table_partition_cols: Vec, /// Option to not error if table already exists pub if_not_exists: bool, + /// File compression type (GZIP, BZIP2) + pub file_compression_type: String, } impl fmt::Display for CreateExternalTable { @@ -330,6 +336,12 @@ impl<'a> DFParser<'a> { false => ',', }; + let file_compression_type = if self.parse_has_file_compression_type() { + self.parse_file_compression_type()? + } else { + "".to_string() + }; + let table_partition_cols = if self.parse_has_partition() { self.parse_partitions()? } else { @@ -348,6 +360,7 @@ impl<'a> DFParser<'a> { location, table_partition_cols, if_not_exists, + file_compression_type, }; Ok(Statement::CreateExternalTable(create)) } @@ -360,6 +373,14 @@ impl<'a> DFParser<'a> { } } + /// Parses the set of + fn parse_file_compression_type(&mut self) -> Result { + match self.parser.next_token() { + Token::Word(w) => parse_file_compression_type(&w.value), + unexpected => self.expected("one of GZIP, BZIP2", unexpected), + } + } + fn consume_token(&mut self, expected: &Token) -> bool { let token = self.parser.peek_token().to_string().to_uppercase(); let token = Token::make_keyword(&token); @@ -370,6 +391,10 @@ impl<'a> DFParser<'a> { false } } + fn parse_has_file_compression_type(&mut self) -> bool { + self.consume_token(&Token::make_keyword("COMPRESSION")) + & self.consume_token(&Token::make_keyword("TYPE")) + } fn parse_csv_has_header(&mut self) -> bool { self.consume_token(&Token::make_keyword("WITH")) @@ -460,6 +485,7 @@ mod tests { location: "foo.csv".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -475,6 +501,7 @@ mod tests { location: "foo.csv".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -490,6 +517,7 @@ mod tests { location: "foo.csv".into(), table_partition_cols: vec!["p1".to_string(), "p2".to_string()], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -508,6 +536,27 @@ mod tests { location: "foo.csv".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), + }); + expect_parse_ok(sql, expected)?; + } + + // positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` tokens + let sqls = vec![ + ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE GZIP LOCATION 'foo.csv'", "GZIP"), + ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE BZIP2 LOCATION 'foo.csv'", "BZIP2"), + ]; + for (sql, file_compression_type) in 
sqls { + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![make_column_def("c1", DataType::Int(display))], + file_type: "CSV".to_string(), + has_header: false, + delimiter: ',', + location: "foo.csv".into(), + table_partition_cols: vec![], + if_not_exists: false, + file_compression_type: file_compression_type.to_owned(), }); expect_parse_ok(sql, expected)?; } @@ -523,6 +572,7 @@ mod tests { location: "foo.parquet".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -537,6 +587,7 @@ mod tests { location: "foo.parquet".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -551,6 +602,7 @@ mod tests { location: "foo.avro".into(), table_partition_cols: vec![], if_not_exists: false, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; @@ -566,6 +618,7 @@ mod tests { location: "foo.parquet".into(), table_partition_cols: vec![], if_not_exists: true, + file_compression_type: "".to_string(), }); expect_parse_ok(sql, expected)?; diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 58b65af596ef..b75efd6f4723 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -478,6 +478,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { location, table_partition_cols, if_not_exists, + file_compression_type, } = statement; // semantic checks @@ -487,6 +488,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))?; } + if file_type != "CSV" && file_type != "JSON" && !file_compression_type.is_empty() + { + Err(DataFusionError::Plan( + "File compression type can be specified for CSV/JSON files.".into(), + ))?; + } + let schema = self.build_schema(columns)?; Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable { @@ -499,6 +507,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { table_partition_cols, if_not_exists, definition, + file_compression_type, })) } @@ -3998,6 +4007,36 @@ mod tests { quick_test(sql, expected); } + #[test] + fn create_external_table_with_compression_type() { + // positive case + let sqls = vec![ + "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE GZIP LOCATION 'foo.csv.gz'", + "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE BZIP2 LOCATION 'foo.csv.bz2'", + "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE GZIP LOCATION 'foo.json.gz'", + "CREATE EXTERNAL TABLE t(c1 int) STORED AS JSON COMPRESSION TYPE BZIP2 LOCATION 'foo.json.bz2'", + ]; + for sql in sqls { + let expected = "CreateExternalTable: \"t\""; + quick_test(sql, expected); + } + + // negative case + let sqls = vec![ + "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE GZIP LOCATION 'foo.avro'", + "CREATE EXTERNAL TABLE t STORED AS AVRO COMPRESSION TYPE BZIP2 LOCATION 'foo.avro'", + "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE GZIP LOCATION 'foo.parquet'", + "CREATE EXTERNAL TABLE t STORED AS PARQUET COMPRESSION TYPE BZIP2 LOCATION 'foo.parquet'", + ]; + for sql in sqls { + let err = logical_plan(sql).expect_err("query should have failed"); + assert_eq!( + "Plan(\"File compression type can be specified for CSV/JSON files.\")", + format!("{:?}", err) + ); + } + } + #[test] fn create_external_table_parquet() { let sql =