From 76d5fa0608219dbae184c97e2022f6392f47cf60 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 19 Jan 2022 14:45:00 +0000 Subject: [PATCH] Async ParquetExec --- Cargo.toml | 6 + datafusion/Cargo.toml | 2 +- .../src/datasource/object_store/local.rs | 16 +- datafusion/src/datasource/object_store/mod.rs | 29 +- .../src/physical_plan/file_format/parquet.rs | 599 ++++++++++-------- datafusion/src/test/object_store.rs | 11 +- 6 files changed, 365 insertions(+), 298 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ea1acc04e687..1d2ccd4dfef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,9 @@ members = [ [profile.release] lto = true codegen-units = 1 + +[patch.crates-io] +# TODO: TEMPORARY +arrow = { git = "https://github.com/tustvold/arrow-rs", rev = "078b37cd76d134a6788c2e3bb2db1a35106d9c59" } +arrow-flight = { git = "https://github.com/tustvold/arrow-rs", rev = "078b37cd76d134a6788c2e3bb2db1a35106d9c59" } +parquet = { git = "https://github.com/tustvold/arrow-rs", rev = "078b37cd76d134a6788c2e3bb2db1a35106d9c59" } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index e0e880dba3dc..c4698157d46c 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -53,7 +53,7 @@ avro = ["avro-rs", "num-traits"] ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.12", features = ["raw"] } arrow = { version = "8.0.0", features = ["prettyprint"] } -parquet = { version = "8.0.0", features = ["arrow"] } +parquet = { version = "8.0.0", features = ["arrow", "async"] } sqlparser = "0.13" paste = "^1.0" num_cpus = "1.13.0" diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index d46be4a5c5b7..7b32161ab47b 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; -use futures::{stream, AsyncRead, StreamExt}; +use futures::{stream, StreamExt}; use crate::datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, + ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, }; use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; @@ -67,14 +67,10 @@ impl LocalFileReader { #[async_trait] impl ObjectReader for LocalFileReader { - async fn chunk_reader( - &self, - _start: u64, - _length: usize, - ) -> Result> { - todo!( - "implement once async file readers are available (arrow-rs#78, arrow-rs#111)" - ) + async fn chunk_reader(&self) -> Result> { + let file = tokio::fs::File::open(&self.file.path).await?; + let file = tokio::io::BufReader::new(file); + Ok(Box::new(file)) } fn sync_chunk_reader( diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index c77489689a86..dc429ca17c1d 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -27,21 +27,42 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use futures::{AsyncRead, Stream, StreamExt}; +use futures::{Stream, StreamExt}; +use tokio::io::{AsyncBufRead, AsyncSeek}; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; +/// Provides async access to read a file, combing [`AsyncSeek`] +/// and [`AsyncBufRead`] so they can be used as a trait object +/// +/// [`AsyncSeek`] is necessary because readers may 
need to seek around whilst +/// reading, either because the format itself is structured (e.g. parquet) +/// or because it needs to read metadata or infer schema as an initial step +/// +/// [`AsyncBufRead`] is necessary because readers may wish to read data +/// up until some delimiter (e.g. csv or newline-delimited JSON) +/// +/// Note: the same block of data may be read multiple times +/// +/// Implementations that fetch from object storage may wish to maintain an internal +/// buffer of fetched data blocks, potentially discarding them or spilling them to disk +/// based on memory pressure +/// +/// TODO(#1614): Remove Sync +pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {} +impl ChunkReader for T {} + /// Object Reader for one file in an object store. /// /// Note that the dynamic dispatch on the reader might /// have some performance impacts. #[async_trait] pub trait ObjectReader: Send + Sync { - /// Get reader for a part [start, start + length] in the file asynchronously - async fn chunk_reader(&self, start: u64, length: usize) - -> Result>; + /// Get a [`ChunkReader`] for the file, successive calls to this MUST + /// return readers with independent seek positions + async fn chunk_reader(&self) -> Result>; /// Get reader for a part [start, start + length] in the file fn sync_chunk_reader( diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index d240fe27c58a..c280b11ef73e 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -18,22 +18,21 @@ //! Execution plan for reading Parquet files use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::{any::Any, convert::TryInto}; -use crate::datasource::file_format::parquet::ChunkObjectReader; -use crate::datasource::object_store::ObjectStore; +use crate::datasource::object_store::{ChunkReader, ObjectStore, SizedFile}; use crate::datasource::PartitionedFile; use crate::{ error::{DataFusionError, Result}, + execution::runtime_env::RuntimeEnv, logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ file_format::FileScanConfig, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - stream::RecordBatchReceiverStream, - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, Statistics, }, scalar::ScalarValue, }; @@ -45,23 +44,18 @@ use arrow::{ record_batch::RecordBatch, }; use log::{debug, info}; -use parquet::file::{ - metadata::RowGroupMetaData, - reader::{FileReader, SerializedFileReader}, - statistics::Statistics as ParquetStatistics, +use parquet::{ + arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder}, + file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, }; use arrow::array::new_null_array; use fmt::Debug; -use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; +use std::pin::Pin; +use std::task::{Context, Poll}; -use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, - task, -}; - -use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use super::PartitionColumnProjector; @@ -193,52 +187,37 @@ impl ExecutionPlan for ParquetExec { partition_index: usize, runtime: Arc, ) -> Result { - // because the parquet 
implementation is not thread-safe, it is necessary to execute - // on a thread and communicate with channels - let (response_tx, response_rx): ( - Sender>, - Receiver>, - ) = channel(2); - - let partition = self.base_config.file_groups[partition_index].clone(); - let metrics = self.metrics.clone(); + let files = self.base_config.file_groups[partition_index].to_vec(); + let projection = match self.base_config.file_column_projection_indices() { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; - let pruning_predicate = self.pruning_predicate.clone(); - let batch_size = runtime.batch_size(); - let limit = self.base_config.limit; - let object_store = Arc::clone(&self.base_config.object_store); - let partition_col_proj = PartitionColumnProjector::new( + + let partition_column_projector = PartitionColumnProjector::new( Arc::clone(&self.projected_schema), &self.base_config.table_partition_cols, ); - let file_schema_ref = self.base_config().file_schema.clone(); - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_partition( - object_store.as_ref(), - file_schema_ref, - partition_index, - partition, - metrics, - &projection, - &pruning_predicate, - batch_size, - response_tx, - limit, - partition_col_proj, - ) { - println!("Parquet reader thread terminated due to error: {:?}", e); - } + let config = Arc::new(PartitionConfig { + projection, + projected_schema: Arc::clone(&self.projected_schema), + object_store: Arc::clone(&self.base_config.object_store), + partition_idx: partition_index, + batch_size: runtime.batch_size(), + pruning_predicate: self.pruning_predicate.clone(), + metrics: self.metrics.clone(), + file_schema: self.base_config.file_schema.clone(), }); - Ok(RecordBatchReceiverStream::create( - &self.projected_schema, - response_rx, - join_handle, - )) + Ok(Box::pin(ParquetExecStream { + partition_column_projector, + config, + files, + state: Mutex::new(StreamState::Init), + file_idx: 0, + remaining_rows: self.base_config.limit.unwrap_or(usize::MAX), + })) } fn fmt_as( @@ -267,15 +246,119 @@ impl ExecutionPlan for ParquetExec { } } -fn send_result( - response_tx: &Sender>, - result: ArrowResult, -) -> Result<()> { - // Note this function is running on its own blockng tokio thread so blocking here is ok. 
- response_tx - .blocking_send(result) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - Ok(()) +struct PartitionConfig { + projected_schema: SchemaRef, + + object_store: Arc, + + partition_idx: usize, + + batch_size: usize, + + file_schema: SchemaRef, + + projection: Vec, + + pruning_predicate: Option, + + metrics: ExecutionPlanMetricsSet, +} + +struct ParquetExecStream { + config: Arc, + + remaining_rows: usize, + + file_idx: usize, + + partition_column_projector: PartitionColumnProjector, + + files: Vec, + + // Mutex needed because of #1614 + state: Mutex, +} + +enum StreamState { + Init, + Create(BoxFuture<'static, Result>>>), + Stream(ParquetRecordBatchStream>), + Error, +} + +impl RecordBatchStream for ParquetExecStream { + fn schema(&self) -> SchemaRef { + self.config.projected_schema.clone() + } +} + +impl Stream for ParquetExecStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + let mut state = this.state.lock().expect("not poisoned"); + + if this.remaining_rows == 0 { + return Poll::Ready(None); + } + + loop { + match &mut *state { + StreamState::Init => { + let file = match this.files.get(this.file_idx) { + Some(file) => file.file_meta.sized_file.clone(), + None => return Poll::Ready(None), + }; + + *state = StreamState::Create( + create_stream(this.config.clone(), file).boxed(), + ); + } + StreamState::Create(f) => match futures::ready!(f.poll_unpin(cx)) { + Ok(s) => { + *state = StreamState::Stream(s); + } + Err(e) => { + *state = StreamState::Error; + return Poll::Ready(Some(Err(ArrowError::ExternalError( + Box::new(e), + )))); + } + }, + StreamState::Stream(s) => match futures::ready!(s.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + this.remaining_rows = + this.remaining_rows.saturating_sub(batch.num_rows()); + + let proj_batch = project_batch( + batch, + &mut this.partition_column_projector, + &this.files[this.file_idx].partition_values, + this.config.file_schema.as_ref(), + &this.config.projection, + ); + + return Poll::Ready(Some(proj_batch)); + } + Some(Err(e)) => { + *state = StreamState::Error; + return Poll::Ready(Some(Err(ArrowError::ExternalError( + Box::new(e), + )))); + } + None => { + this.file_idx += 1; + *state = StreamState::Init + } + }, + StreamState::Error => return Poll::Pending, + } + } + } } /// Wraps parquet statistics in a way @@ -387,36 +470,6 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } } -fn build_row_group_predicate( - pruning_predicate: &PruningPredicate, - metrics: ParquetFileMetrics, - row_group_metadata: &[RowGroupMetaData], -) -> Box bool> { - let parquet_schema = pruning_predicate.schema().as_ref(); - - let pruning_stats = RowGroupPruningStatistics { - row_group_metadata, - parquet_schema, - }; - let predicate_values = pruning_predicate.prune(&pruning_stats); - - match predicate_values { - Ok(values) => { - // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !*v).count(); - metrics.row_groups_pruned.add(num_pruned); - Box::new(move |_, i| values[i]) - } - // stats filter array could not be built - // return a closure which will not filter out any row groups - Err(e) => { - debug!("Error evaluating row group predicate values {}", e); - metrics.predicate_evaluation_errors.add(1); - Box::new(|_r, _i| true) - } - } -} - // Map projections from the schema which merges all file schemas to projections on a particular // file fn map_projections( @@ -440,108 +493,87 @@ fn map_projections( 
Ok(mapped) } -#[allow(clippy::too_many_arguments)] -fn read_partition( - object_store: &dyn ObjectStore, - file_schema: SchemaRef, - partition_index: usize, - partition: Vec, - metrics: ExecutionPlanMetricsSet, +fn project_batch( + batch: RecordBatch, + column_projector: &mut PartitionColumnProjector, + partition_values: &[ScalarValue], + file_schema: &Schema, projection: &[usize], - pruning_predicate: &Option, - batch_size: usize, - response_tx: Sender>, - limit: Option, - mut partition_column_projector: PartitionColumnProjector, -) -> Result<()> { - let mut total_rows = 0; - 'outer: for partitioned_file in partition { - debug!("Reading file {}", &partitioned_file.file_meta.path()); - - let file_metrics = ParquetFileMetrics::new( - partition_index, - &*partitioned_file.file_meta.path(), - &metrics, - ); - let object_reader = - object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; - let mut file_reader = - SerializedFileReader::new(ChunkObjectReader(object_reader))?; - if let Some(pruning_predicate) = pruning_predicate { - let row_group_predicate = build_row_group_predicate( - pruning_predicate, - file_metrics, - file_reader.metadata().row_groups(), - ); - file_reader.filter_row_groups(&row_group_predicate); +) -> ArrowResult { + let total_cols = file_schema.fields().len(); + let batch_schema = batch.schema(); + let projected_schema = file_schema.clone().project(projection)?; + + let mut cols: Vec = Vec::with_capacity(total_cols); + let batch_cols = batch.columns().to_vec(); + + for field_idx in projection { + let merged_field = &file_schema.fields()[*field_idx]; + if let Some((batch_idx, _name)) = + batch_schema.column_with_name(merged_field.name().as_str()) + { + cols.push(batch_cols[batch_idx].clone()); + } else { + cols.push(new_null_array(merged_field.data_type(), batch.num_rows())) } + } - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let mapped_projections = - map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?; + let merged_batch = RecordBatch::try_new(Arc::new(projected_schema), cols)?; + column_projector.project(merged_batch, partition_values) +} - let mut batch_reader = - arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?; - loop { - match batch_reader.next() { - Some(Ok(batch)) => { - let total_cols = &file_schema.fields().len(); - let batch_rows = batch.num_rows(); - total_rows += batch.num_rows(); - - let batch_schema = batch.schema(); - - let mut cols: Vec = Vec::with_capacity(*total_cols); - let batch_cols = batch.columns().to_vec(); - - for field_idx in projection { - let merged_field = &file_schema.fields()[*field_idx]; - if let Some((batch_idx, _name)) = - batch_schema.column_with_name(merged_field.name().as_str()) - { - cols.push(batch_cols[batch_idx].clone()); - } else { - cols.push(new_null_array( - merged_field.data_type(), - batch_rows, - )) - } - } +async fn create_stream( + config: Arc, + file: SizedFile, +) -> Result>> { + let file_metrics = + ParquetFileMetrics::new(config.partition_idx, &file.path, &config.metrics); - let projected_schema = file_schema.clone().project(projection)?; + let object_reader = config.object_store.file_reader(file)?; + let reader = object_reader.chunk_reader().await?; - let merged_batch = - RecordBatch::try_new(Arc::new(projected_schema), cols)?; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?; - let proj_batch = partition_column_projector - .project(merged_batch, &partitioned_file.partition_values); + let 
pruning_stats = RowGroupPruningStatistics { + row_group_metadata: builder.metadata().row_groups(), + parquet_schema: builder.schema().as_ref(), + }; - send_result(&response_tx, proj_batch)?; - if limit.map(|l| total_rows >= l).unwrap_or(false) { - break 'outer; - } - } - None => { - break; - } - Some(Err(e)) => { - let err_msg = - format!("Error reading batch from {}: {}", partitioned_file, e); - // send error to operator - send_result( - &response_tx, - Err(ArrowError::ParquetError(err_msg.clone())), - )?; - // terminate thread with error - return Err(DataFusionError::Execution(err_msg)); - } + if let Some(predicate) = &config.pruning_predicate { + match predicate.prune(&pruning_stats) { + Ok(predicate_values) => { + let num_pruned = predicate_values.iter().filter(|&v| !*v).count(); + file_metrics.row_groups_pruned.add(num_pruned); + + let row_groups = predicate_values + .into_iter() + .enumerate() + .filter_map(|(idx, v)| match v { + true => Some(idx), + false => None, + }) + .collect(); + + builder = builder.with_row_groups(row_groups) + } + Err(e) => { + debug!("Error evaluating row group predicate values {}", e); + file_metrics.predicate_evaluation_errors.add(1); } } - } + }; - // finished reading files (dropping response_tx will close - // channel) - Ok(()) + let projection = map_projections( + config.file_schema.as_ref(), + builder.schema().as_ref(), + &config.projection, + )?; + + builder + .with_batch_size(config.batch_size) + .with_projection(projection) + .build() + .map_err(DataFusionError::ParquetError) } #[cfg(test)] @@ -558,6 +590,7 @@ mod tests { }; use super::*; + use crate::physical_plan::execute_stream; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -573,16 +606,12 @@ mod tests { }, schema::types::SchemaDescPtr, }; + use tempfile::NamedTempFile; - /// writes each RecordBatch as an individual parquet file and then - /// reads it back in to the named location. - async fn round_trip_to_parquet( - batches: Vec, - projection: Option>, - schema: Option, - ) -> Vec { + // Writes the `batches` to temporary files + fn write_files(batches: Vec) -> Vec { // When vec is dropped, temp files are deleted - let files: Vec<_> = batches + batches .into_iter() .map(|batch| { let output = tempfile::NamedTempFile::new().expect("creating temp file"); @@ -598,8 +627,14 @@ mod tests { writer.close().unwrap(); output }) - .collect(); + .collect() + } + async fn make_exec( + files: &[NamedTempFile], + projection: Option>, + schema: Option, + ) -> ParquetExec { let file_names: Vec<_> = files .iter() .map(|t| t.path().to_string_lossy().to_string()) @@ -621,7 +656,7 @@ mod tests { }; // prepare the scan - let parquet_exec = ParquetExec::new( + ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_groups: vec![file_groups], @@ -632,8 +667,18 @@ mod tests { table_partition_cols: vec![], }, None, - ); + ) + } + /// writes each RecordBatch as an individual parquet file and then + /// reads it back in to the named location. 
+ async fn round_trip_to_parquet( + batches: Vec, + projection: Option>, + schema: Option, + ) -> Vec { + let files = write_files(batches); + let parquet_exec = make_exec(&files, projection, schema).await; let runtime = Arc::new(RuntimeEnv::default()); collect(Arc::new(parquet_exec), runtime) .await @@ -837,12 +882,13 @@ mod tests { Field::new("c3", DataType::Int8, true), ]); - // read/write them files: - let read = - round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema))) - .await; + let files = write_files(vec![batch1, batch2]); + let plan = make_exec(&files, None, Some(Arc::new(schema))).await; + let runtime = Arc::new(RuntimeEnv::default()); + let mut stream = execute_stream(Arc::new(plan), runtime).await.unwrap(); + let batch = stream.next().await.unwrap().unwrap(); - // expect only the first batch to be read + // First batch should read successfully let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", @@ -852,7 +898,11 @@ mod tests { "| bar | | |", "+-----+----+----+", ]; - assert_batches_sorted_eq!(expected, &read); + assert_batches_sorted_eq!(expected, &[batch]); + + // Second batch should error + let err = stream.next().await.unwrap().unwrap_err().to_string(); + assert!(err.contains("Failed to map column projection for field c3. Incompatible data types Float32 and Int8"), "{}", err); } #[tokio::test] @@ -955,11 +1005,6 @@ mod tests { Ok(()) } - fn parquet_file_metrics() -> ParquetFileMetrics { - let metrics = Arc::new(ExecutionPlanMetricsSet::new()); - ParquetFileMetrics::new(0, "file.parquet", &metrics) - } - #[test] fn row_group_pruning_predicate_simple_expr() -> Result<()> { use crate::logical_plan::{col, lit}; @@ -978,16 +1023,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); assert_eq!(row_group_filter, vec![false, true]); Ok(()) @@ -1011,16 +1052,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out assert_eq!(row_group_filter, vec![true, true]); @@ -1059,16 +1096,12 @@ mod tests { ], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, 
i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND assert_eq!(row_group_filter, vec![false, true]); @@ -1077,16 +1110,12 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); assert_eq!(row_group_filter, vec![true, true]); Ok(()) @@ -1126,51 +1155,69 @@ mod tests { let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // First row group was filtered out because it contains no null value on "c2". 
assert_eq!(row_group_filter, vec![false, true]); Ok(()) } - #[test] - fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { + #[tokio::test] + async fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { use crate::logical_plan::{col, lit}; // test row group predicate with an unknown (Null) expr // // int > 1 and bool = NULL => c1_max > 1 and null - let expr = col("c1") + let expr = col("tinyint_col") .gt(lit(15)) - .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; - let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + .and(col("bool_col").eq(lit(ScalarValue::Boolean(None)))); + + let runtime = Arc::new(RuntimeEnv::default()); + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/alltypes_plain.parquet", testdata); - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![vec![local_unpartitioned_file(filename.clone())]], + file_schema: ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![filename])) + .await?, + statistics: Statistics::default(), + projection: Some(vec![1, 2, 3]), + limit: None, + table_partition_cols: vec![], + }, + Some(expr), ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); + // no row group is filtered out because the predicate expression can't be evaluated - // when a null array is generated for a statistics column, - assert_eq!(row_group_filter, vec![true, true]); + let mut results = parquet_exec.execute(0, runtime).await?; + let batch = results.next().await.unwrap()?; + + assert_eq!(8, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + let schema = batch.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["bool_col", "tinyint_col", "smallint_col"], field_names); + + let batch = results.next().await; + assert!(batch.is_none()); + + let metrics = parquet_exec.metrics().unwrap(); + let metric = metrics + .iter() + .find(|metric| metric.value().name() == "predicate_evaluation_errors") + .unwrap(); + assert_eq!(metric.value().as_usize(), 1); Ok(()) } diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index e93b4cd2d410..b9bcf4939b9a 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -24,12 +24,13 @@ use std::{ use crate::{ datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile, + ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, + ObjectStore, SizedFile, }, error::{DataFusionError, Result}, }; use async_trait::async_trait; -use futures::{stream, AsyncRead, StreamExt}; +use futures::{stream, StreamExt}; #[derive(Debug)] /// An object store implem that is useful for testing. 
@@ -99,11 +100,7 @@ struct EmptyObjectReader(u64);
 
 #[async_trait]
 impl ObjectReader for EmptyObjectReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Arc<dyn AsyncRead>> {
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
         unimplemented!()
     }
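
Usage sketch for reviewers: the new read path amounts to obtaining an async, seekable reader and handing it to the parquet crate's async ParquetRecordBatchStreamBuilder, which is what create_stream() does per file above. The snippet below illustrates that flow against a plain local file, assuming the pinned arrow-rs revision from the Cargo.toml hunk; the path and batch size are placeholders, and the exact builder bounds may differ in released parquet versions.

use futures::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path: any local parquet file will do.
    let file = tokio::fs::File::open("/tmp/example.parquet").await?;

    // Buffered AsyncRead + AsyncSeek -- the same shape that
    // LocalFileReader::chunk_reader() now returns.
    let reader = tokio::io::BufReader::new(file);

    // The builder fetches the footer and file metadata asynchronously
    // before any row group data is read.
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(1024)
        .build()?;

    // Each item is a parquet Result<RecordBatch>; ParquetExecStream::poll_next
    // maps these into ArrowResult<RecordBatch> and applies the projection.
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}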