From 9e050863db020c195c1c9e388aac8c635bf31df5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Jun 2022 17:42:30 +0100 Subject: [PATCH 01/12] Switch to object_store crate (#2489) --- datafusion/common/Cargo.toml | 1 + datafusion/common/src/error.rs | 21 ++ datafusion/core/Cargo.toml | 5 +- datafusion/core/src/catalog/schema.rs | 4 +- .../core/src/datasource/file_format/avro.rs | 19 +- .../core/src/datasource/file_format/csv.rs | 20 +- .../core/src/datasource/file_format/json.rs | 43 ++- .../core/src/datasource/file_format/mod.rs | 16 +- .../src/datasource/file_format/parquet.rs | 140 +++---- .../core/src/datasource/listing/helpers.rs | 152 ++++---- datafusion/core/src/datasource/listing/mod.rs | 32 +- .../core/src/datasource/listing/table.rs | 4 +- datafusion/core/src/datasource/listing/url.rs | 120 +++--- .../core/src/datasource/object_store.rs | 9 +- datafusion/core/src/execution/runtime_env.rs | 2 +- datafusion/core/src/lib.rs | 1 - .../src/physical_plan/file_format/avro.rs | 121 ++++--- .../core/src/physical_plan/file_format/csv.rs | 103 ++++-- .../physical_plan/file_format/file_stream.rs | 304 +++++++++------- .../src/physical_plan/file_format/json.rs | 81 +++-- .../core/src/physical_plan/file_format/mod.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 341 ++++++------------ datafusion/core/src/physical_plan/mod.rs | 1 + datafusion/core/src/test/mod.rs | 6 +- datafusion/core/src/test/object_store.rs | 124 ++----- datafusion/core/tests/path_partition.rs | 105 +++--- datafusion/core/tests/row.rs | 14 +- datafusion/core/tests/sql/mod.rs | 13 +- 28 files changed, 895 insertions(+), 909 deletions(-) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index bf52bc26d085..d27af558bb07 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,6 +41,7 @@ pyarrow = ["pyo3"] arrow = { version = "15.0.0", features = ["prettyprint"] } avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.84.0", optional = true } +object_store = { version = "0.1", optional = true } ordered-float = "3.0" parquet = { version = "15.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 807a916abcda..f7fb53bbcf96 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -49,6 +49,9 @@ pub enum DataFusionError { /// Wraps an error from the Avro crate #[cfg(feature = "avro")] AvroError(AvroError), + /// Wraps an error from the object_store crate + #[cfg(feature = "object_store")] + ObjectStore(object_store::Error), /// Error associated to I/O operations and associated traits. IoError(io::Error), /// Error returned when SQL is syntactically incorrect. 
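This hunk adds a `DataFusionError::ObjectStore` variant, and the next hunk adds the matching `From` conversions, so `object_store` failures can be propagated with `?`. A minimal sketch of what that enables for callers (not part of the patch; `head_size` and its parameters are hypothetical names, and it assumes the `object_store` feature of `datafusion-common` is enabled):

// Hypothetical helper (illustration only): the `?` operators rely on the
// `From<object_store::Error>` conversion added in this patch to turn store
// errors into `DataFusionError`.
async fn head_size(
    store: &dyn object_store::ObjectStore,
    location: &object_store::path::Path,
) -> datafusion_common::Result<usize> {
    // `head` returns `object_store::Result<ObjectMeta>`; `?` converts the error.
    let meta = store.head(location).await?;
    Ok(meta.size)
}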
@@ -203,6 +206,20 @@ impl From for DataFusionError {
     }
 }
 
+#[cfg(feature = "object_store")]
+impl From<object_store::Error> for DataFusionError {
+    fn from(e: object_store::Error) -> Self {
+        DataFusionError::ObjectStore(e)
+    }
+}
+
+#[cfg(feature = "object_store")]
+impl From<object_store::path::Error> for DataFusionError {
+    fn from(e: object_store::path::Error) -> Self {
+        DataFusionError::ObjectStore(e.into())
+    }
+}
+
 impl From<ParserError> for DataFusionError {
     fn from(e: ParserError) -> Self {
         DataFusionError::SQL(e)
@@ -264,6 +281,10 @@ impl Display for DataFusionError {
             DataFusionError::JITError(ref desc) => {
                 write!(f, "JIT error: {}", desc)
             }
+            #[cfg(feature = "object_store")]
+            DataFusionError::ObjectStore(ref desc) => {
+                write!(f, "Object Store error: {}", desc)
+            }
         }
     }
 }
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 79a48cf1593f..7d32c03a5b95 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
 arrow = { version = "15.0.0", features = ["prettyprint"] }
 async-trait = "0.1.41"
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+bytes = "1.1"
 chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../common", version = "8.0.0", features = ["parquet"] }
-datafusion-data-access = { path = "../data-access", version = "8.0.0" }
+datafusion-common = { path = "../common", version = "8.0.0", features = ["parquet", "object_store"] }
 datafusion-expr = { path = "../expr", version = "8.0.0" }
 datafusion-jit = { path = "../jit", version = "8.0.0", optional = true }
 datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
@@ -74,6 +74,7 @@ lazy_static = { version = "^1.4.0" }
 log = "^0.4"
 num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
+object_store = "0.1.0"
 ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "15.0.0", features = ["arrow"] }
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index db25c1edcc18..4ed9741a536d 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -132,7 +132,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::Schema;
-    use datafusion_data_access::object_store::local::LocalFileSystem;
+    use object_store::local::LocalFileSystem;
 
     use crate::assert_batches_eq;
     use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
@@ -170,7 +170,7 @@ mod tests {
         let schema = MemorySchemaProvider::new();
 
         let ctx = SessionContext::new();
-        let store = Arc::new(LocalFileSystem {});
+        let store = Arc::new(LocalFileSystem::new());
         ctx.runtime_env().register_object_store("file", store);
 
         let config = ListingTableConfig::new(table_path)
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 63781da52787..dec368808c44 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of avro files
 pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -49,12 +48,18 @@ impl FileFormat for AvroFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = vec![];
-        for file in files {
-            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
-            let schema = read_avro_schema_from_reader(&mut reader)?;
+        for object in objects {
+            let schema = match store.get(&object.location).await? {
+                GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
+                r @ GetResult::Stream(_) => {
+                    // TODO: Fetching entire file to get schema is potentially wasteful
+                    let data = r.bytes().await?;
+                    read_avro_schema_from_reader(&mut data.as_ref())?
+                }
+            };
             schemas.push(schema);
         }
         let merged_schema = Schema::try_merge(schemas)?;
@@ -65,7 +70,7 @@ impl FileFormat for AvroFormat {
         &self,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
-        _file: &FileMeta,
+        _object: &ObjectMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 0d665e9ef6b3..d72a6e767572 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use datafusion_common::DataFusionError;
+use futures::TryFutureExt;
+use object_store::{ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of csv files
 pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -96,16 +97,21 @@ impl FileFormat for CsvFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = vec![];
 
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
-        for file in files {
-            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
+        for object in objects {
+            let data = store
+                .get(&object.location)
+                .and_then(|r| r.bytes())
+                .await
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
             let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
-                &mut reader,
+                &mut data.as_ref(),
                 self.delimiter,
                 Some(records_to_read),
                 self.has_header,
@@ -128,7 +134,7 @@ impl FileFormat for CsvFormat {
         &self,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
-        _file: &FileMeta,
+        _object: &ObjectMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index e9b49c42ddd6..9a889ab4c0f5 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::json::reader::infer_json_schema_from_iterator;
 use arrow::json::reader::ValueIter;
 use async_trait::async_trait;
-use datafusion_data_access::{object_store::ObjectStore, FileMeta};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use
super::FileFormat; use super::FileScanConfig; @@ -71,21 +71,33 @@ impl FileFormat for JsonFormat { async fn infer_schema( &self, store: &Arc, - files: &[FileMeta], + objects: &[ObjectMeta], ) -> Result { let mut schemas = Vec::new(); let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX); - for file in files { - let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?; - let mut reader = BufReader::new(reader); - let iter = ValueIter::new(&mut reader, None); - let schema = infer_json_schema_from_iterator(iter.take_while(|_| { + for object in objects { + let mut take_while = || { let should_take = records_to_read > 0; if should_take { records_to_read -= 1; } should_take - }))?; + }; + + let schema = match store.get(&object.location).await? { + GetResult::File(file, _) => { + let mut reader = BufReader::new(file); + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + } + r @ GetResult::Stream(_) => { + let data = r.bytes().await?; + let mut reader = BufReader::new(data.as_ref()); + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + } + }; + schemas.push(schema); if records_to_read == 0 { break; @@ -100,7 +112,7 @@ impl FileFormat for JsonFormat { &self, _store: &Arc, _table_schema: SchemaRef, - _file: &FileMeta, + _object: &ObjectMeta, ) -> Result { Ok(Statistics::default()) } @@ -120,15 +132,12 @@ mod tests { use super::super::test_util::scan_format; use arrow::array::Int64Array; use futures::StreamExt; + use object_store::local::LocalFileSystem; use super::*; + use crate::physical_plan::collect; use crate::prelude::{SessionConfig, SessionContext}; - use crate::{ - datafusion_data_access::object_store::local::{ - local_unpartitioned_file, LocalFileSystem, - }, - physical_plan::collect, - }; + use crate::test::object_store::local_unpartitioned_file; #[tokio::test] async fn read_small_batches() -> Result<()> { @@ -229,12 +238,12 @@ mod tests { #[tokio::test] async fn infer_schema_with_limit() { - let store = Arc::new(LocalFileSystem {}) as _; + let store = Arc::new(LocalFileSystem::new()) as _; let filename = "tests/jsons/schema_infer_limit.json"; let format = JsonFormat::default().with_schema_infer_max_rec(Some(3)); let file_schema = format - .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())]) + .infer_schema(&store, &[local_unpartitioned_file(filename)]) .await .expect("Schema inference"); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index a15750394903..8a0d5b97ef65 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -36,8 +36,7 @@ use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{ExecutionPlan, Statistics}; use async_trait::async_trait; -use datafusion_data_access::object_store::ObjectStore; -use datafusion_data_access::FileMeta; +use object_store::{ObjectMeta, ObjectStore}; /// This trait abstracts all the file format specific implementations /// from the `TableProvider`. This helps code re-utilization across @@ -55,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { async fn infer_schema( &self, store: &Arc, - files: &[FileMeta], + objects: &[ObjectMeta], ) -> Result; /// Infer the statistics for the provided object. 
The cost and accuracy of the @@ -69,7 +68,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { &self, store: &Arc, table_schema: SchemaRef, - file: &FileMeta, + object: &ObjectMeta, ) -> Result; /// Take a list of files and convert it to the appropriate executor @@ -86,9 +85,8 @@ pub(crate) mod test_util { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use datafusion_data_access::object_store::local::{ - local_unpartitioned_file, LocalFileSystem, - }; + use crate::test::object_store::local_unpartitioned_file; + use object_store::local::LocalFileSystem; pub async fn scan_format( format: &dyn FileFormat, @@ -97,7 +95,7 @@ pub(crate) mod test_util { projection: Option>, limit: Option, ) -> Result> { - let store = Arc::new(LocalFileSystem {}) as _; + let store = Arc::new(LocalFileSystem::new()) as _; let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name)); let file_schema = format.infer_schema(&store, &[meta.clone()]).await?; @@ -107,7 +105,7 @@ pub(crate) mod test_util { .await?; let file_groups = vec![vec![PartitionedFile { - file_meta: meta, + object_meta: meta, partition_values: vec![], range: None, }]]; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 42bff573ccd7..1f8c48af6ae8 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -18,21 +18,17 @@ //! Parquet format abstractions use std::any::Any; -use std::io::Read; use std::sync::Arc; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_data_access::FileMeta; use hashbrown::HashMap; -use parquet::arrow::ArrowReader; -use parquet::arrow::ParquetFileArrowReader; -use parquet::errors::ParquetError; -use parquet::errors::Result as ParquetResult; -use parquet::file::reader::ChunkReader; -use parquet::file::reader::Length; -use parquet::file::serialized_reader::SerializedFileReader; +use object_store::{GetResult, ObjectMeta, ObjectStore}; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::reader::FileReader; +use parquet::file::serialized_reader::{SerializedFileReader, SliceableCursor}; use parquet::file::statistics::Statistics as ParquetStatistics; use super::FileFormat; @@ -42,15 +38,12 @@ use crate::arrow::array::{ }; use crate::arrow::datatypes::{DataType, Field}; use crate::datasource::{create_max_min_accs, get_col_stats}; -use crate::error::DataFusionError; use crate::error::Result; use crate::logical_plan::combine_filters; use crate::logical_plan::Expr; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter}; -use crate::physical_plan::{metrics, ExecutionPlan}; -use crate::physical_plan::{Accumulator, Statistics}; -use datafusion_data_access::object_store::{ObjectReader, ObjectStore}; +use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; @@ -91,11 +84,11 @@ impl FileFormat for ParquetFormat { async fn infer_schema( &self, store: &Arc, - files: &[FileMeta], + objects: &[ObjectMeta], ) -> Result { - let mut schemas = Vec::with_capacity(files.len()); - for file in files { - let schema = fetch_schema(store.as_ref(), file)?; + let mut schemas = Vec::with_capacity(objects.len()); + 
for object in objects { + let schema = fetch_schema(store.as_ref(), object).await?; schemas.push(schema) } let schema = Schema::try_merge(schemas)?; @@ -106,9 +99,9 @@ impl FileFormat for ParquetFormat { &self, store: &Arc, table_schema: SchemaRef, - file: &FileMeta, + object: &ObjectMeta, ) -> Result { - let stats = fetch_statistics(store.as_ref(), table_schema, file)?; + let stats = fetch_statistics(store.as_ref(), table_schema, object).await?; Ok(stats) } @@ -276,37 +269,50 @@ fn summarize_min_max( } } -/// Read and parse the schema of the Parquet file at location `path` -fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result { - let object_reader = store.file_reader(file.sized_file.clone())?; - let obj_reader = ChunkObjectReader { - object_reader, - bytes_scanned: None, - }; - let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let schema = arrow_reader.get_schema()?; +async fn fetch_metadata( + store: &dyn ObjectStore, + file: &ObjectMeta, +) -> Result { + // TODO: Fetching the entire file to get metadata is wasteful + match store.get(&file.location).await? { + GetResult::File(file, _) => { + Ok(SerializedFileReader::new(file)?.metadata().clone()) + } + r @ GetResult::Stream(_) => { + let data = r.bytes().await?; + let cursor = SliceableCursor::new(data.to_vec()); + Ok(SerializedFileReader::new(cursor)?.metadata().clone()) + } + } +} +/// Read and parse the schema of the Parquet file at location `path` +async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result { + let metadata = fetch_metadata(store, file).await?; + let file_metadata = metadata.file_metadata(); + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; Ok(schema) } /// Read and parse the statistics of the Parquet file at location `path` -fn fetch_statistics( +async fn fetch_statistics( store: &dyn ObjectStore, table_schema: SchemaRef, - file: &FileMeta, + file: &ObjectMeta, ) -> Result { - let object_reader = store.file_reader(file.sized_file.clone())?; - let obj_reader = ChunkObjectReader { - object_reader, - bytes_scanned: None, - }; - let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let file_schema = arrow_reader.get_schema()?; + let metadata = fetch_metadata(store, file).await?; + let file_metadata = metadata.file_metadata(); + + let file_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let num_fields = table_schema.fields().len(); let fields = table_schema.fields().to_vec(); - let meta_data = arrow_reader.get_metadata(); let mut num_rows = 0; let mut total_byte_size = 0; @@ -317,7 +323,7 @@ fn fetch_statistics( let (mut max_values, mut min_values) = create_max_min_accs(&table_schema); - for row_group_meta in meta_data.row_groups() { + for row_group_meta in metadata.row_groups() { num_rows += row_group_meta.num_rows(); total_byte_size += row_group_meta.total_byte_size(); @@ -373,46 +379,18 @@ fn fetch_statistics( Ok(statistics) } -/// A wrapper around the object reader to make it implement `ChunkReader` -pub struct ChunkObjectReader { - /// The underlying object reader - pub object_reader: Arc, - /// Optional counter which will track total number of bytes scanned - pub bytes_scanned: Option, -} - -impl Length for ChunkObjectReader { - fn len(&self) -> u64 { - self.object_reader.length() - } -} - 
-impl ChunkReader for ChunkObjectReader { - type T = Box; - - fn get_read(&self, start: u64, length: usize) -> ParquetResult { - if let Some(m) = self.bytes_scanned.as_ref() { - m.add(length) - } - self.object_reader - .sync_chunk_reader(start, length) - .map_err(DataFusionError::IoError) - .map_err(|e| ParquetError::ArrowError(e.to_string())) - } -} - #[cfg(test)] pub(crate) mod test_util { use super::*; + use crate::test::object_store::local_unpartitioned_file; use arrow::record_batch::RecordBatch; - use datafusion_data_access::object_store::local::local_unpartitioned_file; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use tempfile::NamedTempFile; pub async fn store_parquet( batches: Vec, - ) -> Result<(Vec, Vec)> { + ) -> Result<(Vec, Vec)> { let files: Vec<_> = batches .into_iter() .map(|batch| { @@ -431,11 +409,7 @@ pub(crate) mod test_util { }) .collect(); - let meta: Vec<_> = files - .iter() - .map(|f| local_unpartitioned_file(f.path().to_string_lossy().to_string())) - .collect(); - + let meta: Vec<_> = files.iter().map(|f| local_unpartitioned_file(f)).collect(); Ok((meta, files)) } } @@ -444,7 +418,6 @@ pub(crate) mod test_util { mod tests { use super::super::test_util::scan_format; use crate::physical_plan::collect; - use datafusion_data_access::object_store::local::LocalFileSystem; use super::*; @@ -458,6 +431,7 @@ mod tests { use arrow::record_batch::RecordBatch; use datafusion_common::ScalarValue; use futures::StreamExt; + use object_store::local::LocalFileSystem; #[tokio::test] async fn read_merged_batches() -> Result<()> { @@ -469,13 +443,13 @@ mod tests { let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap(); let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap(); - let store = Arc::new(LocalFileSystem {}) as _; + let store = Arc::new(LocalFileSystem::new()) as _; let (meta, _files) = store_parquet(vec![batch1, batch2]).await?; let format = ParquetFormat::default(); let schema = format.infer_schema(&store, &meta).await.unwrap(); - let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?; + let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?; assert_eq!(stats.num_rows, Some(3)); let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0]; @@ -483,7 +457,7 @@ mod tests { assert_eq!(c1_stats.null_count, Some(1)); assert_eq!(c2_stats.null_count, Some(3)); - let stats = fetch_statistics(store.as_ref(), schema, &meta[1])?; + let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?; assert_eq!(stats.num_rows, Some(3)); let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0]; let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1]; @@ -540,8 +514,8 @@ mod tests { let _ = collect(exec.clone(), task_ctx.clone()).await?; let _ = collect(exec_projected.clone(), task_ctx).await?; - assert_bytes_scanned(exec, 2522); - assert_bytes_scanned(exec_projected, 1924); + assert_bytes_scanned(exec, 1851); + assert_bytes_scanned(exec_projected, 1851); Ok(()) } @@ -560,7 +534,7 @@ mod tests { let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(11, batches[0].num_columns()); - assert_eq!(8, batches[0].num_rows()); + assert_eq!(1, batches[0].num_rows()); Ok(()) } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index a26eafabb50c..5b4daa0b04d8 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ 
b/datafusion/core/src/datasource/listing/helpers.rs @@ -41,8 +41,10 @@ use crate::{ use super::PartitionedFile; use crate::datasource::listing::ListingTableUrl; -use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile}; +use datafusion_common::DataFusionError; use datafusion_expr::Volatility; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_"; const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_"; @@ -184,7 +186,7 @@ pub async fn pruned_partition_list<'a>( Ok(Box::pin(list.try_filter_map(move |file_meta| async move { let parsed_path = parse_partitions_for_path( table_path, - file_meta.path(), + &file_meta.location, table_partition_cols, ) .map(|p| { @@ -195,7 +197,7 @@ pub async fn pruned_partition_list<'a>( Ok(parsed_path.map(|partition_values| PartitionedFile { partition_values, - file_meta, + object_meta: file_meta, range: None, })) }))) @@ -214,10 +216,9 @@ pub async fn pruned_partition_list<'a>( df = df.filter(filter.clone())?; } let filtered_batches = df.collect().await?; + let paths = batches_to_paths(&filtered_batches)?; - Ok(Box::pin(futures::stream::iter( - batches_to_paths(&filtered_batches).into_iter().map(Ok), - ))) + Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok)))) } } @@ -231,7 +232,7 @@ pub async fn pruned_partition_list<'a>( fn paths_to_batch( table_partition_cols: &[String], table_path: &ListingTableUrl, - metas: &[FileMeta], + metas: &[ObjectMeta], ) -> Result { let mut key_builder = StringBuilder::new(metas.len()); let mut length_builder = UInt64Builder::new(metas.len()); @@ -241,20 +242,19 @@ fn paths_to_batch( .map(|_| StringBuilder::new(metas.len())) .collect::>(); for file_meta in metas { - if let Some(partition_values) = - parse_partitions_for_path(table_path, file_meta.path(), table_partition_cols) - { - key_builder.append_value(file_meta.path())?; - length_builder.append_value(file_meta.size())?; - match file_meta.last_modified { - Some(lm) => modified_builder.append_value(lm.timestamp_millis())?, - None => modified_builder.append_null()?, - } + if let Some(partition_values) = parse_partitions_for_path( + table_path, + &file_meta.location, + table_partition_cols, + ) { + key_builder.append_value(file_meta.location.as_ref())?; + length_builder.append_value(file_meta.size as u64)?; + modified_builder.append_value(file_meta.last_modified.timestamp_millis())?; for (i, part_val) in partition_values.iter().enumerate() { partition_builders[i].append_value(part_val)?; } } else { - debug!("No partitioning for path {}", file_meta.path()); + debug!("No partitioning for path {}", file_meta.location); } } @@ -283,7 +283,7 @@ fn paths_to_batch( } /// convert a set of record batches created by `paths_to_batch()` back to partitioned files. 
-fn batches_to_paths(batches: &[RecordBatch]) -> Vec { +fn batches_to_paths(batches: &[RecordBatch]) -> Result> { batches .iter() .flat_map(|batch| { @@ -303,23 +303,21 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec { .downcast_ref::() .unwrap(); - (0..batch.num_rows()).map(move |row| PartitionedFile { - file_meta: FileMeta { - last_modified: match modified_array.is_null(row) { - false => Some(Utc.timestamp_millis(modified_array.value(row))), - true => None, - }, - sized_file: SizedFile { - path: key_array.value(row).to_owned(), - size: length_array.value(row), + (0..batch.num_rows()).map(move |row| { + Ok(PartitionedFile { + object_meta: ObjectMeta { + location: Path::parse(key_array.value(row)) + .map_err(|e| DataFusionError::External(Box::new(e)))?, + last_modified: Utc.timestamp_millis(modified_array.value(row)), + size: length_array.value(row) as usize, }, - }, - partition_values: (3..batch.columns().len()) - .map(|col| { - ScalarValue::try_from_array(batch.column(col), row).unwrap() - }) - .collect(), - range: None, + partition_values: (3..batch.columns().len()) + .map(|col| { + ScalarValue::try_from_array(batch.column(col), row).unwrap() + }) + .collect(), + range: None, + }) }) }) .collect() @@ -329,7 +327,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec { /// associated to the partitions defined by `table_partition_cols` fn parse_partitions_for_path<'a>( table_path: &ListingTableUrl, - file_path: &'a str, + file_path: &'a Path, table_partition_cols: &[String], ) -> Option> { let subpath = table_path.strip_prefix(file_path)?; @@ -346,10 +344,8 @@ fn parse_partitions_for_path<'a>( #[cfg(test)] mod tests { - use crate::{ - logical_plan::{case, col, lit}, - test::object_store::TestObjectStore, - }; + use crate::logical_plan::{case, col, lit}; + use crate::test::object_store::make_test_store; use futures::StreamExt; use super::*; @@ -396,7 +392,7 @@ mod tests { #[tokio::test] async fn test_pruned_partition_list_empty() { - let store = TestObjectStore::new_arc(&[ + let store = make_test_store(&[ ("tablepath/mypartition=val1/notparquetfile", 100), ("tablepath/file.parquet", 100), ]); @@ -418,7 +414,7 @@ mod tests { #[tokio::test] async fn test_pruned_partition_list() { - let store = TestObjectStore::new_arc(&[ + let store = make_test_store(&[ ("tablepath/mypartition=val1/file.parquet", 100), ("tablepath/mypartition=val2/file.parquet", 100), ("tablepath/mypartition=val1/other=val3/file.parquet", 100), @@ -440,7 +436,7 @@ mod tests { assert_eq!(pruned.len(), 2); let f1 = &pruned[0]; assert_eq!( - f1.file_meta.path(), + f1.object_meta.location.as_ref(), "tablepath/mypartition=val1/file.parquet" ); assert_eq!( @@ -449,7 +445,7 @@ mod tests { ); let f2 = &pruned[1]; assert_eq!( - f2.file_meta.path(), + f2.object_meta.location.as_ref(), "tablepath/mypartition=val1/other=val3/file.parquet" ); assert_eq!( @@ -460,7 +456,7 @@ mod tests { #[tokio::test] async fn test_pruned_partition_list_multi() { - let store = TestObjectStore::new_arc(&[ + let store = make_test_store(&[ ("tablepath/part1=p1v1/file.parquet", 100), ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), @@ -487,7 +483,7 @@ mod tests { assert_eq!(pruned.len(), 2); let f1 = &pruned[0]; assert_eq!( - f1.file_meta.path(), + f1.object_meta.location.as_ref(), "tablepath/part1=p1v2/part2=p2v1/file1.parquet" ); assert_eq!( @@ -499,7 +495,7 @@ mod tests { ); let f2 = &pruned[1]; assert_eq!( - f2.file_meta.path(), + f2.object_meta.location.as_ref(), 
"tablepath/part1=p1v2/part2=p2v1/file2.parquet" ); assert_eq!( @@ -517,7 +513,7 @@ mod tests { Some(vec![]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/file.csv", + &Path::from("bucket/mytable/file.csv"), &[] ) ); @@ -525,7 +521,7 @@ mod tests { None, parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/othertable").unwrap(), - "bucket/mytable/file.csv", + &Path::from("bucket/mytable/file.csv"), &[] ) ); @@ -533,7 +529,7 @@ mod tests { None, parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/file.csv", + &Path::from("bucket/mytable/file.csv"), &[String::from("mypartition")] ) ); @@ -541,7 +537,7 @@ mod tests { Some(vec!["v1"]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/mypartition=v1/file.csv", + &Path::from("bucket/mytable/mypartition=v1/file.csv"), &[String::from("mypartition")] ) ); @@ -549,7 +545,7 @@ mod tests { Some(vec!["v1"]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(), - "bucket/mytable/mypartition=v1/file.csv", + &Path::from("bucket/mytable/mypartition=v1/file.csv"), &[String::from("mypartition")] ) ); @@ -558,7 +554,7 @@ mod tests { None, parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/v1/file.csv", + &Path::from("bucket/mytable/v1/file.csv"), &[String::from("mypartition")] ) ); @@ -566,7 +562,7 @@ mod tests { Some(vec!["v1", "v2"]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", + &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), &[String::from("mypartition"), String::from("otherpartition")] ) ); @@ -574,7 +570,7 @@ mod tests { Some(vec!["v1"]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", + &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), &[String::from("mypartition")] ) ); @@ -604,19 +600,15 @@ mod tests { #[test] fn test_path_batch_roundtrip_no_partiton() { let files = vec![ - FileMeta { - sized_file: SizedFile { - path: String::from("mybucket/tablepath/part1=val1/file.parquet"), - size: 100, - }, - last_modified: Some(Utc.timestamp_millis(1634722979123)), + ObjectMeta { + location: Path::from("mybucket/tablepath/part1=val1/file.parquet"), + last_modified: Utc.timestamp_millis(1634722979123), + size: 100, }, - FileMeta { - sized_file: SizedFile { - path: String::from("mybucket/tablepath/part1=val2/file.parquet"), - size: 100, - }, - last_modified: None, + ObjectMeta { + location: Path::from("mybucket/tablepath/part1=val2/file.parquet"), + last_modified: Utc.timestamp_millis(0), + size: 100, }, ]; @@ -624,14 +616,14 @@ mod tests { let batches = paths_to_batch(&[], &table_path, &files) .expect("Serialization of file list to batch failed"); - let parsed_files = batches_to_paths(&[batches]); + let parsed_files = batches_to_paths(&[batches]).unwrap(); assert_eq!(parsed_files.len(), 2); assert_eq!(&parsed_files[0].partition_values, &[]); assert_eq!(&parsed_files[1].partition_values, &[]); let parsed_metas = parsed_files .into_iter() - .map(|pf| pf.file_meta) + .map(|pf| pf.object_meta) .collect::>(); assert_eq!(parsed_metas, files); } @@ -639,19 +631,15 @@ mod tests { #[test] fn test_path_batch_roundtrip_with_partition() { let files = vec![ - 
FileMeta { - sized_file: SizedFile { - path: String::from("mybucket/tablepath/part1=val1/file.parquet"), - size: 100, - }, - last_modified: Some(Utc.timestamp_millis(1634722979123)), + ObjectMeta { + location: Path::from("mybucket/tablepath/part1=val1/file.parquet"), + last_modified: Utc.timestamp_millis(1634722979123), + size: 100, }, - FileMeta { - sized_file: SizedFile { - path: String::from("mybucket/tablepath/part1=val2/file.parquet"), - size: 100, - }, - last_modified: None, + ObjectMeta { + location: Path::from("mybucket/tablepath/part1=val2/file.parquet"), + last_modified: Utc.timestamp_millis(0), + size: 100, }, ]; @@ -662,7 +650,7 @@ mod tests { ) .expect("Serialization of file list to batch failed"); - let parsed_files = batches_to_paths(&[batches]); + let parsed_files = batches_to_paths(&[batches]).unwrap(); assert_eq!(parsed_files.len(), 2); assert_eq!( &parsed_files[0].partition_values, @@ -675,7 +663,7 @@ mod tests { let parsed_metas = parsed_files .into_iter() - .map(|pf| pf.file_meta) + .map(|pf| pf.object_meta) .collect::>(); assert_eq!(parsed_metas, files); } diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index c11de5f8021a..85d4b6f7d00d 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -22,9 +22,11 @@ mod helpers; mod table; mod url; +use crate::error::Result; +use chrono::TimeZone; use datafusion_common::ScalarValue; -use datafusion_data_access::{FileMeta, Result, SizedFile}; use futures::Stream; +use object_store::{path::Path, ObjectMeta}; use std::pin::Pin; pub use self::url::ListingTableUrl; @@ -51,7 +53,7 @@ pub struct FileRange { /// and partition column values that need to be appended to each row. pub struct PartitionedFile { /// Path for the file (e.g. 
URL, filesystem path, etc) - pub file_meta: FileMeta, + pub object_meta: ObjectMeta, /// Values of partition columns to be appended to each row pub partition_values: Vec, /// An optional file range for a more fine-grained parallel execution @@ -62,9 +64,10 @@ impl PartitionedFile { /// Create a simple file without metadata or partition pub fn new(path: String, size: u64) -> Self { Self { - file_meta: FileMeta { - sized_file: SizedFile { path, size }, - last_modified: None, + object_meta: ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, }, partition_values: vec![], range: None, @@ -74,9 +77,10 @@ impl PartitionedFile { /// Create a file range without metadata or partition pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { Self { - file_meta: FileMeta { - sized_file: SizedFile { path, size }, - last_modified: None, + object_meta: ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, }, partition_values: vec![], range: Some(FileRange { start, end }), @@ -84,16 +88,10 @@ impl PartitionedFile { } } -impl std::fmt::Display for PartitionedFile { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.file_meta) - } -} - -impl From for PartitionedFile { - fn from(file_meta: FileMeta) -> Self { +impl From for PartitionedFile { + fn from(object_meta: ObjectMeta) -> Self { PartitionedFile { - file_meta, + object_meta, partition_values: vec![], range: None, } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 29cde3c9faab..57291c0beee7 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -109,7 +109,7 @@ impl ListingTableConfig { .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let file_type = file.path().rsplit('.').next().ok_or_else(|| { + let file_type = file.location.as_ref().rsplit('.').next().ok_or_else(|| { DataFusionError::Internal("Unable to infer file suffix".into()) })?; @@ -369,7 +369,7 @@ impl ListingTable { let statistics = if self.options.collect_stat { self.options .format - .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta) + .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta) .await? 
} else { Statistics::default() diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 041a6ab7f045..08caa542a7a2 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -17,13 +17,12 @@ use crate::datasource::object_store::ObjectStoreUrl; use datafusion_common::{DataFusionError, Result}; -use datafusion_data_access::object_store::ObjectStore; -use datafusion_data_access::FileMeta; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; use itertools::Itertools; -use std::path::is_separator; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; use url::Url; /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] @@ -32,6 +31,8 @@ use url::Url; pub struct ListingTableUrl { /// A URL that identifies a file or directory to list files from url: Url, + /// The path prefix + prefix: Path, /// An optional glob expression used to filter files glob: Option, } @@ -79,7 +80,7 @@ impl ListingTableUrl { } match Url::parse(s) { - Ok(url) => Ok(Self { url, glob: None }), + Ok(url) => Ok(Self::new(url, None)), Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s), Err(e) => Err(DataFusionError::External(Box::new(e))), } @@ -102,7 +103,17 @@ impl ListingTableUrl { false => Url::from_directory_path(path).unwrap(), }; - Ok(Self { url, glob }) + Ok(Self::new(url, glob)) + } + + /// Creates a new [`ListingTableUrl`] from a url and optional glob expression + fn new(url: Url, glob: Option) -> Self { + // TODO: fix this upstream + let prefix = (url.path().len() > 1) + .then(|| Path::parse(url.path()).expect("should be URL safe")) + .unwrap_or_default(); + + Self { url, prefix, glob } } /// Returns the URL scheme @@ -110,38 +121,22 @@ impl ListingTableUrl { self.url.scheme() } - /// Returns the path as expected by [`ObjectStore`] - /// - /// In particular for file scheme URLs, this is an absolute - /// on the local filesystem in the OS-specific path representation - /// - /// For other URLs, this is a the host and path of the URL, - /// delimited by `/`, and with no leading `/` - /// - /// TODO: Handle paths consistently (#2489) - fn prefix(&self) -> &str { - match self.scheme() { - "file" => match cfg!(target_family = "windows") { - true => self.url.path().strip_prefix('/').unwrap(), - false => self.url.path(), - }, - _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath], - } - } - /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning /// an iterator of the remaining path segments - /// - /// TODO: Handle paths consistently (#2489) pub(crate) fn strip_prefix<'a, 'b: 'a>( &'a self, - path: &'b str, + path: &'b Path, ) -> Option + 'a> { - let prefix = self.prefix(); + use object_store::path::DELIMITER; + // TODO: Make object_store::path::Path::prefix_match public + + let path: &str = path.as_ref(); + let prefix: &str = self.prefix.as_ref(); + // Ignore empty path segments let diff = itertools::diff_with( - path.split(is_separator).filter(|s| !s.is_empty()), - prefix.split(is_separator).filter(|s| !s.is_empty()), + path.split(DELIMITER).filter(|s| !s.is_empty()), + prefix.split(DELIMITER).filter(|s| !s.is_empty()), |a, b| a == b, ); @@ -157,31 +152,34 @@ impl ListingTableUrl { &'a self, store: &'a dyn ObjectStore, file_extension: &'a str, - ) -> BoxStream<'a, Result> { - futures::stream::once(async move { - let prefix = self.prefix(); - 
store.list_file(prefix.as_ref()).await - }) - .try_flatten() - .map_err(DataFusionError::IoError) - .try_filter(move |meta| { - let path = meta.path(); - - let extension_match = path.ends_with(file_extension); - let glob_match = match &self.glob { - Some(glob) => match self.strip_prefix(path) { - Some(mut segments) => { - let stripped = segments.join("/"); - glob.matches(&stripped) - } - None => false, - }, - None => true, - }; - - futures::future::ready(extension_match && glob_match) - }) - .boxed() + ) -> BoxStream<'a, Result> { + // If the prefix is a file, use a head request, otherwise list + let is_dir = self.url.as_str().ends_with('/'); + let list = match is_dir { + true => futures::stream::once(store.list(Some(&self.prefix))) + .try_flatten() + .boxed(), + false => futures::stream::once(store.head(&self.prefix)).boxed(), + }; + + list.map_err(Into::into) + .try_filter(move |meta| { + let path = &meta.location; + let extension_match = path.as_ref().ends_with(file_extension); + let glob_match = match &self.glob { + Some(glob) => match self.strip_prefix(path) { + Some(mut segments) => { + let stripped = segments.join("/"); + glob.matches(&stripped) + } + None => false, + }, + None => true, + }; + + futures::future::ready(extension_match && glob_match) + }) + .boxed() } /// Returns this [`ListingTableUrl`] as a string @@ -250,7 +248,7 @@ mod tests { let root = root.to_string_lossy(); let url = ListingTableUrl::parse(&root).unwrap(); - let child = format!("{}/partition/file", root); + let child = Path::from(format!("{}/partition/file", root)); let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); assert_eq!(prefix, vec!["partition", "file"]); @@ -259,14 +257,14 @@ mod tests { #[test] fn test_prefix_s3() { let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap(); - assert_eq!(url.prefix(), "bucket/foo/bar"); + assert_eq!(url.prefix.as_ref(), "foo/bar"); - let path = "bucket/foo/bar/partition/foo.parquet"; - let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect(); + let path = Path::from("foo/bar/partition/foo.parquet"); + let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect(); assert_eq!(prefix, vec!["partition", "foo.parquet"]); - let path = "other-bucket/foo/bar/partition/foo.parquet"; - assert!(url.strip_prefix(path).is_none()); + let path = Path::from("other/bar/partition/foo.parquet"); + assert!(url.strip_prefix(&path).is_none()); } #[test] diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index ac7e1a847075..a28a1d15beaf 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -20,8 +20,8 @@ //! and query data inside these systems. use datafusion_common::{DataFusionError, Result}; -use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; -use datafusion_data_access::object_store::ObjectStore; +use object_store::local::LocalFileSystem; +use object_store::ObjectStore; use parking_lot::RwLock; use std::collections::HashMap; use std::sync::Arc; @@ -109,7 +109,7 @@ impl ObjectStoreRegistry { /// ['LocalFileSystem'] store is registered in by default to support read local files natively. 
pub fn new() -> Self { let mut map: HashMap> = HashMap::new(); - map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + map.insert("file".to_string(), Arc::new(LocalFileSystem::new())); Self { object_stores: RwLock::new(map), @@ -155,7 +155,6 @@ impl ObjectStoreRegistry { mod tests { use super::*; use crate::datasource::listing::ListingTableUrl; - use datafusion_data_access::object_store::local::LocalFileSystem; use std::sync::Arc; #[test] @@ -197,7 +196,7 @@ mod tests { #[test] fn test_get_by_url_s3() { let sut = ObjectStoreRegistry::default(); - sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {})); + sut.register_store("s3".to_string(), Arc::new(LocalFileSystem::new())); let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); sut.get_by_url(&url).unwrap(); } diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index 2f134990fec7..35ca4b6a5d09 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -28,7 +28,7 @@ use crate::{ use crate::datasource::object_store::ObjectStoreRegistry; use datafusion_common::DataFusionError; -use datafusion_data_access::object_store::ObjectStore; +use object_store::ObjectStore; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index e8852fdc142a..d2ead0f6bfcf 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -228,7 +228,6 @@ pub use parquet; // re-export DataFusion crates pub use datafusion_common as common; -pub use datafusion_data_access; pub use datafusion_expr as logical_expr; pub use datafusion_physical_expr as physical_expr; pub use datafusion_sql as sql; diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index ae1efb2979c1..50df3f06502d 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -16,23 +16,17 @@ // under the License. //! Execution plan for reading line-delimited Avro files -#[cfg(feature = "avro")] -use crate::avro_to_arrow; use crate::error::Result; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::datatypes::SchemaRef; -#[cfg(feature = "avro")] -use arrow::error::ArrowError; use crate::execution::context::TaskContext; use std::any::Any; use std::sync::Arc; -#[cfg(feature = "avro")] -use super::file_stream::{BatchIter, FileStream}; use super::FileScanConfig; /// Execution plan for scanning Avro data source @@ -109,39 +103,17 @@ impl ExecutionPlan for AvroExec { partition: usize, context: Arc, ) -> Result { - let proj = self.base_config.projected_file_column_names(); - - let batch_size = context.session_config().batch_size; - let file_schema = Arc::clone(&self.base_config.file_schema); - - // The avro reader cannot limit the number of records, so `remaining` is ignored. 
- let fun = move |file, _remaining: &Option| { - let reader_res = avro_to_arrow::Reader::try_new( - file, - Arc::clone(&file_schema), - batch_size, - proj.clone(), - ); - match reader_res { - Ok(r) => Box::new(r) as BatchIter, - Err(e) => Box::new( - vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(), - ), - } - }; - - let object_store = context - .runtime_env() - .object_store(&self.base_config.object_store_url)?; - - Ok(Box::pin(FileStream::new( - object_store, - self.base_config.file_groups[partition].clone(), - fun, - Arc::clone(&self.projected_schema), - self.base_config.limit, - self.base_config.table_partition_cols.clone(), - ))) + use super::file_stream::FileStream; + + let config = Arc::new(avro::AvroConfig { + schema: Arc::clone(&self.base_config.file_schema), + batch_size: context.session_config().batch_size, + projection: self.base_config.projected_file_column_names(), + }); + let opener = avro::AvroOpener { config }; + + let stream = FileStream::new(&self.base_config, partition, context, opener)?; + Ok(Box::pin(stream)) } fn fmt_as( @@ -166,6 +138,66 @@ impl ExecutionPlan for AvroExec { } } +#[cfg(feature = "avro")] +mod avro { + use super::*; + use crate::datasource::listing::FileRange; + use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture}; + use crate::physical_plan::stream::RecordBatchStreamAdapter; + use bytes::Buf; + use futures::future::BoxFuture; + use futures::StreamExt; + use object_store::{GetResult, ObjectMeta, ObjectStore}; + + pub struct AvroConfig { + pub schema: SchemaRef, + pub batch_size: usize, + pub projection: Option>, + } + + impl AvroConfig { + fn open( + &self, + reader: R, + ) -> Result> { + crate::avro_to_arrow::Reader::try_new( + reader, + self.schema.clone(), + self.batch_size, + self.projection.clone(), + ) + } + } + + pub struct AvroOpener { + pub config: Arc, + } + + impl FormatReader for AvroOpener { + fn open( + &self, + store: Arc, + file: ObjectMeta, + _range: Option, + ) -> ReaderFuture { + let config = self.config.clone(); + Box::pin(async move { + match store.get(&file.location).await? 
{ + GetResult::File(file, _) => { + let reader = config.open(file)?; + Ok(futures::stream::iter(reader).boxed()) + } + r @ GetResult::Stream(_) => { + let bytes = r.bytes().await?; + let reader = config.open(bytes.reader())?; + Ok(futures::stream::iter(reader).boxed()) + } + } + }) + } + } +} + #[cfg(test)] #[cfg(feature = "avro")] mod tests { @@ -174,11 +206,10 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; + use crate::test::object_store::local_unpartitioned_file; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_data_access::object_store::local::{ - local_unpartitioned_file, LocalFileSystem, - }; use futures::StreamExt; + use object_store::local::LocalFileSystem; use super::*; @@ -186,7 +217,7 @@ mod tests { async fn avro_exec_without_partition() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let store = Arc::new(LocalFileSystem {}) as _; + let store = Arc::new(LocalFileSystem::new()) as _; let meta = local_unpartitioned_file(filename); let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; @@ -246,7 +277,7 @@ mod tests { async fn avro_exec_missing_column() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem::new()) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let actual_schema = AvroFormat {} @@ -315,7 +346,7 @@ mod tests { async fn avro_exec_with_partition() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem::new()) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let file_schema = AvroFormat {} diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 7dddb70e95a9..b503e9e32d87 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -24,16 +24,21 @@ use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use crate::datasource::listing::FileRange; +use crate::physical_plan::file_format::file_stream::{ + FileStream, FormatReader, ReaderFuture, +}; use arrow::csv; use arrow::datatypes::SchemaRef; +use bytes::Buf; use futures::{StreamExt, TryStreamExt}; +use object_store::{GetResult, ObjectMeta, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; use std::sync::Arc; use tokio::task::{self, JoinHandle}; -use super::file_stream::{BatchIter, FileStream}; use super::FileScanConfig; /// Execution plan for scanning a CSV file @@ -115,40 +120,17 @@ impl ExecutionPlan for CsvExec { partition: usize, context: Arc, ) -> Result { - let batch_size = context.session_config().batch_size; - let file_schema = Arc::clone(&self.base_config.file_schema); - let file_projection = self.base_config.file_column_projection_indices(); - let has_header = self.has_header; - let delimiter = self.delimiter; - let start_line = if has_header { 1 } else { 0 }; - - let fun = move |file, remaining: &Option| 
{ - let bounds = remaining.map(|x| (0, x + start_line)); - let datetime_format = None; - Box::new(csv::Reader::new( - file, - Arc::clone(&file_schema), - has_header, - Some(delimiter), - batch_size, - bounds, - file_projection.clone(), - datetime_format, - )) as BatchIter - }; - - let object_store = context - .runtime_env() - .object_store(&self.base_config.object_store_url)?; - - Ok(Box::pin(FileStream::new( - object_store, - self.base_config.file_groups[partition].clone(), - fun, - Arc::clone(&self.projected_schema), - self.base_config.limit, - self.base_config.table_partition_cols.clone(), - ))) + let config = Arc::new(CsvConfig { + batch_size: context.session_config().batch_size, + file_schema: Arc::clone(&self.base_config.file_schema), + file_projection: self.base_config.file_column_projection_indices(), + has_header: self.has_header, + delimiter: self.delimiter, + }); + + let opener = CsvOpener { config }; + let stream = FileStream::new(&self.base_config, partition, context, opener)?; + Ok(Box::pin(stream) as SendableRecordBatchStream) } fn fmt_as( @@ -175,6 +157,57 @@ impl ExecutionPlan for CsvExec { } } +#[derive(Debug, Clone)] +struct CsvConfig { + batch_size: usize, + file_schema: SchemaRef, + file_projection: Option>, + has_header: bool, + delimiter: u8, +} + +impl CsvConfig { + fn open(&self, reader: R) -> csv::Reader { + let datetime_format = None; + csv::Reader::new( + reader, + Arc::clone(&self.file_schema), + self.has_header, + Some(self.delimiter), + self.batch_size, + None, + self.file_projection.clone(), + datetime_format, + ) + } +} + +struct CsvOpener { + config: Arc, +} + +impl FormatReader for CsvOpener { + fn open( + &self, + store: Arc, + file: ObjectMeta, + _range: Option, + ) -> ReaderFuture { + let config = self.config.clone(); + Box::pin(async move { + match store.get(&file.location).await? { + GetResult::File(file, _) => { + Ok(futures::stream::iter(config.open(file)).boxed()) + } + r @ GetResult::Stream(_) => { + let bytes = r.bytes().await?; + Ok(futures::stream::iter(config.open(bytes.reader())).boxed()) + } + } + }) + } +} + pub async fn plan_to_csv( state: &SessionState, plan: Arc, diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 97a95b568a61..904a880bae52 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -21,51 +21,42 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait. -use crate::datasource::listing::PartitionedFile; -use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue}; -use arrow::{ - datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, -}; -use datafusion_data_access::object_store::ObjectStore; -use futures::Stream; -use std::{ - io::Read, - iter, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use super::PartitionColumnProjector; - -pub type FileIter = Box + Send + Sync>; -pub type BatchIter = Box> + Send + Sync>; - -/// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object -/// and an optional number of required records. 
-pub trait FormatReaderOpener: - FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static -{ -} +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use futures::{ready, FutureExt, Stream, StreamExt}; +use object_store::{ObjectMeta, ObjectStore}; + +use datafusion_common::ScalarValue; + +use crate::datasource::listing::{FileRange, PartitionedFile}; +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector}; +use crate::physical_plan::RecordBatchStream; + +pub type ReaderFuture = + BoxFuture<'static, Result>>>; -impl FormatReaderOpener for T where - T: FnMut(Box, &Option) -> BatchIter - + Send - + Unpin - + 'static -{ +pub trait FormatReader: Unpin { + fn open( + &self, + store: Arc, + file: ObjectMeta, + range: Option, + ) -> ReaderFuture; } /// A stream that iterates record batch by record batch, file over file. -pub struct FileStream { - /// An iterator over record batches of the last file returned by file_iter - batch_iter: BatchIter, - /// Partitioning column values for the current batch_iter - partition_values: Vec, +pub struct FileStream { /// An iterator over input files. - file_iter: FileIter, + file_iter: VecDeque, /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, @@ -80,104 +71,146 @@ pub struct FileStream { pc_projector: PartitionColumnProjector, /// the store from which to source the files. object_store: Arc, + /// The stream state + state: FileStreamState, } -impl FileStream { +enum FileStreamState { + Idle, + Open { + future: ReaderFuture, + partition_values: Vec, + }, + Scan { + /// Partitioning column values for the current batch_iter + partition_values: Vec, + /// The reader instance + reader: BoxStream<'static, ArrowResult>, + }, + Error, + Limit, +} + +impl FileStream { pub fn new( - object_store: Arc, - files: Vec, + config: &FileScanConfig, + partition: usize, + context: Arc, file_reader: F, - projected_schema: SchemaRef, - limit: Option, - table_partition_cols: Vec, - ) -> Self { + ) -> Result { + let (projected_schema, _) = config.project(); let pc_projector = PartitionColumnProjector::new( - Arc::clone(&projected_schema), - &table_partition_cols, + projected_schema.clone(), + &config.table_partition_cols, ); - Self { - file_iter: Box::new(files.into_iter()), - batch_iter: Box::new(iter::empty()), - partition_values: vec![], - remain: limit, + let files = config.file_groups[partition].clone(); + + let object_store = context + .runtime_env() + .object_store(&config.object_store_url)?; + + Ok(Self { + file_iter: files.into(), projected_schema, + remain: config.limit, file_reader, pc_projector, object_store, - } + state: FileStreamState::Idle, + }) } - /// Acts as a flat_map of record batches over files. Adds the partitioning - /// Columns to the returned record batches. 
- fn next_batch(&mut self) -> Option> { - match self.batch_iter.next() { - Some(Ok(batch)) => { - Some(self.pc_projector.project(batch, &self.partition_values)) - } - Some(Err(e)) => Some(Err(e)), - None => match self.file_iter.next() { - Some(f) => { - self.partition_values = f.partition_values; - self.object_store - .file_reader(f.file_meta.sized_file) - .and_then(|r| r.sync_reader()) - .map_err(|e| ArrowError::ExternalError(Box::new(e))) - .and_then(|f| { - self.batch_iter = (self.file_reader)(f, &self.remain); - self.next_batch().transpose() - }) - .transpose() + fn poll_inner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + match &mut self.state { + FileStreamState::Idle => { + let file = match self.file_iter.pop_front() { + Some(file) => file, + None => return Poll::Ready(None), + }; + + let future = self.file_reader.open( + self.object_store.clone(), + file.object_meta, + file.range, + ); + + self.state = FileStreamState::Open { + future, + partition_values: file.partition_values, + } } - None => None, - }, + FileStreamState::Open { + future, + partition_values, + } => match ready!(future.poll_unpin(cx)) { + Ok(reader) => { + self.state = FileStreamState::Scan { + partition_values: std::mem::take(partition_values), + reader, + }; + } + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e.into()))); + } + }, + FileStreamState::Scan { + reader, + partition_values, + } => match ready!(reader.poll_next_unpin(cx)) { + Some(result) => { + let result = result + .and_then(|b| self.pc_projector.project(b, partition_values)) + .map(|batch| match &mut self.remain { + Some(remain) => { + if *remain > batch.num_rows() { + *remain -= batch.num_rows(); + batch + } else { + let batch = batch.slice(0, *remain); + self.state = FileStreamState::Limit; + *remain = 0; + batch + } + } + None => batch, + }); + + if result.is_err() { + self.state = FileStreamState::Error + } + + return Poll::Ready(Some(result)); + } + None => self.state = FileStreamState::Idle, + }, + FileStreamState::Error | FileStreamState::Limit => { + return Poll::Ready(None) + } + } } } } -impl Stream for FileStream { +impl Stream for FileStream { type Item = ArrowResult; fn poll_next( mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll> { - // check if finished or no limit - match self.remain { - Some(r) if r == 0 => return Poll::Ready(None), - None => return Poll::Ready(self.get_mut().next_batch()), - Some(r) => r, - }; - - Poll::Ready(match self.as_mut().next_batch() { - Some(Ok(item)) => { - if let Some(remain) = self.remain.as_mut() { - if *remain >= item.num_rows() { - *remain -= item.num_rows(); - Some(Ok(item)) - } else { - let len = *remain; - *remain = 0; - Some(Ok(RecordBatch::try_new( - item.schema(), - item.columns() - .iter() - .map(|column| column.slice(0, len)) - .collect(), - )?)) - } - } else { - Some(Ok(item)) - } - } - other => other, - }) + self.poll_inner(cx) } } -impl RecordBatchStream for FileStream { +impl RecordBatchStream for FileStream { fn schema(&self) -> SchemaRef { - Arc::clone(&self.projected_schema) + self.projected_schema.clone() } } @@ -186,33 +219,54 @@ mod tests { use futures::StreamExt; use super::*; + use crate::datasource::object_store::ObjectStoreUrl; + use crate::prelude::SessionContext; use crate::{ error::Result, - test::{make_partition, object_store::TestObjectStore}, + test::{make_partition, object_store::register_test_store}, }; + struct TestOpener { + records: Vec, + } + + impl FormatReader for TestOpener { + 
fn open( + &self, + _store: Arc, + _file: ObjectMeta, + _range: Option, + ) -> ReaderFuture { + let iterator = self.records.clone().into_iter().map(Ok); + let stream = futures::stream::iter(iterator).boxed(); + futures::future::ready(Ok(stream)).boxed() + } + } + /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1]) async fn create_and_collect(limit: Option) -> Vec { let records = vec![make_partition(3), make_partition(2)]; + let file_schema = records[0].schema(); - let source_schema = records[0].schema(); + let reader = TestOpener { records }; - let reader = move |_file, _remain: &Option| { - // this reader returns the same batch regardless of the file - Box::new(records.clone().into_iter().map(Ok)) as BatchIter - }; + let ctx = SessionContext::new(); + register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]); - let file_stream = FileStream::new( - TestObjectStore::new_arc(&[("mock_file1", 10), ("mock_file2", 20)]), - vec![ + let config = FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema, + file_groups: vec![vec![ PartitionedFile::new("mock_file1".to_owned(), 10), PartitionedFile::new("mock_file2".to_owned(), 20), - ], - reader, - source_schema, + ]], + statistics: Default::default(), + projection: None, limit, - vec![], - ); + table_partition_cols: vec![], + }; + + let file_stream = FileStream::new(&config, 0, ctx.task_ctx(), reader).unwrap(); file_stream .map(|b| b.expect("No error expected in stream")) diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 397fee5fe234..ca55f4266258 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -18,22 +18,27 @@ //! Execution plan for reading line-delimited JSON files use arrow::json::reader::DecoderOptions; +use crate::datasource::listing::FileRange; use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::execution::context::TaskContext; use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::file_format::file_stream::{ + FileStream, FormatReader, ReaderFuture, +}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::{datatypes::SchemaRef, json}; +use bytes::Buf; use futures::{StreamExt, TryStreamExt}; +use object_store::{GetResult, ObjectMeta, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; use std::sync::Arc; use tokio::task::{self, JoinHandle}; -use super::file_stream::{BatchIter, FileStream}; use super::FileScanConfig; /// Execution plan for scanning NdJson data source @@ -99,35 +104,21 @@ impl ExecutionPlan for NdJsonExec { let batch_size = context.session_config().batch_size; let file_schema = Arc::clone(&self.base_config.file_schema); - // The json reader cannot limit the number of records, so `remaining` is ignored. 
- let fun = move |file, _remaining: &Option| { - // TODO: make DecoderOptions implement Clone so we can - // clone here rather than recreating the options each time - // https://github.com/apache/arrow-rs/issues/1580 - let options = DecoderOptions::new().with_batch_size(batch_size); - - let options = if let Some(proj) = proj.clone() { - options.with_projection(proj) - } else { - options - }; - - Box::new(json::Reader::new(file, Arc::clone(&file_schema), options)) - as BatchIter + let options = DecoderOptions::new().with_batch_size(batch_size); + let options = if let Some(proj) = proj.clone() { + options.with_projection(proj) + } else { + options + }; + + let opener = JsonOpener { + file_schema, + options, }; - let object_store = context - .runtime_env() - .object_store(&self.base_config.object_store_url)?; - - Ok(Box::pin(FileStream::new( - object_store, - self.base_config.file_groups[partition].clone(), - fun, - Arc::clone(&self.projected_schema), - self.base_config.limit, - self.base_config.table_partition_cols.clone(), - ))) + let stream = FileStream::new(&self.base_config, partition, context, opener)?; + + Ok(Box::pin(stream) as SendableRecordBatchStream) } fn fmt_as( @@ -152,6 +143,38 @@ impl ExecutionPlan for NdJsonExec { } } +struct JsonOpener { + options: DecoderOptions, + file_schema: SchemaRef, +} + +impl FormatReader for JsonOpener { + fn open( + &self, + store: Arc, + file: ObjectMeta, + _range: Option, + ) -> ReaderFuture { + let options = self.options.clone(); + let schema = self.file_schema.clone(); + Box::pin(async move { + match store.get(&file.location).await? { + GetResult::File(file, _) => { + let reader = json::Reader::new(file, schema.clone(), options); + Ok(futures::stream::iter(reader).boxed()) + } + r @ GetResult::Stream(_) => { + let bytes = r.bytes().await?; + let reader = + json::Reader::new(bytes.reader(), schema.clone(), options); + + Ok(futures::stream::iter(reader).boxed()) + } + } + }) + } +} + pub async fn plan_to_json( state: &SessionState, plan: Arc, @@ -201,7 +224,7 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; - use datafusion_data_access::object_store::local::local_unpartitioned_file; + use crate::test::object_store::local_unpartitioned_file; use tempfile::TempDir; use super::*; diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 86015c472463..e59e8248f8ea 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -163,7 +163,7 @@ impl<'a> Display for FileGroupsDisplay<'a> { .iter() .map(|pp| { pp.iter() - .map(|pf| pf.file_meta.path()) + .map(|pf| pf.object_meta.location.as_ref()) .collect::>() .join(", ") }) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index b1e629be5357..39d2e8d61ef7 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -18,28 +18,22 @@ //! 
Execution plan for reading Parquet files use fmt::Debug; -use std::collections::VecDeque; use std::fmt; use std::fs; -use std::path::Path; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, + error::ArrowError, }; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use log::debug; -use parquet::arrow::{ - arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter, - ParquetFileArrowReader, ProjectionMask, -}; +use object_store::{GetResult, ObjectMeta, ObjectStore}; +use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader, ProjectionMask}; use parquet::file::reader::FileReader; +use parquet::file::serialized_reader::SliceableCursor; use parquet::file::{ metadata::RowGroupMetaData, properties::WriterProperties, reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder, @@ -47,13 +41,13 @@ use parquet::file::{ }; use datafusion_common::Column; -use datafusion_data_access::object_store::ObjectStore; use datafusion_expr::Expr; -use crate::physical_plan::metrics::BaselineMetrics; -use crate::physical_plan::stream::RecordBatchReceiverStream; +use crate::datasource::listing::FileRange; +use crate::physical_plan::file_format::file_stream::{ + FileStream, FormatReader, ReaderFuture, +}; use crate::{ - datasource::{file_format::parquet::ChunkObjectReader, listing::PartitionedFile}, error::{DataFusionError, Result}, execution::context::{SessionState, TaskContext}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, @@ -61,14 +55,12 @@ use crate::{ expressions::PhysicalSortExpr, file_format::{FileScanConfig, SchemaAdapter}, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, }, scalar::ScalarValue, }; -use super::PartitionColumnProjector; - /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { @@ -210,52 +202,20 @@ impl ExecutionPlan for ParquetExec { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; - let partition_col_proj = PartitionColumnProjector::new( - Arc::clone(&self.projected_schema), - &self.base_config.table_partition_cols, - ); - let object_store = context - .runtime_env() - .object_store(&self.base_config.object_store_url)?; - - let stream = ParquetExecStream { - error: false, + let opener = ParquetOpener { partition_index, - metrics: self.metrics.clone(), - object_store, - pruning_predicate: self.pruning_predicate.clone(), + projection: Arc::from(projection), batch_size: context.session_config().batch_size, - schema: self.projected_schema.clone(), - projection, - remaining_rows: self.base_config.limit, - reader: None, - files: self.base_config.file_groups[partition_index].clone().into(), - projector: partition_col_proj, - adapter: SchemaAdapter::new(self.base_config.file_schema.clone()), - baseline_metrics: BaselineMetrics::new(&self.metrics, partition_index), + pruning_predicate: self.pruning_predicate.clone(), + table_schema: self.base_config.file_schema.clone(), + metrics: self.metrics.clone(), }; - // Use spawn_blocking only if running from a tokio context (#2201) - match 
tokio::runtime::Handle::try_current() { - Ok(handle) => { - let (response_tx, response_rx) = tokio::sync::mpsc::channel(2); - let schema = stream.schema(); - let join_handle = handle.spawn_blocking(move || { - for result in stream { - if response_tx.blocking_send(result).is_err() { - break; - } - } - }); - Ok(RecordBatchReceiverStream::create( - &schema, - response_rx, - join_handle, - )) - } - Err(_) => Ok(Box::pin(stream)), - } + let stream = + FileStream::new(&self.base_config, partition_index, context, opener)?; + + Ok(Box::pin(stream)) } fn fmt_as( @@ -296,164 +256,101 @@ impl ExecutionPlan for ParquetExec { } } -/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`] -/// -/// NB: This will perform blocking IO synchronously without yielding which may -/// be problematic in certain contexts (e.g. a tokio runtime that also performs -/// network IO) -struct ParquetExecStream { - error: bool, +struct ParquetOpener { partition_index: usize, - metrics: ExecutionPlanMetricsSet, - object_store: Arc, - pruning_predicate: Option, + projection: Arc<[usize]>, batch_size: usize, - schema: SchemaRef, - projection: Vec, - remaining_rows: Option, - reader: Option<(ParquetRecordBatchReader, PartitionedFile)>, - files: VecDeque, - projector: PartitionColumnProjector, - adapter: SchemaAdapter, - baseline_metrics: BaselineMetrics, + pruning_predicate: Option, + table_schema: SchemaRef, + metrics: ExecutionPlanMetricsSet, } -impl ParquetExecStream { - fn create_reader( - &mut self, - file: &PartitionedFile, - ) -> Result { +impl FormatReader for ParquetOpener { + fn open( + &self, + store: Arc, + meta: ObjectMeta, + range: Option, + ) -> ReaderFuture { let file_metrics = ParquetFileMetrics::new( self.partition_index, - file.file_meta.path(), + meta.location.as_ref(), &self.metrics, ); - let bytes_scanned = file_metrics.bytes_scanned.clone(); - let object_reader = self - .object_store - .file_reader(file.file_meta.sized_file.clone())?; - - let mut opt = ReadOptionsBuilder::new(); - if let Some(pruning_predicate) = &self.pruning_predicate { - opt = opt.with_predicate(build_row_group_predicate( - pruning_predicate, - file_metrics, - )); - } - if let Some(range) = &file.range { - assert!( - range.start >= 0 && range.end > 0 && range.end > range.start, - "invalid range specified: {:?}", - range - ); - opt = opt.with_range(range.start, range.end); - } - - let file_reader = SerializedFileReader::new_with_options( - ChunkObjectReader { - object_reader, - bytes_scanned: Some(bytes_scanned), - }, - opt.build(), - )?; - - let file_metadata = file_reader.metadata().file_metadata(); - let parquet_schema = file_metadata.schema_descr_ptr(); - - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let arrow_schema = arrow_reader.get_schema()?; - - let adapted_projections = self - .adapter - .map_projections(&arrow_schema, &self.projection)?; - - let mask = ProjectionMask::roots(&parquet_schema, adapted_projections); - let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?; - Ok(reader) - } -} + file_metrics.bytes_scanned.add(meta.size); -impl Iterator for ParquetExecStream { - type Item = ArrowResult; + let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); + let batch_size = self.batch_size; + let projection = self.projection.clone(); + let pruning_predicate = self.pruning_predicate.clone(); - fn next(&mut self) -> Option { - let cloned_time = self.baseline_metrics.elapsed_compute().clone(); - // records time on drop - let _timer = 
cloned_time.timer(); - - if self.error || matches!(self.remaining_rows, Some(0)) { - return None; - } + let build_opts = move || { + let mut opt = ReadOptionsBuilder::new(); + if let Some(pruning_predicate) = pruning_predicate { + opt = opt.with_predicate(build_row_group_predicate( + pruning_predicate, + file_metrics, + )); + } - // TODO: Split this out into separate operators (#2079) - loop { - let (reader, file) = match self.reader.as_mut() { - Some(current) => current, - None => match self.files.pop_front() { - None => return None, - Some(file) => match self.create_reader(&file) { - Ok(reader) => self.reader.insert((reader, file)), - Err(e) => { - self.error = true; - return Some(Err(ArrowError::ExternalError(Box::new(e)))); - } - }, - }, - }; + if let Some(range) = &range { + opt = opt.with_range(range.start, range.end); + } - let result = reader.next().map(|result| { - result - .and_then(|batch| { - self.adapter - .adapt_batch(batch, &self.projection) - .map_err(|e| ArrowError::ExternalError(Box::new(e))) - }) - .and_then(|batch| { - self.projector.project(batch, &file.partition_values) - }) - }); + opt.build() + }; - let result = match result { - Some(result) => result, - None => { - self.reader = None; - continue; + Box::pin(async move { + let get_result = store.get(&meta.location).await?; + + let (mut arrow_reader, parquet_schema) = match get_result { + GetResult::File(file, _) => { + let reader = + SerializedFileReader::new_with_options(file, build_opts())?; + let parquet_schema = + reader.metadata().file_metadata().schema_descr_ptr(); + let arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); + (arrow_reader, parquet_schema) } - }; - - match (&result, self.remaining_rows.as_mut()) { - (Ok(batch), Some(remaining_rows)) => { - *remaining_rows = remaining_rows.saturating_sub(batch.num_rows()); + r @ GetResult::Stream(_) => { + // TODO: Projection pushdown + let data = r.bytes().await?; + let cursor = SliceableCursor::new(data.to_vec()); + let reader = + SerializedFileReader::new_with_options(cursor, build_opts())?; + let parquet_schema = + reader.metadata().file_metadata().schema_descr_ptr(); + + let arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); + + (arrow_reader, parquet_schema) } - _ => self.error = result.is_err(), - } + }; - //record output rows in parquetExec - if let Ok(batch) = &result { - self.baseline_metrics.record_output(batch.num_rows()); - } + let file_schema = arrow_reader + .get_schema() + .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - return Some(result); - } - } -} - -impl Stream for ParquetExecStream { - type Item = ArrowResult; + let adapted_projections = + schema_adapter.map_projections(&file_schema, &projection)?; - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - let poll = Poll::Ready(Iterator::next(&mut *self)); - self.baseline_metrics.record_poll(poll) - } -} + let mask = ProjectionMask::roots( + &parquet_schema, + adapted_projections.iter().cloned(), + ); + let reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?; + + let adapted = reader.map(move |maybe_batch| { + maybe_batch.and_then(|b| { + schema_adapter + .adapt_batch(b, &projection) + .map_err(Into::into) + }) + }); -impl RecordBatchStream for ParquetExecStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() + Ok(futures::stream::iter(adapted).boxed()) + }) } } @@ -557,7 +454,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } fn build_row_group_predicate( - pruning_predicate: 
&PruningPredicate, + pruning_predicate: PruningPredicate, metrics: ParquetFileMetrics, ) -> Box bool> { let pruning_predicate = pruning_predicate.clone(); @@ -597,7 +494,7 @@ pub async fn plan_to_parquet( ) -> Result<()> { let path = path.as_ref(); // create directory to contain the Parquet files (one per partition) - let fs_path = Path::new(path); + let fs_path = std::path::Path::new(path); match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![]; @@ -638,9 +535,6 @@ pub async fn plan_to_parquet( mod tests { use crate::{ assert_batches_sorted_eq, assert_contains, - datafusion_data_access::{ - object_store::local::LocalFileSystem, FileMeta, SizedFile, - }, datasource::file_format::{parquet::ParquetFormat, FileFormat}, physical_plan::collect, }; @@ -648,18 +542,23 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; - use crate::datasource::listing::FileRange; + use crate::datasource::listing::{FileRange, PartitionedFile}; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; + use crate::test::object_store::local_unpartitioned_file; use arrow::array::Float32Array; + use arrow::record_batch::RecordBatch; use arrow::{ array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field}, }; - use datafusion_data_access::object_store::local::local_unpartitioned_file; + use chrono::{TimeZone, Utc}; use datafusion_expr::{col, lit}; use futures::StreamExt; + use object_store::local::LocalFileSystem; + use object_store::path::Path; + use object_store::ObjectMeta; use parquet::{ basic::Type as PhysicalType, file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, @@ -1060,9 +959,9 @@ mod tests { #[tokio::test] async fn parquet_exec_with_range() -> Result<()> { - fn file_range(meta: &FileMeta, start: i64, end: i64) -> PartitionedFile { + fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile { PartitionedFile { - file_meta: meta.clone(), + object_meta: meta.clone(), partition_values: vec![], range: Some(FileRange { start, end }), } @@ -1105,7 +1004,7 @@ mod tests { let meta = local_unpartitioned_file(filename); - let store = Arc::new(LocalFileSystem {}) as _; + let store = Arc::new(LocalFileSystem::new()) as _; let file_schema = ParquetFormat::default() .infer_schema(&store, &[meta.clone()]) .await?; @@ -1159,7 +1058,7 @@ mod tests { .unwrap(); let partitioned_file = PartitionedFile { - file_meta: meta, + object_meta: meta, partition_values: vec![ ScalarValue::Utf8(Some("2021".to_owned())), ScalarValue::Utf8(Some("10".to_owned())), @@ -1216,12 +1115,10 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let partitioned_file = PartitionedFile { - file_meta: FileMeta { - sized_file: SizedFile { - size: 1337, - path: "invalid".into(), - }, - last_modified: None, + object_meta: ObjectMeta { + location: Path::from("invalid"), + last_modified: Utc.timestamp_nanos(0), + size: 1337, }, partition_values: vec![], range: None, @@ -1245,7 +1142,7 @@ mod tests { // invalid file should produce an error to that effect assert_contains!( batch.unwrap_err().to_string(), - "External error: Parquet error: Arrow: IO error" + "Object Store error: Object at location /invalid not found: No such file or directory" ); assert!(results.next().await.is_none()); @@ -1276,7 +1173,7 @@ mod tests { ); let 
row_group_metadata = vec![rgm1, rgm2]; let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -1306,7 +1203,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -1351,7 +1248,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -1366,7 +1263,7 @@ mod tests { let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let pruning_predicate = PruningPredicate::try_new(expr, schema)?; let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -1412,7 +1309,7 @@ mod tests { let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -1441,7 +1338,7 @@ mod tests { let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); + build_row_group_predicate(pruning_predicate, parquet_file_metrics()); let row_group_filter = row_group_metadata .iter() .enumerate() diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 2a89ea0df92d..8f0f977f742c 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -313,6 +313,7 @@ pub fn with_new_children_if_necessary( /// /// let working_directory = std::env::current_dir().unwrap(); /// let normalized = working_directory.to_string_lossy().replace(is_separator, "/"); +/// let normalized = normalized.strip_prefix("/").unwrap(); /// let plan_string = plan_string.replace(&normalized, "WORKING_DIR"); /// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index dd00a50286e0..24d8fab72b68 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -24,12 +24,12 @@ use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_plan::LogicalPlan; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; +use crate::test::object_store::local_unpartitioned_file; use crate::test_util::{aggr_test_schema, scan_empty}; use array::{Array, ArrayRef}; use arrow::array::{self, DecimalBuilder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_data_access::object_store::local::local_unpartitioned_file; use futures::{Future, FutureExt}; use std::fs::File; use std::io::prelude::*; @@ -109,9 +109,7 @@ pub fn partitioned_csv_config( files .into_iter() - .map( - |f| 
vec![local_unpartitioned_file(f.to_str().unwrap().to_owned()).into()], - ) + .map(|f| vec![local_unpartitioned_file(f).into()]) .collect::>() } else { vec![vec![local_unpartitioned_file(path).into()]] diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index fdd05334670f..dea0e8324c2d 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -14,113 +14,49 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -//! Object store implem used for testing - -use std::{ - io, - io::{Cursor, Read}, - sync::Arc, -}; - -use crate::datafusion_data_access::{ - object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore}, - FileMeta, Result, SizedFile, -}; +//! Object store implementation used for testing use crate::prelude::SessionContext; -use async_trait::async_trait; -use futures::{stream, AsyncRead, StreamExt}; +use futures::FutureExt; +use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; +use std::sync::Arc; +use url::Url; /// Returns a test object store with the provided `ctx` -pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) { +pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) { ctx.runtime_env() - .register_object_store("test", TestObjectStore::new_arc(files)); -} - -#[derive(Debug)] -/// An object store implem that is useful for testing. -/// `ObjectReader`s are filled with zero bytes. -pub struct TestObjectStore { - /// The `(path,size)` of the files that "exist" in the store - files: Vec<(String, u64)>, + .register_object_store("test", make_test_store(files)); } -impl TestObjectStore { - pub fn new_arc(files: &[(&str, u64)]) -> Arc { - Arc::new(Self { - files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(), - }) - } -} +/// Create a test object store with the provided files +pub fn make_test_store(files: &[(&str, u64)]) -> Arc { + let memory = InMemory::new(); -#[async_trait] -impl ObjectStore for TestObjectStore { - async fn list_file(&self, prefix: &str) -> Result { - let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); - Ok(Box::pin( - stream::iter( - self.files - .clone() - .into_iter() - .filter(move |f| f.0.starts_with(&prefix)), - ) - .map(|f| { - Ok(FileMeta { - sized_file: SizedFile { - path: f.0.clone(), - size: f.1, - }, - last_modified: None, - }) - }), - )) + for (name, size) in files { + memory + .put(&Path::from(*name), vec![0; *size as usize].into()) + .now_or_never() + .unwrap() + .unwrap(); } - async fn list_dir( - &self, - _prefix: &str, - _delimiter: Option, - ) -> Result { - unimplemented!() - } - - fn file_reader(&self, file: SizedFile) -> Result> { - match self.files.iter().find(|item| file.path == item.0) { - Some((_, size)) if *size == file.size => { - Ok(Arc::new(EmptyObjectReader(*size))) - } - Some(_) => Err(io::Error::new( - io::ErrorKind::NotFound, - "found in test list but wrong size", - )), - None => Err(io::Error::new( - io::ErrorKind::NotFound, - "not in provided test list", - )), - } - } + Arc::new(memory) } -struct EmptyObjectReader(u64); +/// Helper method to fetch the file size and date at given path and create a `ObjectMeta` +pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta { + // TODO: Move this logic into object_store -#[async_trait] -impl ObjectReader for EmptyObjectReader { - async fn chunk_reader( - &self, - _start: u64, - _length: usize, - ) -> 
Result> { - unimplemented!() - } + // Convert to absolute path + let canonical = std::fs::canonicalize(path.as_ref()).unwrap(); - fn sync_chunk_reader( - &self, - _start: u64, - _length: usize, - ) -> Result> { - Ok(Box::new(Cursor::new(vec![0; self.0 as usize]))) - } + // Convert to URL-safe path + let url = Url::from_file_path(canonical).unwrap(); + let location = Path::parse(url.path()).unwrap(); - fn length(&self) -> u64 { - self.0 + let metadata = std::fs::metadata(path).expect("Local file metadata"); + ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, } } diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index e3da9d986e0d..4fa1a8898b60 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -17,19 +17,16 @@ //! Test queries on partitioned datasets -use std::{fs, io, sync::Arc}; +use std::fs::File; +use std::ops::Range; +use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; +use chrono::{TimeZone, Utc}; use datafusion::datasource::listing::ListingTableUrl; use datafusion::{ assert_batches_sorted_eq, - datafusion_data_access::{ - object_store::{ - local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, - ObjectStore, - }, - FileMeta, SizedFile, - }, datasource::{ file_format::{csv::CsvFormat, parquet::ParquetFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig}, @@ -40,7 +37,9 @@ use datafusion::{ test_util::{self, arrow_test_data, parquet_test_data}, }; use datafusion_common::ScalarValue; +use futures::stream::BoxStream; use futures::{stream, StreamExt}; +use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore}; #[tokio::test] async fn parquet_distinct_partition_col() -> Result<()> { @@ -183,7 +182,7 @@ async fn csv_filter_with_file_col() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mirror:///mytable", + "mirror:///mytable/", ); let result = ctx @@ -219,7 +218,7 @@ async fn csv_projection_on_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mirror:///mytable", + "mirror:///mytable/", ); let result = ctx @@ -256,7 +255,7 @@ async fn csv_grouping_by_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mirror:///mytable", + "mirror:///mytable/", ); let result = ctx @@ -481,9 +480,15 @@ pub struct MirroringObjectStore { file_size: u64, } +impl std::fmt::Display for MirroringObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + impl MirroringObjectStore { pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc { - let metadata = fs::metadata(&mirrored_file).expect("Local file metadata"); + let metadata = std::fs::metadata(&mirrored_file).expect("Local file metadata"); Arc::new(Self { files: paths.iter().map(|&f| f.to_owned()).collect(), mirrored_file, @@ -494,12 +499,44 @@ impl MirroringObjectStore { #[async_trait] impl ObjectStore for MirroringObjectStore { - async fn list_file( + async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { + unimplemented!() + } + + async fn get(&self, location: &Path) -> object_store::Result { + self.files.iter().find(|x| *x == location.as_ref()).unwrap(); + let path = std::path::PathBuf::from(&self.mirrored_file); + let file = File::open(&path).unwrap(); + Ok(GetResult::File(file, path)) + } + + async fn get_range( + &self, + _location: 
&Path, + _range: Range, + ) -> object_store::Result { + unimplemented!() + } + + async fn head(&self, location: &Path) -> object_store::Result { + self.files.iter().find(|x| *x == location.as_ref()).unwrap(); + Ok(ObjectMeta { + location: location.clone(), + last_modified: Utc.timestamp_nanos(0), + size: self.file_size as usize, + }) + } + + async fn delete(&self, _location: &Path) -> object_store::Result<()> { + unimplemented!() + } + + async fn list( &self, - prefix: &str, - ) -> datafusion_data_access::Result { - let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); - let size = self.file_size; + prefix: Option<&Path>, + ) -> object_store::Result>> { + let prefix = prefix.map(|p| p.as_ref()).unwrap_or("").to_string(); + let size = self.file_size as usize; Ok(Box::pin( stream::iter( self.files @@ -508,39 +545,19 @@ impl ObjectStore for MirroringObjectStore { .filter(move |f| f.starts_with(&prefix)), ) .map(move |f| { - Ok(FileMeta { - sized_file: SizedFile { path: f, size }, - last_modified: None, + Ok(ObjectMeta { + location: Path::parse(f)?, + last_modified: Utc.timestamp_nanos(0), + size, }) }), )) } - async fn list_dir( + async fn list_with_delimiter( &self, - _prefix: &str, - _delimiter: Option, - ) -> datafusion_data_access::Result { + _prefix: Option<&Path>, + ) -> object_store::Result { unimplemented!() } - - fn file_reader( - &self, - file: SizedFile, - ) -> datafusion_data_access::Result> { - assert_eq!( - self.file_size, file.size, - "Requested files should have the same size as the mirrored file" - ); - match self.files.iter().find(|&item| &file.path == item) { - Some(_) => Ok(LocalFileSystem {}.file_reader(SizedFile { - path: self.mirrored_file.clone(), - size: self.file_size, - })?), - None => Err(io::Error::new( - io::ErrorKind::NotFound, - "not in provided test list", - )), - } - } } diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 1de6af06ad2c..0b30934bbede 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -22,13 +22,12 @@ use datafusion::error::Result; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; -use datafusion_data_access::object_store::local::{ - local_unpartitioned_file, LocalFileSystem, -}; use datafusion_row::layout::RowType::{Compact, WordAligned}; use datafusion_row::reader::read_as_batch; use datafusion_row::writer::write_batch_unchecked; +use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; use std::sync::Arc; +use url::Url; #[tokio::test] async fn test_with_parquet() -> Result<()> { @@ -79,12 +78,17 @@ async fn get_exec( ) -> Result> { let testdata = datafusion::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, file_name); - let meta = local_unpartitioned_file(filename); + + let canonical = std::fs::canonicalize(filename).unwrap(); + let url = Url::from_file_path(canonical).unwrap(); + let path = Path::parse(url.path()).unwrap(); let format = ParquetFormat::default(); - let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem::new()) as Arc; let object_store_url = ObjectStoreUrl::local_filesystem(); + let meta = object_store.head(&path).await.unwrap(); + let file_schema = format .infer_schema(&object_store, &[meta.clone()]) .await diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 3e19dbcb990b..74b7192fb66c 100644 --- a/datafusion/core/tests/sql/mod.rs +++ 
b/datafusion/core/tests/sql/mod.rs @@ -825,16 +825,11 @@ impl ExplainNormalizer { // Push path as is replacements.push((path.to_string_lossy().to_string(), key.to_string())); - // Push canonical version of path + // Push URL representation of path let canonical = path.canonicalize().unwrap(); - replacements.push((canonical.to_string_lossy().to_string(), key.to_string())); - - if cfg!(target_family = "windows") { - // Push URL representation of path, to handle windows - let url = Url::from_file_path(canonical).unwrap(); - let path = url.path().strip_prefix('/').unwrap(); - replacements.push((path.to_string(), key.to_string())); - } + let url = Url::from_file_path(canonical).unwrap(); + let path = url.path().strip_prefix('/').unwrap(); + replacements.push((path.to_string(), key.to_string())); }; push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA"); From 2848bba80a233395044827af29cdaf60070ab3cd Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 6 Jun 2022 18:00:58 +0100 Subject: [PATCH 02/12] Test fixes --- .../src/datasource/file_format/parquet.rs | 2 +- .../core/src/datasource/listing/helpers.rs | 21 ------------------- .../src/physical_plan/file_format/avro.rs | 4 +--- .../src/physical_plan/file_format/json.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 2 +- 5 files changed, 4 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 1f8c48af6ae8..37c3219efdf3 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -409,7 +409,7 @@ pub(crate) mod test_util { }) .collect(); - let meta: Vec<_> = files.iter().map(|f| local_unpartitioned_file(f)).collect(); + let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); Ok((meta, files)) } } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 5b4daa0b04d8..6d641c40ad59 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -576,27 +576,6 @@ mod tests { ); } - #[cfg(target_os = "windows")] - #[test] - fn test_parse_partitions_for_path_windows() { - assert_eq!( - Some(vec!["v1"]), - parse_partitions_for_path( - &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket\\mytable\\mypartition=v1\\file.csv", - &[String::from("mypartition")] - ) - ); - assert_eq!( - Some(vec!["v1", "v2"]), - parse_partitions_for_path( - &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), - "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv", - &[String::from("mypartition"), String::from("otherpartition")] - ) - ); - } - #[test] fn test_path_batch_roundtrip_no_partiton() { let files = vec![ diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 50df3f06502d..b8f7d0478df5 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -143,9 +143,7 @@ mod avro { use super::*; use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture}; - use crate::physical_plan::stream::RecordBatchStreamAdapter; use bytes::Buf; - use futures::future::BoxFuture; use futures::StreamExt; use object_store::{GetResult, ObjectMeta, ObjectStore}; @@ -159,7 +157,7 @@ mod avro { fn open( &self, reader: R, - ) 
-> Result> { + ) -> Result> { crate::avro_to_arrow::Reader::try_new( reader, self.schema.clone(), diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index ca55f4266258..10e395408dff 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -105,7 +105,7 @@ impl ExecutionPlan for NdJsonExec { let file_schema = Arc::clone(&self.base_config.file_schema); let options = DecoderOptions::new().with_batch_size(batch_size); - let options = if let Some(proj) = proj.clone() { + let options = if let Some(proj) = proj { options.with_projection(proj) } else { options diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 39d2e8d61ef7..9c3b2d3a4db8 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -272,6 +272,7 @@ impl FormatReader for ParquetOpener { meta: ObjectMeta, range: Option, ) -> ReaderFuture { + // TODO: Use ParquetRecordBatchStream (arrow-rs#1803) (arrow-rs#1804) let file_metrics = ParquetFileMetrics::new( self.partition_index, meta.location.as_ref(), @@ -457,7 +458,6 @@ fn build_row_group_predicate( pruning_predicate: PruningPredicate, metrics: ParquetFileMetrics, ) -> Box bool> { - let pruning_predicate = pruning_predicate.clone(); Box::new( move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool { let parquet_schema = pruning_predicate.schema().as_ref(); From a7ec988093e57675226186241c0564417e40a34a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 6 Jun 2022 18:34:41 +0100 Subject: [PATCH 03/12] Update to object_store 0.2.0 --- datafusion/common/Cargo.toml | 2 +- datafusion/core/Cargo.toml | 2 +- datafusion/core/src/test/object_store.rs | 11 +---------- datafusion/core/tests/path_partition.rs | 12 ++++++++++++ datafusion/core/tests/row.rs | 5 +---- datafusion/core/tests/sql/mod.rs | 6 ++---- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index d27af558bb07..f48584f43588 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,7 +41,7 @@ pyarrow = ["pyo3"] arrow = { version = "15.0.0", features = ["prettyprint"] } avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.84.0", optional = true } -object_store = { version = "0.1", optional = true } +object_store = { version = "0.2", optional = true } ordered-float = "3.0" parquet = { version = "15.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index fd53598e9d97..14e8c2ad5d73 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -75,7 +75,7 @@ lazy_static = { version = "^1.4.0" } log = "^0.4" num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" -object_store = "0.1.0" +object_store = "0.2.0" ordered-float = "3.0" parking_lot = "0.12" parquet = { version = "15.0.0", features = ["arrow"] } diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index dea0e8324c2d..a56d3396e4da 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -19,7 +19,6 @@ use crate::prelude::SessionContext; use futures::FutureExt; use 
object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; use std::sync::Arc; -use url::Url; /// Returns a test object store with the provided `ctx` pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) { @@ -44,15 +43,7 @@ pub fn make_test_store(files: &[(&str, u64)]) -> Arc { /// Helper method to fetch the file size and date at given path and create a `ObjectMeta` pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta { - // TODO: Move this logic into object_store - - // Convert to absolute path - let canonical = std::fs::canonicalize(path.as_ref()).unwrap(); - - // Convert to URL-safe path - let url = Url::from_file_path(canonical).unwrap(); - let location = Path::parse(url.path()).unwrap(); - + let location = Path::from_filesystem_path(path.as_ref()).unwrap(); let metadata = std::fs::metadata(path).expect("Local file metadata"); ObjectMeta { location, diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 4fa1a8898b60..189a16951214 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -560,4 +560,16 @@ impl ObjectStore for MirroringObjectStore { ) -> object_store::Result { unimplemented!() } + + async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { + unimplemented!() + } + + async fn copy_if_not_exists( + &self, + _from: &Path, + _to: &Path, + ) -> object_store::Result<()> { + unimplemented!() + } } diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 0b30934bbede..2c840321f56c 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -27,7 +27,6 @@ use datafusion_row::reader::read_as_batch; use datafusion_row::writer::write_batch_unchecked; use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; use std::sync::Arc; -use url::Url; #[tokio::test] async fn test_with_parquet() -> Result<()> { @@ -79,9 +78,7 @@ async fn get_exec( let testdata = datafusion::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, file_name); - let canonical = std::fs::canonicalize(filename).unwrap(); - let url = Url::from_file_path(canonical).unwrap(); - let path = Path::parse(url.path()).unwrap(); + let path = Path::from_filesystem_path(filename).unwrap(); let format = ParquetFormat::default(); let object_store = Arc::new(LocalFileSystem::new()) as Arc; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 74b7192fb66c..b0f5027d538b 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -46,11 +46,11 @@ use datafusion::{ }; use datafusion::{execution::context::SessionContext, physical_plan::displayable}; use datafusion_expr::Volatility; +use object_store::path::Path; use std::fs::File; use std::io::Write; use std::path::PathBuf; use tempfile::TempDir; -use url::Url; /// A macro to assert that some particular line contains two substrings /// @@ -826,9 +826,7 @@ impl ExplainNormalizer { replacements.push((path.to_string_lossy().to_string(), key.to_string())); // Push URL representation of path - let canonical = path.canonicalize().unwrap(); - let url = Url::from_file_path(canonical).unwrap(); - let path = url.path().strip_prefix('/').unwrap(); + let path = Path::from_filesystem_path(path).unwrap(); replacements.push((path.to_string(), key.to_string())); }; From 155f9d1a62f6f84b3c32787c615158c769a82095 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 6 Jun 2022 18:47:27 +0100 Subject: [PATCH 04/12] 
More windows pacification --- datafusion/core/src/physical_plan/file_format/parquet.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index e641db4e3eb5..ab02e15491f7 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1111,9 +1111,13 @@ mod tests { async fn parquet_exec_with_error() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); + let location = Path::from_filesystem_path(".") + .unwrap() + .child("invalid.parquet"); + let partitioned_file = PartitionedFile { object_meta: ObjectMeta { - location: Path::from("invalid"), + location, last_modified: Utc.timestamp_nanos(0), size: 1337, }, @@ -1139,7 +1143,7 @@ mod tests { // invalid file should produce an error to that effect assert_contains!( batch.unwrap_err().to_string(), - "Object Store error: Object at location /invalid not found: No such file or directory" + "invalid.parquet not found: No such file or directory" ); assert!(results.next().await.is_none()); From a2b89c0b86a09944c9a3eb4c42fe10b50a1a81a7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 7 Jun 2022 17:17:35 +0100 Subject: [PATCH 05/12] Fix windows test --- datafusion/core/src/physical_plan/file_format/parquet.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index ab02e15491f7..421bda6719a5 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1141,10 +1141,7 @@ mod tests { let mut results = parquet_exec.execute(0, task_ctx)?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect - assert_contains!( - batch.unwrap_err().to_string(), - "invalid.parquet not found: No such file or directory" - ); + assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found"); assert!(results.next().await.is_none()); Ok(()) From bbbfc550a914ebd76b161970f153b3d270d0df35 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 Jun 2022 16:53:41 +0100 Subject: [PATCH 06/12] Fix windows test_prefix_path --- datafusion/core/src/datasource/listing/url.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 08caa542a7a2..fd13205317ad 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -248,7 +248,7 @@ mod tests { let root = root.to_string_lossy(); let url = ListingTableUrl::parse(&root).unwrap(); - let child = Path::from(format!("{}/partition/file", root)); + let child = url.prefix.child("partition").child("file"); let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); assert_eq!(prefix, vec!["partition", "file"]); From e9fc7ec74087a601af5a379f34d067cc2e6f5ad3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 Jun 2022 17:30:12 +0100 Subject: [PATCH 07/12] More windows fixes --- datafusion/core/src/physical_plan/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 34749c6fae92..7fe9bb56be64 100644 --- 
a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -288,7 +288,7 @@ pub fn with_new_children_if_necessary( /// ``` /// use datafusion::prelude::*; /// use datafusion::physical_plan::displayable; -/// use std::path::is_separator; +/// use object_store::path::Path; /// /// #[tokio::main] /// async fn main() { @@ -312,9 +312,8 @@ pub fn with_new_children_if_necessary( /// let plan_string = format!("{}", displayable_plan.indent()); /// /// let working_directory = std::env::current_dir().unwrap(); -/// let normalized = working_directory.to_string_lossy().replace(is_separator, "/"); -/// let normalized = normalized.strip_prefix("/").unwrap(); -/// let plan_string = plan_string.replace(&normalized, "WORKING_DIR"); +/// let normalized = Path::from_filesystem_path(working_directory).unwrap(); +/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR"); /// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ /// \n CoalesceBatchesExec: target_batch_size=4096\ From 7f43ca2d1b92fac04aeec90d9bfc32529583436b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 Jun 2022 18:27:09 +0100 Subject: [PATCH 08/12] Simplify ListingTableUrl::strip_prefix --- datafusion/core/src/datasource/listing/url.rs | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index fd13205317ad..7235a0b2b445 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -128,23 +128,12 @@ impl ListingTableUrl { path: &'b Path, ) -> Option + 'a> { use object_store::path::DELIMITER; - // TODO: Make object_store::path::Path::prefix_match public - let path: &str = path.as_ref(); - let prefix: &str = self.prefix.as_ref(); - - // Ignore empty path segments - let diff = itertools::diff_with( - path.split(DELIMITER).filter(|s| !s.is_empty()), - prefix.split(DELIMITER).filter(|s| !s.is_empty()), - |a, b| a == b, - ); - - match diff { - // Match with remaining - Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath), - _ => None, - } + let stripped = match self.prefix.as_ref() { + "" => path, + p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?, + }; + Some(stripped.split(DELIMITER)) } /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension` @@ -252,6 +241,15 @@ mod tests { let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); assert_eq!(prefix, vec!["partition", "file"]); + + let url = ListingTableUrl::parse("file:///").unwrap(); + let child = Path::parse("/foo/bar").unwrap(); + let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); + assert_eq!(prefix, vec!["foo", "bar"]); + + let url = ListingTableUrl::parse("file:///foo").unwrap(); + let child = Path::parse("/foob/bar").unwrap(); + assert!(url.strip_prefix(&child).is_none()); } #[test] From 8f4203d057cf4856915d60cde1e62f38045be693 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 27 Jun 2022 13:29:35 +0100 Subject: [PATCH 09/12] Review feedback --- datafusion/core/src/datasource/listing/url.rs | 6 +----- .../core/src/physical_plan/file_format/file_stream.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 7235a0b2b445..8676f2118728 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ 
b/datafusion/core/src/datasource/listing/url.rs @@ -108,11 +108,7 @@ impl ListingTableUrl { /// Creates a new [`ListingTableUrl`] from a url and optional glob expression fn new(url: Url, glob: Option) -> Self { - // TODO: fix this upstream - let prefix = (url.path().len() > 1) - .then(|| Path::parse(url.path()).expect("should be URL safe")) - .unwrap_or_default(); - + let prefix = Path::parse(url.path()).expect("should be URL safe"); Self { url, prefix, glob } } diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 904a880bae52..ca57028abeed 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -41,6 +41,7 @@ use crate::execution::context::TaskContext; use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector}; use crate::physical_plan::RecordBatchStream; +/// A fallible future that resolves to a stream of [`RecordBatch`] pub type ReaderFuture = BoxFuture<'static, Result>>>; @@ -76,18 +77,27 @@ pub struct FileStream { } enum FileStreamState { + /// The idle state, no file is currently being read Idle, + /// Currently performing asynchronous IO to obtain a stream of RecordBatch + /// for a given parquet file Open { + /// A [`ReaderFuture`] returned by [`FormatReader::open`] future: ReaderFuture, + /// The partition values for this file partition_values: Vec, }, + /// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`] + /// returned by [`FormatReader::open`] Scan { /// Partitioning column values for the current batch_iter partition_values: Vec, /// The reader instance reader: BoxStream<'static, ArrowResult>, }, + /// Encountered an error Error, + /// Reached the row limit Limit, } From 073eff92a79c514c4db7566e2f37a19952542bc8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 6 Jun 2022 18:45:50 +0100 Subject: [PATCH 10/12] Update to latest arrow-rs --- datafusion/core/Cargo.toml | 2 +- .../src/physical_plan/file_format/parquet.rs | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 223c5fa8fb2d..355c22186fe1 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -78,7 +78,7 @@ num_cpus = "1.13.0" object_store = "0.3.0" ordered-float = "3.0" parking_lot = "0.12" -parquet = { version = "16.0.0", features = ["arrow"] } +parquet = { version = "16.0.0", features = ["arrow", "async"] } paste = "^1.0" pin-project-lite = "^0.2.7" pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 49693baf0b94..d99ebd5fb6f8 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -20,6 +20,7 @@ use fmt::Debug; use std::fmt; use std::fs; +use std::ops::Range; use std::sync::Arc; use std::{any::Any, convert::TryInto}; @@ -28,10 +29,14 @@ use arrow::{ datatypes::{Schema, SchemaRef}, error::ArrowError, }; +use bytes::Bytes; +use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; use log::debug; use object_store::{GetResult, ObjectMeta, ObjectStore}; +use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader, ProjectionMask}; +use parquet::file::metadata::ParquetMetaData; use 
parquet::file::reader::FileReader; use parquet::file::{ metadata::RowGroupMetaData, properties::WriterProperties, @@ -353,6 +358,26 @@ impl FormatReader for ParquetOpener { } } +struct ParquetFileReader { + store: Arc, + meta: ObjectMeta, +} + +impl AsyncFileReader for ParquetFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + todo!() + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + todo!() + } +} + /// Wraps parquet statistics in a way /// that implements [`PruningStatistics`] struct RowGroupPruningStatistics<'a> { From 65547fcf1955722ff23593123b624722ef2251f9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 7 Jun 2022 16:14:30 +0100 Subject: [PATCH 11/12] Use ParquetRecordBatchStream --- .../src/datasource/file_format/parquet.rs | 53 +++++-- .../src/physical_plan/file_format/parquet.rs | 138 +++++++++--------- datafusion/core/tests/path_partition.rs | 17 ++- 3 files changed, 123 insertions(+), 85 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0a314374c84a..37813acfb87f 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -23,12 +23,12 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_common::DataFusionError; use hashbrown::HashMap; -use object_store::{GetResult, ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::reader::FileReader; -use parquet::file::serialized_reader::SerializedFileReader; use parquet::file::statistics::Statistics as ParquetStatistics; use super::FileFormat; @@ -287,25 +287,46 @@ fn summarize_min_max( } } -async fn fetch_metadata( +pub(crate) async fn fetch_parquet_metadata( store: &dyn ObjectStore, - file: &ObjectMeta, + meta: &ObjectMeta, ) -> Result { - // TODO: Fetching the entire file to get metadata is wasteful - match store.get(&file.location).await? { - GetResult::File(file, _) => { - Ok(SerializedFileReader::new(file)?.metadata().clone()) - } - r @ GetResult::Stream(_) => { - let data = r.bytes().await?; - Ok(SerializedFileReader::new(data)?.metadata().clone()) - } + if meta.size < 8 { + return Err(DataFusionError::Execution(format!( + "file size of {} is less than footer", + meta.size + ))); + } + + let footer_start = meta.size - 8; + let suffix = store + .get_range(&meta.location, footer_start..meta.size) + .await?; + + let mut footer = [0; 8]; + footer.copy_from_slice(suffix.as_ref()); + + let length = decode_footer(&footer)?; + + if meta.size < length + 8 { + return Err(DataFusionError::Execution(format!( + "file size of {} is less than footer + metadata {}", + meta.size, + length + 8 + ))); } + + let metadata_start = meta.size - length - 8; + let metadata = store + .get_range(&meta.location, metadata_start..footer_start) + .await?; + + Ok(decode_metadata(metadata.as_ref())?) 
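// Background for the two ranged reads above: a parquet file ends with a fixed
// 8-byte footer -- a 4-byte little-endian length of the Thrift-encoded metadata
// followed by the 4-byte magic `PAR1` -- so the metadata itself occupies bytes
// [size - 8 - length, size - 8). Fetching the footer and then the metadata with
// ObjectStore::get_range avoids downloading the whole object, which is what the
// removed SerializedFileReader-based implementation had to do.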
} /// Read and parse the schema of the Parquet file at location `path` async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result { - let metadata = fetch_metadata(store, file).await?; + let metadata = fetch_parquet_metadata(store, file).await?; let file_metadata = metadata.file_metadata(); let schema = parquet_to_arrow_schema( file_metadata.schema_descr(), @@ -320,7 +341,7 @@ async fn fetch_statistics( table_schema: SchemaRef, file: &ObjectMeta, ) -> Result { - let metadata = fetch_metadata(store, file).await?; + let metadata = fetch_parquet_metadata(store, file).await?; let file_metadata = metadata.file_metadata(); let file_schema = parquet_to_arrow_schema( diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index d99ebd5fb6f8..9819385e9f48 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -31,22 +31,22 @@ use arrow::{ }; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{StreamExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use log::debug; -use object_store::{GetResult, ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader, ProjectionMask}; -use parquet::file::metadata::ParquetMetaData; -use parquet::file::reader::FileReader; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::ParquetError; use parquet::file::{ - metadata::RowGroupMetaData, properties::WriterProperties, - reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder, + metadata::{ParquetMetaData, RowGroupMetaData}, + properties::WriterProperties, statistics::Statistics as ParquetStatistics, }; use datafusion_common::Column; use datafusion_expr::Expr; +use crate::datasource::file_format::parquet::fetch_parquet_metadata; use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{ FileStream, FormatReader, ReaderFuture, @@ -288,72 +288,63 @@ impl FormatReader for ParquetOpener { let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); let batch_size = self.batch_size; let projection = self.projection.clone(); - let pruning_predicate = self.pruning_predicate.clone(); - - let build_opts = move || { - let mut opt = ReadOptionsBuilder::new(); - if let Some(pruning_predicate) = pruning_predicate { - opt = opt.with_predicate(build_row_group_predicate( - pruning_predicate, - file_metrics, - )); - } - - if let Some(range) = &range { - opt = opt.with_range(range.start, range.end); - } - - opt.build() - }; + let mut pruning_predicate = self + .pruning_predicate + .clone() + .map(|predicate| build_row_group_predicate(predicate, file_metrics)); Box::pin(async move { - let get_result = store.get(&meta.location).await?; - - let (mut arrow_reader, parquet_schema) = match get_result { - GetResult::File(file, _) => { - let reader = - SerializedFileReader::new_with_options(file, build_opts())?; - let parquet_schema = - reader.metadata().file_metadata().schema_descr_ptr(); - let arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); - (arrow_reader, parquet_schema) - } - r @ GetResult::Stream(_) => { - // TODO: Projection pushdown - let data = r.bytes().await?; - let reader = - SerializedFileReader::new_with_options(data, build_opts())?; - let parquet_schema = - 
reader.metadata().file_metadata().schema_descr_ptr(); - - let arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); - - (arrow_reader, parquet_schema) - } - }; - - let file_schema = arrow_reader - .get_schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; + let reader = ParquetFileReader { store, meta }; + let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let adapted_projections = - schema_adapter.map_projections(&file_schema, &projection)?; + schema_adapter.map_projections(builder.schema(), &projection)?; let mask = ProjectionMask::roots( - &parquet_schema, + builder.parquet_schema(), adapted_projections.iter().cloned(), ); - let reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?; - let adapted = reader.map(move |maybe_batch| { - maybe_batch.and_then(|b| { - schema_adapter - .adapt_batch(b, &projection) - .map_err(Into::into) + let row_groups = builder + .metadata() + .row_groups() + .iter() + .enumerate() + .filter_map(move |(idx, metadata)| { + let keep_prune = pruning_predicate + .as_mut() + .map(|p| p(metadata, idx)) + .unwrap_or(true); + + let keep_range = range + .as_ref() + .map(|x| { + let offset = metadata.column(0).file_offset(); + offset >= x.start && offset < x.end + }) + .unwrap_or(true); + + (keep_prune && keep_range).then(|| idx) }) - }); - - Ok(futures::stream::iter(adapted).boxed()) + .collect(); + + let stream = builder + .with_projection(mask) + .with_batch_size(batch_size) + .with_row_groups(row_groups) + .build()?; + + let adapted = stream + .map_err(|e| ArrowError::ExternalError(Box::new(e))) + .map(move |maybe_batch| { + maybe_batch.and_then(|b| { + schema_adapter + .adapt_batch(b, &projection) + .map_err(Into::into) + }) + }); + + Ok(adapted.boxed()) }) } } @@ -368,13 +359,28 @@ impl AsyncFileReader for ParquetFileReader { &mut self, range: Range, ) -> BoxFuture<'_, parquet::errors::Result> { - todo!() + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e)) + }) + .boxed() } fn get_metadata( &mut self, ) -> BoxFuture<'_, parquet::errors::Result>> { - todo!() + Box::pin(async move { + let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_metadata error: {}", + e + )) + })?; + Ok(Arc::new(metadata)) + }) } } @@ -480,7 +486,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn build_row_group_predicate( pruning_predicate: PruningPredicate, metrics: ParquetFileMetrics, -) -> Box bool> { +) -> Box bool + Send> { Box::new( move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool { let parquet_schema = pruning_predicate.schema().as_ref(); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 189a16951214..42ff28426f73 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -18,6 +18,7 @@ //! 
Test queries on partitioned datasets use std::fs::File; +use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; @@ -512,10 +513,20 @@ impl ObjectStore for MirroringObjectStore { async fn get_range( &self, - _location: &Path, - _range: Range, + location: &Path, + range: Range, ) -> object_store::Result { - unimplemented!() + self.files.iter().find(|x| *x == location.as_ref()).unwrap(); + let path = std::path::PathBuf::from(&self.mirrored_file); + let mut file = File::open(&path).unwrap(); + file.seek(SeekFrom::Start(range.start as u64)).unwrap(); + + let to_read = range.end - range.start; + let mut data = Vec::with_capacity(to_read); + let read = file.take(to_read as u64).read_to_end(&mut data).unwrap(); + assert_eq!(read, to_read); + + Ok(data.into()) } async fn head(&self, location: &Path) -> object_store::Result { From ace05fd1cfb209cb81de9dc6e724387a5da44df6 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 Jun 2022 10:44:11 +0100 Subject: [PATCH 12/12] Simplify predicate pruning --- .../src/datasource/file_format/parquet.rs | 4 +- .../src/physical_plan/file_format/parquet.rs | 213 ++++++++---------- 2 files changed, 93 insertions(+), 124 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 37813acfb87f..19794f6cae10 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -554,8 +554,8 @@ mod tests { let _ = collect(exec.clone(), task_ctx.clone()).await?; let _ = collect(exec_projected.clone(), task_ctx).await?; - assert_bytes_scanned(exec, 1851); - assert_bytes_scanned(exec_projected, 1851); + assert_bytes_scanned(exec, 671); + assert_bytes_scanned(exec_projected, 73); Ok(()) } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 9819385e9f48..cae92ddbb49c 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -90,7 +90,6 @@ struct ParquetFileMetrics { impl ParquetExec { /// Create a new Parquet reader execution plan provided file list and schema. - /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`. 
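The updated bytes_scanned expectations above drop from the full 1851-byte file to 671 and 73 bytes because the reader now fetches data with ranged reads instead of reading the whole file, and the metric counts the ranges actually requested; with projection pushdown the projected plan asks for far fewer bytes. A caller-side sketch of the ObjectStore::get_range call this relies on, assuming a LocalFileSystem store and a hypothetical location:

use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

// Hypothetical helper: fetch an arbitrary byte range from a local object store.
async fn read_range(
    location: &Path,
    start: usize,
    end: usize,
) -> object_store::Result<bytes::Bytes> {
    let store = LocalFileSystem::new();
    // only bytes [start, end) are read; against a remote store the same call
    // becomes an HTTP range request
    store.get_range(location, start..end).await
}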
pub fn new(base_config: FileScanConfig, predicate: Option) -> Self { debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", base_config.file_groups, base_config.projection, predicate, base_config.limit); @@ -260,6 +259,7 @@ impl ExecutionPlan for ParquetExec { } } +/// Implements [`FormatReader`] for a parquet file struct ParquetOpener { partition_index: usize, projection: Arc<[usize]>, @@ -276,26 +276,24 @@ impl FormatReader for ParquetOpener { meta: ObjectMeta, range: Option, ) -> ReaderFuture { - // TODO: Use ParquetRecordBatchStream (arrow-rs#1803) (arrow-rs#1804) - let file_metrics = ParquetFileMetrics::new( + let metrics = ParquetFileMetrics::new( self.partition_index, meta.location.as_ref(), &self.metrics, ); - file_metrics.bytes_scanned.add(meta.size); + let reader = ParquetFileReader { + store, + meta, + metrics: metrics.clone(), + }; let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); let batch_size = self.batch_size; let projection = self.projection.clone(); - let mut pruning_predicate = self - .pruning_predicate - .clone() - .map(|predicate| build_row_group_predicate(predicate, file_metrics)); + let pruning_predicate = self.pruning_predicate.clone(); Box::pin(async move { - let reader = ParquetFileReader { store, meta }; - let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let adapted_projections = schema_adapter.map_projections(builder.schema(), &projection)?; @@ -305,28 +303,8 @@ impl FormatReader for ParquetOpener { adapted_projections.iter().cloned(), ); - let row_groups = builder - .metadata() - .row_groups() - .iter() - .enumerate() - .filter_map(move |(idx, metadata)| { - let keep_prune = pruning_predicate - .as_mut() - .map(|p| p(metadata, idx)) - .unwrap_or(true); - - let keep_range = range - .as_ref() - .map(|x| { - let offset = metadata.column(0).file_offset(); - offset >= x.start && offset < x.end - }) - .unwrap_or(true); - - (keep_prune && keep_range).then(|| idx) - }) - .collect(); + let groups = builder.metadata().row_groups(); + let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics); let stream = builder .with_projection(mask) @@ -349,9 +327,11 @@ impl FormatReader for ParquetOpener { } } +/// Implements [`AsyncFileReader`] for a parquet file in object storage struct ParquetFileReader { store: Arc, meta: ObjectMeta, + metrics: ParquetFileMetrics, } impl AsyncFileReader for ParquetFileReader { @@ -359,6 +339,8 @@ impl AsyncFileReader for ParquetFileReader { &mut self, range: Range, ) -> BoxFuture<'_, parquet::errors::Result> { + self.metrics.bytes_scanned.add(range.end - range.start); + self.store .get_range(&self.meta.location, range) .map_err(|e| { @@ -483,35 +465,47 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } } -fn build_row_group_predicate( - pruning_predicate: PruningPredicate, - metrics: ParquetFileMetrics, -) -> Box bool + Send> { - Box::new( - move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool { - let parquet_schema = pruning_predicate.schema().as_ref(); +fn prune_row_groups( + groups: &[RowGroupMetaData], + range: Option, + predicate: Option, + metrics: &ParquetFileMetrics, +) -> Vec { + // TODO: Columnar pruning + let mut filtered = Vec::with_capacity(groups.len()); + for (idx, metadata) in groups.iter().enumerate() { + if let Some(range) = &range { + let offset = metadata.column(0).file_offset(); + if offset < range.start || offset >= range.end { + continue; + } + } + + if let Some(predicate) = &predicate { let pruning_stats = 
RowGroupPruningStatistics { - row_group_metadata, - parquet_schema, + row_group_metadata: metadata, + parquet_schema: predicate.schema().as_ref(), }; - let predicate_values = pruning_predicate.prune(&pruning_stats); - match predicate_values { + match predicate.prune(&pruning_stats) { Ok(values) => { // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !*v).count(); - metrics.row_groups_pruned.add(num_pruned); - values[0] + if !values[0] { + metrics.row_groups_pruned.add(1); + continue; + } } // stats filter array could not be built // return a closure which will not filter out any row groups Err(e) => { debug!("Error evaluating row group predicate values {}", e); metrics.predicate_evaluation_errors.add(1); - true } } - }, - ) + } + + filtered.push(idx) + } + filtered } /// Executes a query and writes the results to a partitioned Parquet file. @@ -1182,12 +1176,13 @@ mod tests { } #[test] - fn row_group_pruning_predicate_simple_expr() -> Result<()> { + fn row_group_pruning_predicate_simple_expr() { use datafusion_expr::{col, lit}; // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?; + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -1198,26 +1193,22 @@ mod tests { &schema_descr, vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); - let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); - assert_eq!(row_group_filter, vec![false, true]); - Ok(()) + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), + vec![1] + ); } #[test] - fn row_group_pruning_predicate_missing_stats() -> Result<()> { + fn row_group_pruning_predicate_missing_stats() { use datafusion_expr::{col, lit}; // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?; + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -1228,23 +1219,17 @@ mod tests { &schema_descr, vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); - let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let metrics = parquet_file_metrics(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out - assert_eq!(row_group_filter, vec![true, true]); - - Ok(()) + assert_eq!( + prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), + vec![0, 1] + ); } #[test] - fn row_group_pruning_predicate_partial_expr() -> Result<()> { + fn 
row_group_pruning_predicate_partial_expr() { use datafusion_expr::{col, lit}; // test row group predicate with partially supported expression // int > 1 and int % 2 => c1_max > 1 and true @@ -1253,7 +1238,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -1273,32 +1258,27 @@ mod tests { ParquetStatistics::int32(Some(11), Some(20), None, 0, false), ], ); - let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + + let metrics = parquet_file_metrics(); + let groups = &[rgm1, rgm2]; // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND - assert_eq!(row_group_filter, vec![false, true]); + assert_eq!( + prune_row_groups(groups, None, Some(pruning_predicate), &metrics), + vec![1] + ); // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); - let pruning_predicate = PruningPredicate::try_new(expr, schema)?; - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); - assert_eq!(row_group_filter, vec![true, true]); + let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); - Ok(()) + // if conditions in predicate are joined with OR and an unsupported expression is used + // this bypasses the entire predicate expression and no row groups are filtered out + assert_eq!( + prune_row_groups(groups, None, Some(pruning_predicate), &metrics), + vec![0, 1] + ); } fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { @@ -1324,7 +1304,7 @@ mod tests { } #[test] - fn row_group_pruning_predicate_null_expr() -> Result<()> { + fn row_group_pruning_predicate_null_expr() { use datafusion_expr::{col, lit}; // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); @@ -1332,24 +1312,19 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema)?; - let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". 
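// The rewritten assertions in these tests check which row-group indices
// prune_row_groups keeps, e.g. vec![1], rather than the old boolean mask
// vec![false, true]. All of these tests pass range: None; as a worked example of
// the range argument, assuming a file split into FileRange { start: 0, end: 1500 }
// and FileRange { start: 1500, end: 3000 } with row groups whose first column
// chunks start at offsets 4, 1000 and 2000: the first range keeps indices [0, 1]
// and the second keeps [2], so each row group is scanned by exactly one partition.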
- assert_eq!(row_group_filter, vec![false, true]); - - Ok(()) + assert_eq!( + prune_row_groups(&groups, None, Some(pruning_predicate), &metrics), + vec![1] + ); } #[test] - fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { + fn row_group_pruning_predicate_eq_null_expr() { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr // @@ -1361,22 +1336,16 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema)?; - let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); - - let mut row_group_predicate = - build_row_group_predicate(pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not // pass predicates. Ideally these should both be false - assert_eq!(row_group_filter, vec![false, true]); - - Ok(()) + assert_eq!( + prune_row_groups(&groups, None, Some(pruning_predicate), &metrics), + vec![1] + ); } fn get_row_group_meta_data(