From 0617169310ddbf81fd9b107eb13fa96cda6c6c32 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 19 Jan 2022 14:45:00 +0000
Subject: [PATCH] Async ParquetExec

---
 Cargo.toml                                    |   6 +
 datafusion/Cargo.toml                         |   2 +-
 .../src/datasource/object_store/local.rs      |  16 +-
 datafusion/src/datasource/object_store/mod.rs |  29 +-
 .../src/physical_plan/file_format/parquet.rs  | 459 ++++++++----------
 datafusion/src/test/object_store.rs           |  11 +-
 6 files changed, 243 insertions(+), 280 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ea1acc04e687f..4b7642d5bd7a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,3 +31,9 @@ members = [
 [profile.release]
 lto = true
 codegen-units = 1
+
+[patch.crates-io]
+# TODO: TEMPORARY
+arrow = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
+arrow-flight = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
+parquet = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
\ No newline at end of file
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index d5334130dd8dd..e73757946dba9 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -53,7 +53,7 @@ avro = ["avro-rs", "num-traits"]
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.11", features = ["raw"] }
 arrow = { version = "7.0.0", features = ["prettyprint"] }
-parquet = { version = "7.0.0", features = ["arrow"] }
+parquet = { version = "7.0.0", features = ["arrow", "async"] }
 sqlparser = "0.13"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
index 0e857c8485828..5c0a7dde3e889 100644
--- a/datafusion/src/datasource/object_store/local.rs
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::{stream, StreamExt};
 
 use crate::datasource::object_store::{
-    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+    ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
 };
 use crate::datasource::PartitionedFile;
 use crate::error::DataFusionError;
@@ -68,14 +68,10 @@ impl LocalFileReader {
 
 #[async_trait]
 impl ObjectReader for LocalFileReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
-        todo!(
-            "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
-        )
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
+        let file = tokio::fs::File::open(&self.file.path).await?;
+        let file = tokio::io::BufReader::new(file);
+        Ok(Box::new(file))
     }
 
     fn sync_chunk_reader(
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
index 77ca1ef6bae73..d1ee4fd6e8deb 100644
--- a/datafusion/src/datasource/object_store/mod.rs
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -27,21 +27,42 @@ use std::sync::{Arc, RwLock};
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use futures::{AsyncRead, Stream, StreamExt};
+use futures::{Stream, StreamExt};
+use tokio::io::{AsyncBufRead, AsyncSeek};
 
 use local::LocalFileSystem;
 
 use crate::error::{DataFusionError, Result};
 
+/// Provides async access to read a file, combining [`AsyncSeek`]
+/// and [`AsyncBufRead`] so they can be used as a trait object
+///
+/// [`AsyncSeek`] is necessary because readers may need to seek around whilst
+/// reading, either because the format itself is structured (e.g. parquet)
+/// or because it needs to read metadata or infer schema as an initial step
+///
+/// [`AsyncBufRead`] is necessary because readers may wish to read data
+/// up until some delimiter (e.g. csv or newline-delimited JSON)
+///
+/// Note: the same block of data may be read multiple times
+///
+/// Implementations that fetch from object storage may wish to maintain an internal
+/// buffer of fetched data blocks, potentially discarding them or spilling them to disk
+/// based on memory pressure
+///
+/// TODO(#1614): Remove Sync
+pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {}
+impl<T: AsyncBufRead + AsyncSeek + Send + Sync + Unpin> ChunkReader for T {}
+
 /// Object Reader for one file in an object store.
 ///
 /// Note that the dynamic dispatch on the reader might
 /// have some performance impacts.
 #[async_trait]
 pub trait ObjectReader: Send + Sync {
-    /// Get reader for a part [start, start + length] in the file asynchronously
-    async fn chunk_reader(&self, start: u64, length: usize)
-        -> Result<Box<dyn AsyncRead>>;
+    /// Get a [`ChunkReader`] for the file; successive calls to this MUST
+    /// return readers with independent seek positions
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>>;
 
     /// Get reader for a part [start, start + length] in the file
     fn sync_chunk_reader(
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 17abb434fe68b..045175ce1a161 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -18,22 +18,21 @@
 //! Execution plan for reading Parquet files
 
 use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::{any::Any, convert::TryInto};
 
-use crate::datasource::file_format::parquet::ChunkObjectReader;
-use crate::datasource::object_store::ObjectStore;
+use crate::datasource::object_store::{ChunkReader, ObjectStore};
 use crate::datasource::PartitionedFile;
 use crate::{
     error::{DataFusionError, Result},
+    execution::runtime_env::RuntimeEnv,
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
         file_format::FileScanConfig,
         metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        stream::RecordBatchReceiverStream,
-        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
     },
     scalar::ScalarValue,
 };
@@ -45,22 +44,17 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use log::debug;
-use parquet::file::{
-    metadata::RowGroupMetaData,
-    reader::{FileReader, SerializedFileReader},
-    statistics::Statistics as ParquetStatistics,
+use parquet::{
+    arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder},
+    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
 };
 
 use fmt::Debug;
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-use tokio::{
-    sync::mpsc::{channel, Receiver, Sender},
-    task,
-};
-
-use crate::execution::runtime_env::RuntimeEnv;
 use async_trait::async_trait;
+use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
 
 use super::PartitionColumnProjector;
@@ -192,50 +186,31 @@ impl ExecutionPlan for ParquetExec {
         partition_index: usize,
         runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
-        // because the parquet implementation is not 
thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<RecordBatch>>,
-            Receiver<ArrowResult<RecordBatch>>,
-        ) = channel(2);
-
-        let partition = self.base_config.file_groups[partition_index].clone();
-        let metrics = self.metrics.clone();
+        let files = self.base_config.file_groups[partition_index].to_vec();
+
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = runtime.batch_size();
-        let limit = self.base_config.limit;
-        let object_store = Arc::clone(&self.base_config.object_store);
-        let partition_col_proj = PartitionColumnProjector::new(
+
+        let partition_column_projector = PartitionColumnProjector::new(
             Arc::clone(&self.projected_schema),
             &self.base_config.table_partition_cols,
         );
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                partition_index,
-                partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx,
-                limit,
-                partition_col_proj,
-            ) {
-                println!("Parquet reader thread terminated due to error: {:?}", e);
-            }
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.projected_schema,
-            response_rx,
-            join_handle,
-        ))
+        Ok(Box::pin(ParquetExecStream {
+            files,
+            partition_column_projector,
+            projection: Arc::from(projection),
+            schema: Arc::clone(&self.projected_schema),
+            state: Mutex::new(StreamState::Init),
+            object_store: Arc::clone(&self.base_config.object_store),
+            metrics: self.metrics.clone(),
+            partition_idx: partition_index,
+            batch_size: runtime.batch_size(),
+            pruning_predicate: self.pruning_predicate.clone(),
+            file_idx: 0,
+        }))
     }
 
     fn fmt_as(
@@ -264,15 +239,156 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn send_result(
-    response_tx: &Sender<ArrowResult<RecordBatch>>,
-    result: ArrowResult<RecordBatch>,
-) -> Result<()> {
-    // Note this function is running on its own blockng tokio thread so blocking here is ok.
-    response_tx
-        .blocking_send(result)
-        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-    Ok(())
+}
+
+struct ParquetExecStream {
+    schema: SchemaRef,
+
+    object_store: Arc<dyn ObjectStore>,
+
+    metrics: ExecutionPlanMetricsSet,
+
+    partition_idx: usize,
+
+    batch_size: usize,
+
+    projection: Arc<[usize]>,
+
+    pruning_predicate: Option<PruningPredicate>,
+
+    partition_column_projector: PartitionColumnProjector,
+
+    file_idx: usize,
+
+    files: Vec<PartitionedFile>,
+
+    // Mutex needed because of #1614
+    state: Mutex<StreamState>,
+}
+
+enum StreamState {
+    Init,
+    Create(BoxFuture<'static, Result<ParquetRecordBatchStream<Box<dyn ChunkReader>>>>),
+    Stream(ParquetRecordBatchStream<Box<dyn ChunkReader>>),
+    Error,
+}
+
+impl ParquetExecStream {
+    fn create_stream(
+        &self,
+        file: &PartitionedFile,
+    ) -> BoxFuture<'static, Result<ParquetRecordBatchStream<Box<dyn ChunkReader>>>> {
+        let object_store = Arc::clone(&self.object_store);
+        let batch_size = self.batch_size;
+        let projection = Arc::clone(&self.projection);
+        let pruning_predicate = self.pruning_predicate.clone();
+        let sized_file = file.file_meta.sized_file.clone();
+        let file_metrics =
+            ParquetFileMetrics::new(self.partition_idx, &sized_file.path, &self.metrics);
+
+        async move {
+            let object_reader = object_store.file_reader(sized_file)?;
+            let reader = object_reader.chunk_reader().await?;
+
+            let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+
+            let pruning_stats = RowGroupPruningStatistics {
+                row_group_metadata: builder.metadata().row_groups(),
+                parquet_schema: builder.schema().as_ref(),
+            };
+
+            if let Some(predicate) = pruning_predicate {
+                match predicate.prune(&pruning_stats) {
+                    Ok(predicate_values) => {
+                        let num_pruned = predicate_values.iter().filter(|&v| !*v).count();
+                        file_metrics.row_groups_pruned.add(num_pruned);
+
+                        let row_groups = predicate_values
+                            .into_iter()
+                            .enumerate()
+                            .filter_map(|(idx, v)| match v {
+                                true => Some(idx),
+                                false => None,
+                            })
+                            .collect();
+
+                        builder = builder.with_row_groups(row_groups)
+                    }
+                    Err(e) => {
+                        debug!("Error evaluating row group predicate values {}", e);
+                        file_metrics.predicate_evaluation_errors.add(1);
+                    }
+                }
+            };
+
+            builder
+                .with_projection(projection)
+                .with_batch_size(batch_size)
+                .build()
+                .map_err(DataFusionError::ParquetError)
+        }
+        .boxed()
+    }
+}
+
+impl RecordBatchStream for ParquetExecStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for ParquetExecStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        let mut state = this.state.lock().expect("not poisoned");
+
+        loop {
+            match &mut *state {
+                StreamState::Init => {
+                    let partitioned_file = match this.files.get(this.file_idx) {
+                        Some(file) => file,
+                        None => return Poll::Ready(None),
+                    };
+
+                    *state = StreamState::Create(this.create_stream(partitioned_file));
+                }
+                StreamState::Create(f) => match futures::ready!(f.poll_unpin(cx)) {
+                    Ok(s) => {
+                        *state = StreamState::Stream(s);
+                    }
+                    Err(e) => {
+                        *state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                            Box::new(e),
+                        ))));
+                    }
+                },
+                StreamState::Stream(s) => match futures::ready!(s.poll_next_unpin(cx)) {
+                    Some(Ok(batch)) => {
+                        let proj_batch = this
+                            .partition_column_projector
+                            .project(batch, &this.files[this.file_idx].partition_values);
+
+                        return Poll::Ready(Some(proj_batch));
+                    }
+                    Some(Err(e)) => {
+                        *state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                            Box::new(e),
+                        ))));
+                    }
+                    None => {
+                        this.file_idx += 1;
+                        *state = StreamState::Init
+                    }
+                },
+                StreamState::Error => return Poll::Pending,
+ } + } + } } /// Wraps parquet statistics in a way @@ -355,106 +471,6 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } } -fn build_row_group_predicate( - pruning_predicate: &PruningPredicate, - metrics: ParquetFileMetrics, - row_group_metadata: &[RowGroupMetaData], -) -> Box bool> { - let parquet_schema = pruning_predicate.schema().as_ref(); - - let pruning_stats = RowGroupPruningStatistics { - row_group_metadata, - parquet_schema, - }; - let predicate_values = pruning_predicate.prune(&pruning_stats); - - match predicate_values { - Ok(values) => { - // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !*v).count(); - metrics.row_groups_pruned.add(num_pruned); - Box::new(move |_, i| values[i]) - } - // stats filter array could not be built - // return a closure which will not filter out any row groups - Err(e) => { - debug!("Error evaluating row group predicate values {}", e); - metrics.predicate_evaluation_errors.add(1); - Box::new(|_r, _i| true) - } - } -} - -#[allow(clippy::too_many_arguments)] -fn read_partition( - object_store: &dyn ObjectStore, - partition_index: usize, - partition: Vec, - metrics: ExecutionPlanMetricsSet, - projection: &[usize], - pruning_predicate: &Option, - batch_size: usize, - response_tx: Sender>, - limit: Option, - mut partition_column_projector: PartitionColumnProjector, -) -> Result<()> { - let mut total_rows = 0; - 'outer: for partitioned_file in partition { - let file_metrics = ParquetFileMetrics::new( - partition_index, - &*partitioned_file.file_meta.path(), - &metrics, - ); - let object_reader = - object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; - let mut file_reader = - SerializedFileReader::new(ChunkObjectReader(object_reader))?; - if let Some(pruning_predicate) = pruning_predicate { - let row_group_predicate = build_row_group_predicate( - pruning_predicate, - file_metrics, - file_reader.metadata().row_groups(), - ); - file_reader.filter_row_groups(&row_group_predicate); - } - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let mut batch_reader = arrow_reader - .get_record_reader_by_columns(projection.to_owned(), batch_size)?; - loop { - match batch_reader.next() { - Some(Ok(batch)) => { - total_rows += batch.num_rows(); - let proj_batch = partition_column_projector - .project(batch, &partitioned_file.partition_values); - - send_result(&response_tx, proj_batch)?; - if limit.map(|l| total_rows >= l).unwrap_or(false) { - break 'outer; - } - } - None => { - break; - } - Some(Err(e)) => { - let err_msg = - format!("Error reading batch from {}: {}", partitioned_file, e); - // send error to operator - send_result( - &response_tx, - Err(ArrowError::ParquetError(err_msg.clone())), - )?; - // terminate thread with error - return Err(DataFusionError::Execution(err_msg)); - } - } - } - } - - // finished reading files (dropping response_tx will close - // channel) - Ok(()) -} - #[cfg(test)] mod tests { use crate::datasource::{ @@ -573,11 +589,6 @@ mod tests { Ok(()) } - fn parquet_file_metrics() -> ParquetFileMetrics { - let metrics = Arc::new(ExecutionPlanMetricsSet::new()); - ParquetFileMetrics::new(0, "file.parquet", &metrics) - } - #[test] fn row_group_pruning_predicate_simple_expr() -> Result<()> { use crate::logical_plan::{col, lit}; @@ -596,16 +607,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, 
- parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); assert_eq!(row_group_filter, vec![false, true]); Ok(()) @@ -629,16 +636,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out assert_eq!(row_group_filter, vec![true, true]); @@ -677,16 +680,12 @@ mod tests { ], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND assert_eq!(row_group_filter, vec![false, true]); @@ -695,68 +694,12 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; - let row_group_predicate = build_row_group_predicate( - &pruning_predicate, - parquet_file_metrics(), - &row_group_metadata, - ); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); - assert_eq!(row_group_filter, vec![true, true]); - - Ok(()) - } - - #[test] - fn row_group_pruning_predicate_null_expr() -> Result<()> { - use crate::logical_plan::{col, lit}; - // test row group predicate with an unknown (Null) expr - // - // int > 1 and bool = NULL => c1_max > 1 and null - let expr = col("c1") - .gt(lit(15)) - .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; - let schema_descr = get_test_schema_descr(vec![ - ("c1", PhysicalType::INT32), - ("c2", PhysicalType::BOOLEAN), - ]); - let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ - 
                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
-            ],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![
-                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
-            ],
-        );
-        let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<bool>>();
-        // no row group is filtered out because the predicate expression can't be evaluated
-        // when a null array is generated for a statistics column,
-        // because the null values propagate to the end result, making the predicate result undefined
+        let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap();
         assert_eq!(row_group_filter, vec![true, true]);
 
         Ok(())
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
index e93b4cd2d410d..b9bcf4939b9a0 100644
--- a/datafusion/src/test/object_store.rs
+++ b/datafusion/src/test/object_store.rs
@@ -24,12 +24,13 @@ use std::{
 
 use crate::{
     datasource::object_store::{
-        FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
+        ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader,
+        ObjectStore, SizedFile,
     },
     error::{DataFusionError, Result},
 };
 use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::{stream, StreamExt};
 
 #[derive(Debug)]
 /// An object store implem that is useful for testing.
@@ -99,11 +100,7 @@ struct EmptyObjectReader(u64);
 
 #[async_trait]
 impl ObjectReader for EmptyObjectReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
         unimplemented!()
     }