diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh index c48881b7a7bde..22ca9602daa60 100755 --- a/ci/scripts/rust_clippy.sh +++ b/ci/scripts/rust_clippy.sh @@ -18,4 +18,4 @@ # under the License. set -ex -cargo clippy --all-targets --workspace --features avro,jit,pyarrow,scheduler -- -D warnings +cargo clippy --all-targets --workspace --features avro,jit,pyarrow,scheduler,compression -- -D warnings diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 774f57ed9fdcb..92c7ecffc58f4 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -21,9 +21,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "464b3811b747f8f7ebc8849c9c728c39f6ac98a055edad93baf9eb330e3f8f9d" +checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" dependencies = [ "cfg-if", "const-random", @@ -83,7 +83,7 @@ version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e24e2bcd431a4aa0ff003fdd2dc21c78cfb42f31459c89d2312c2746fe17a5ac" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow-array", "arrow-buffer", "arrow-data", @@ -112,7 +112,7 @@ version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9044300874385f19e77cbf90911e239bd23630d8f23bb0f948f9067998a13b7" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow-buffer", "arrow-data", "arrow-schema", @@ -163,21 +163,6 @@ dependencies = [ "num", ] -[[package]] -name = "async-compression" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" -dependencies = [ - "bzip2", - "flate2", - "futures-core", - "futures-io", - "memchr", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-trait" version = "0.1.58" @@ -301,32 +286,11 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" -[[package]] -name = "bzip2" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6afcd980b5f3a45017c57e57a2fcccbb351cc43a356ce117ef760ef8052b89b0" -dependencies = [ - "bzip2-sys", - "libc", -] - -[[package]] -name = "bzip2-sys" -version = "0.1.11+1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "cc" -version = "1.0.74" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581f5dba903aac52ea3feb5ec4810848460ee833876f1f9b0fdeab1f19091574" +checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" dependencies = [ "jobserver", ] @@ -513,9 +477,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b7d4e43b25d3c994662706a1d4fcfc32aaa6afd287502c111b237093bb23f3a" +checksum = "97abf9f0eca9e52b7f81b945524e76710e6cb2366aead23b7d4fbf72e281f888" dependencies = [ "cc", "cxxbridge-flags", @@ -525,9 +489,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84f8829ddc213e2c1368e51a2564c552b65a8cb6a28f31e576270ac81d5e5827" +checksum = "7cc32cc5fea1d894b77d269ddb9f192110069a8a9c1f1d441195fba90553dea3" dependencies = [ "cc", "codespan-reporting", @@ -540,15 +504,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e72537424b474af1460806647c41d4b6d35d09ef7fe031c5c2fa5766047cc56a" +checksum = "8ca220e4794c934dc6b1207c3b42856ad4c302f2df1712e9f8d2eec5afaacf1f" [[package]] name = "cxxbridge-macro" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "309e4fb93eed90e1e14bea0da16b209f81813ba9fc7830c20ed151dd7bc0a4d7" +checksum = "b846f081361125bfc8dc9d3940c84e1fd83ba54bbca7b17cd29483c828be0704" dependencies = [ "proc-macro2", "quote", @@ -572,12 +536,12 @@ dependencies = [ name = "datafusion" version = "14.0.0" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow", - "async-compression", + "arrow-buffer", + "arrow-schema", "async-trait", "bytes", - "bzip2", "chrono", "dashmap", "datafusion-common", @@ -586,7 +550,6 @@ dependencies = [ "datafusion-physical-expr", "datafusion-row", "datafusion-sql", - "flate2", "futures", "glob", "hashbrown", @@ -642,7 +605,7 @@ dependencies = [ name = "datafusion-expr" version = "14.0.0" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow", "datafusion-common", "log", @@ -667,7 +630,7 @@ dependencies = [ name = "datafusion-physical-expr" version = "14.0.0" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow", "arrow-buffer", "arrow-schema", @@ -792,9 +755,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "env_logger" -version = "0.9.1" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" dependencies = [ "atty", "humantime", @@ -845,9 +808,9 @@ dependencies = [ [[package]] name = "fd-lock" -version = "3.0.7" +version = "3.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c93a581058d957dc4176875aad04f82f81613e6611d64aa1a9c755bdfb16711" +checksum = "bb21c69b9fea5e15dbc1049e4b77145dd0ba1c84019c488102de0dc4ea4b0a27" dependencies = [ "cfg-if", "rustix", @@ -995,10 +958,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", ] [[package]] @@ -1102,9 +1063,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.22" +version = "0.14.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abfba89e19b959ca163c7752ba59d737c1ceea53a5d31a149c805446fc958064" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" dependencies = [ "bytes", "futures-channel", @@ -1198,15 +1159,19 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "io-lifetimes" -version = "0.7.5" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ce5ef949d49ee85593fc4d3f3f95ad61657076395cbbce23e2121fc5542074" +checksum = "a7d367024b3f3414d8e01f437f704f41a9f64ab36f9067fa73e526ad4c763c87" +dependencies = [ + "libc", + "windows-sys", +] [[package]] name = "ipnet" -version = "2.5.0" +version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" +checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745" [[package]] name = "itertools" @@ -1325,15 +1290,15 @@ checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" [[package]] name = "libm" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" +checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libmimalloc-sys" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc093ab289b0bfda3aa1bdfab9c9542be29c7ef385cfcbe77f8c9813588eb48" +checksum = "c37567b180c1af25924b303ddf1ee4467653783440c62360beb2b322a4d93361" dependencies = [ "cc", ] @@ -1349,9 +1314,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.0.46" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" +checksum = "bb68f22743a3fb35785f1e7f844ca5a3de2dde5bd0c0ef5b372065814699b121" [[package]] name = "lock_api" @@ -1409,9 +1374,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "mimalloc" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ce6a4b40d3bff9eb3ce9881ca0737a85072f9f975886082640cd46a75cdb35" +checksum = "b32d6a9ac92d0239d7bfa31137fb47634ac7272a3c11bcee91379ac100781670" dependencies = [ "libmimalloc-sys", ] @@ -1615,9 +1580,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.3.1" +version = "6.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3baf96e39c5359d2eb0dd6ccb42c62b91d9678aa68160d261b9e0ccbf9e9dea9" +checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e" [[package]] name = "parking_lot" @@ -1648,7 +1613,7 @@ version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bf8fa7ab6572791325a8595f55dc532dde88b996ae10a5ca8a2db746784ecc4" dependencies = [ - "ahash 0.8.1", + "ahash 0.8.2", "arrow", "base64", "brotli", @@ -1691,17 +1656,11 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkg-config" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" - [[package]] name = "ppv-lite86" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-error" @@ -1823,9 +1782,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" dependencies = [ "aho-corasick", "memchr", @@ -1840,9 +1799,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "regex-syntax" -version = "0.6.27" +version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "remove_dir_all" @@ -1910,9 +1869,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.35.13" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727a1a6d65f786ec22df8a81ca3121107f235970dc1705ed681d3e6e8b9cd5f9" +checksum = "812a2ec2043c4d6bc6482f5be2ab8244613cac2493d128d36c0759e52a626ab3" dependencies = [ "bitflags", "errno", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 039d0ebe68662..5aeafb2354c39 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -41,12 +41,13 @@ path = "src/lib.rs" # Used to enable the avro format avro = ["apache-avro", "num-traits", "datafusion-common/avro"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["compression", "crypto_expressions", "regex_expressions", "unicode_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] # Used to enable JIT code generation jit = ["datafusion-jit", "datafusion-row/jit"] pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"] +compression = ["xz2", "bzip2", "flate2", "async-compression"] regex_expressions = ["datafusion-physical-expr/regex_expressions"] # Used to enable scheduler scheduler = ["rayon"] @@ -57,10 +58,12 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.14", optional = true } arrow = { version = "26.0.0", features = ["prettyprint"] } -async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] } +arrow-buffer = "26.0.0" +arrow-schema = "26.0.0" +async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true } async-trait = "0.1.41" bytes = "1.1" -bzip2 = "0.4.3" +bzip2 = { version = "0.4.3", optional = true } chrono = { version = "0.4.22", default-features = false } dashmap = "5.4.0" datafusion-common = { path = "../common", version = "14.0.0", features = ["parquet", "object_store"] } @@ -70,7 +73,7 @@ datafusion-optimizer = { path = "../optimizer", version = "14.0.0" } datafusion-physical-expr = { path = "../physical-expr", version = "14.0.0" } datafusion-row = { path = "../row", version = "14.0.0" } datafusion-sql = { path = "../sql", version = "14.0.0" } -flate2 = "1.0.24" +flate2 = { version = "1.0.24", optional = true } futures = "0.3" glob = "0.3.0" hashbrown = { version = "0.12", features = ["raw"] } @@ -96,6 +99,7 @@ tokio-stream = "0.1" tokio-util = { version = "0.7.4", features = ["io"] } url = "2.2" uuid = { version = "1.0", features = ["v4"] } +xz2 = { version = "0.1", optional = true } [dev-dependencies] arrow = { version = "26.0.0", features = ["prettyprint", "dyn_cmp_dict"] } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index f3d170d9ad6e1..f1ba160f2c689 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -127,7 +127,7 @@ impl FileFormat for CsvFormat { .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let decoder = self.file_compression_type.convert_read(data.reader()); + let decoder = self.file_compression_type.convert_read(data.reader())?; let (schema, records_read) = arrow::csv::reader::infer_reader_schema( decoder, self.delimiter, diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs index f08a21ca1c267..622f93ff9c0f7 100644 --- a/datafusion/core/src/datasource/file_format/file_type.rs +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -18,22 +18,29 @@ //! File type abstraction use crate::error::{DataFusionError, Result}; -use std::io::Error; - -use async_compression::tokio::bufread::{ - BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder, -}; -use bzip2::read::BzDecoder; use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; +#[cfg(feature = "compression")] +use async_compression::tokio::bufread::{ + BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder, + XzDecoder as AsyncXzDecoder, +}; use bytes::Bytes; +#[cfg(feature = "compression")] +use bzip2::read::BzDecoder; +#[cfg(feature = "compression")] use flate2::read::GzDecoder; -use futures::{Stream, TryStreamExt}; +use futures::Stream; +#[cfg(feature = "compression")] +use futures::TryStreamExt; use std::str::FromStr; +#[cfg(feature = "compression")] use tokio_util::io::{ReaderStream, StreamReader}; +#[cfg(feature = "compression")] +use xz2::read::XzDecoder; /// Define each `FileType`/`FileCompressionType`'s extension pub trait GetExt { @@ -48,6 +55,8 @@ pub enum FileCompressionType { GZIP, /// Bzip2-ed file BZIP2, + /// Xz-ed file (liblzma) + XZ, /// Uncompressed file UNCOMPRESSED, } @@ -57,6 +66,7 @@ impl GetExt for FileCompressionType { match self { FileCompressionType::GZIP => ".gz".to_owned(), FileCompressionType::BZIP2 => ".bz2".to_owned(), + FileCompressionType::XZ => ".xz".to_owned(), FileCompressionType::UNCOMPRESSED => "".to_owned(), } } @@ -70,6 +80,7 @@ impl FromStr for FileCompressionType { match s.as_str() { "GZIP" | "GZ" => Ok(FileCompressionType::GZIP), "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2), + "XZ" => Ok(FileCompressionType::XZ), "" => Ok(FileCompressionType::UNCOMPRESSED), _ => Err(DataFusionError::NotImplemented(format!( "Unknown FileCompressionType: {}", @@ -85,8 +96,9 @@ impl FileCompressionType { pub fn convert_stream> + Unpin + Send + 'static>( &self, s: T, - ) -> Box> + Send + Unpin> { - let err_converter = |e: Error| match e + ) -> Result> + Send + Unpin>> { + #[cfg(feature = "compression")] + let err_converter = |e: std::io::Error| match e .get_ref() .and_then(|e| e.downcast_ref::()) { @@ -99,29 +111,56 @@ impl FileCompressionType { None => Into::::into(e), }; - match self { + Ok(match self { + #[cfg(feature = "compression")] FileCompressionType::GZIP => Box::new( ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s))) .map_err(err_converter), ), + #[cfg(feature = "compression")] FileCompressionType::BZIP2 => Box::new( ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s))) .map_err(err_converter), ), + #[cfg(feature = "compression")] + FileCompressionType::XZ => Box::new( + ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s))) + .map_err(err_converter), + ), + #[cfg(not(feature = "compression"))] + FileCompressionType::GZIP + | FileCompressionType::BZIP2 + | FileCompressionType::XZ => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } FileCompressionType::UNCOMPRESSED => Box::new(s), - } + }) } /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`. pub fn convert_read( &self, r: T, - ) -> Box { - match self { + ) -> Result> { + Ok(match self { + #[cfg(feature = "compression")] FileCompressionType::GZIP => Box::new(GzDecoder::new(r)), + #[cfg(feature = "compression")] FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)), + #[cfg(feature = "compression")] + FileCompressionType::XZ => Box::new(XzDecoder::new(r)), + #[cfg(not(feature = "compression"))] + FileCompressionType::GZIP + | FileCompressionType::BZIP2 + | FileCompressionType::XZ => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } FileCompressionType::UNCOMPRESSED => Box::new(r), - } + }) } } @@ -205,6 +244,12 @@ mod tests { .unwrap(), ".csv.gz" ); + assert_eq!( + file_type + .get_ext_with_compression(FileCompressionType::XZ) + .unwrap(), + ".csv.xz" + ); assert_eq!( file_type .get_ext_with_compression(FileCompressionType::BZIP2) @@ -225,6 +270,12 @@ mod tests { .unwrap(), ".json.gz" ); + assert_eq!( + file_type + .get_ext_with_compression(FileCompressionType::XZ) + .unwrap(), + ".json.xz" + ); assert_eq!( file_type .get_ext_with_compression(FileCompressionType::BZIP2) @@ -300,7 +351,14 @@ mod tests { FileCompressionType::from_str("GZIP").unwrap(), FileCompressionType::GZIP ); - + assert_eq!( + FileCompressionType::from_str("xz").unwrap(), + FileCompressionType::XZ + ); + assert_eq!( + FileCompressionType::from_str("XZ").unwrap(), + FileCompressionType::XZ + ); assert_eq!( FileCompressionType::from_str("bz2").unwrap(), FileCompressionType::BZIP2 diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 626331ab123d2..08bb2adece85d 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -103,14 +103,14 @@ impl FileFormat for JsonFormat { let schema = match store.get(&object.location).await? { GetResult::File(file, _) => { - let decoder = file_compression_type.convert_read(file); + let decoder = file_compression_type.convert_read(file)?; let mut reader = BufReader::new(decoder); let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? } r @ GetResult::Stream(_) => { let data = r.bytes().await?; - let decoder = file_compression_type.convert_read(data.reader()); + let decoder = file_compression_type.convert_read(data.reader())?; let mut reader = BufReader::new(decoder); let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 51180c0f00a8c..4bc34a7916494 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -217,13 +217,13 @@ impl FileOpener for CsvOpener { Ok(Box::pin(async move { match store.get(file_meta.location()).await? { GetResult::File(file, _) => { - let decoder = file_compression_type.convert_read(file); + let decoder = file_compression_type.convert_read(file)?; Ok(futures::stream::iter(config.open(decoder, true)).boxed()) } GetResult::Stream(s) => { let mut first_chunk = true; let s = s.map_err(Into::::into); - let decoder = file_compression_type.convert_stream(s); + let decoder = file_compression_type.convert_stream(s)?; Ok(newline_delimited_stream(decoder) .map_ok(move |bytes| { let reader = config.open(bytes.reader(), first_chunk); @@ -302,7 +302,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn csv_exec_with_projection( @@ -356,7 +357,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn csv_exec_with_limit( @@ -410,7 +412,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn csv_exec_with_missing_column( @@ -464,7 +467,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn csv_exec_with_partition( @@ -556,7 +560,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn test_chunked(file_compression_type: FileCompressionType) { diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index ceb9e79589343..9025099b7833d 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -176,13 +176,13 @@ impl FileOpener for JsonOpener { Ok(Box::pin(async move { match store.get(file_meta.location()).await? { GetResult::File(file, _) => { - let decoder = file_compression_type.convert_read(file); + let decoder = file_compression_type.convert_read(file)?; let reader = json::Reader::new(decoder, schema.clone(), options); Ok(futures::stream::iter(reader).boxed()) } GetResult::Stream(s) => { let s = s.map_err(Into::into); - let decoder = file_compression_type.convert_stream(s); + let decoder = file_compression_type.convert_stream(s)?; Ok(newline_delimited_stream(decoder) .map_ok(move |bytes| { @@ -305,7 +305,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn nd_json_exec_file_without_projection( @@ -376,7 +377,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn nd_json_exec_file_with_missing_column( @@ -429,7 +431,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn nd_json_exec_file_projection( @@ -528,7 +531,8 @@ mod tests { file_compression_type, case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), - case(FileCompressionType::BZIP2) + case(FileCompressionType::BZIP2), + case(FileCompressionType::XZ) )] #[tokio::test] async fn test_chunked(file_compression_type: FileCompressionType) { diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index f5c37eb053f05..e1a405e486ab0 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -23,6 +23,8 @@ use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::{MemTable, TableProvider}; +#[cfg(not(feature = "compression"))] +use crate::error::DataFusionError; use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_expr::LogicalPlan; @@ -33,9 +35,13 @@ use array::ArrayRef; use arrow::array::{self, Array, Decimal128Builder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +#[cfg(feature = "compression")] use bzip2::write::BzEncoder; +#[cfg(feature = "compression")] use bzip2::Compression as BzCompression; +#[cfg(feature = "compression")] use flate2::write::GzEncoder; +#[cfg(feature = "compression")] use flate2::Compression as GzCompression; use futures::{Future, FutureExt}; use std::fs::File; @@ -44,6 +50,8 @@ use std::io::{BufReader, BufWriter}; use std::pin::Pin; use std::sync::Arc; use tempfile::TempDir; +#[cfg(feature = "compression")] +use xz2::write::XzEncoder; pub fn create_table_dual() -> Arc { let dual_schema = Arc::new(Schema::new(vec![ @@ -110,15 +118,35 @@ pub fn partitioned_file_groups( let file = File::create(&filename).unwrap(); + #[cfg(feature = "compression")] let encoder: Box = match file_compression_type.to_owned() { FileCompressionType::UNCOMPRESSED => Box::new(file), FileCompressionType::GZIP => { Box::new(GzEncoder::new(file, GzCompression::default())) } + FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)), + FileCompressionType::BZIP2 => { Box::new(BzEncoder::new(file, BzCompression::default())) } + #[cfg(not(feature = "compression"))] + FileCompressionType::GZIP + | FileCompressionType::BZIP2 + | FileCompressionType::XZ => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } }; + #[cfg(not(feature = "compression"))] + let encoder: Box = match file_compression_type.to_owned() { + FileCompressionType::UNCOMPRESSED => Ok(Box::new(file)), + FileCompressionType::GZIP + | FileCompressionType::BZIP2 + | FileCompressionType::XZ => Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )), + }?; let writer = BufWriter::new(encoder); writers.push(writer); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6ad643fd1ae37..62913c7a9deea 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1336,7 +1336,7 @@ pub struct CreateExternalTable { pub if_not_exists: bool, /// SQL used to create the table, if available pub definition: Option, - /// File compression type (GZIP, BZIP2) + /// File compression type (GZIP, BZIP2, XZ) pub file_compression_type: String, /// Table(provider) specific options pub options: HashMap, diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index fe0c24b466a68..99fc19acbc883 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -62,7 +62,7 @@ pub struct CreateExternalTable { pub table_partition_cols: Vec, /// Option to not error if table already exists pub if_not_exists: bool, - /// File compression type (GZIP, BZIP2) + /// File compression type (GZIP, BZIP2, XZ) pub file_compression_type: String, /// Table(provider) specific options pub options: HashMap, @@ -387,7 +387,7 @@ impl<'a> DFParser<'a> { fn parse_file_compression_type(&mut self) -> Result { match self.parser.next_token() { Token::Word(w) => parse_file_compression_type(&w.value), - unexpected => self.expected("one of GZIP, BZIP2", unexpected), + unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected), } } @@ -586,6 +586,7 @@ mod tests { let sqls = vec![ ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE GZIP LOCATION 'foo.csv'", "GZIP"), ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE BZIP2 LOCATION 'foo.csv'", "BZIP2"), + ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE XZ LOCATION 'foo.csv'", "XZ"), ]; for (sql, file_compression_type) in sqls { let expected = Statement::CreateExternalTable(CreateExternalTable {