diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 73f6c795c901..f5433d7e869c 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -44,6 +44,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; + use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable}; use datafusion_physical_expr::create_physical_expr; diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 31fa1f2d8bcf..707ed039c26c 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -25,6 +25,7 @@ mod delimited_stream; mod file_stream; mod json; mod parquet; +mod row_filter; pub(crate) use self::csv::plan_to_csv; pub use self::csv::CsvExec; diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 14e679c7f16a..a6ce44b22c8d 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; +use crate::physical_plan::file_format::row_filter::build_row_filter; use crate::physical_plan::file_format::FileMeta; use crate::{ error::{DataFusionError, Result}, @@ -67,6 +68,30 @@ use parquet::file::{ }; use parquet::schema::types::ColumnDescriptor; +#[derive(Debug, Clone, Default)] +/// Specify options for the parquet scan +pub struct ParquetScanOptions { + /// If true, any available `pruning_predicate` will be converted to a `RowFilter` + /// and pushed down to the `ParquetRecordBatchStream`. This will enable row level + /// filter at the decoder level. Defaults to false + pushdown_filters: bool, + /// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize + /// the cost of filter evaluation. + reorder_predicates: bool, +} + +impl ParquetScanOptions { + pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self { + self.pushdown_filters = pushdown_filters; + self + } + + pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self { + self.reorder_predicates = reorder_predicates; + self + } +} + /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { @@ -81,6 +106,8 @@ pub struct ParquetExec { metadata_size_hint: Option, /// Optional user defined parquet file reader factory parquet_file_reader_factory: Option>, + /// Options to specify behavior of parquet scan + scan_options: ParquetScanOptions, } impl ParquetExec { @@ -121,6 +148,7 @@ impl ParquetExec { pruning_predicate, metadata_size_hint, parquet_file_reader_factory: None, + scan_options: ParquetScanOptions::default(), } } @@ -148,6 +176,12 @@ impl ParquetExec { self.parquet_file_reader_factory = Some(parquet_file_reader_factory); self } + + /// Configure `ParquetScanOptions` + pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> Self { + self.scan_options = scan_options; + self + } } /// Stores metrics about the parquet execution for a particular parquet file. 
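The new `ParquetScanOptions` is wired in through the builder methods above. As a quick illustration, a minimal sketch of switching the feature on, assuming `parquet_exec` is a `ParquetExec` already built with `ParquetExec::new` (this mirrors the test helper change further down):

```rust
// Opt in to decoder-level filtering: the pruning predicate is pushed down as a
// RowFilter, and its conjuncts may be reordered by estimated evaluation cost.
let parquet_exec = parquet_exec.with_scan_options(
    ParquetScanOptions::default()
        .with_pushdown_filters(true)
        .with_reorder_predicates(true),
);
```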
@@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec { metadata_size_hint: self.metadata_size_hint, metrics: self.metrics.clone(), parquet_file_reader_factory, + scan_options: self.scan_options.clone(), }; let stream = FileStream::new( @@ -319,6 +354,7 @@ struct ParquetOpener { metadata_size_hint: Option, metrics: ExecutionPlanMetricsSet, parquet_file_reader_factory: Arc, + scan_options: ParquetScanOptions, } impl FileOpener for ParquetOpener { @@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; let projection = self.projection.clone(); let pruning_predicate = self.pruning_predicate.clone(); + let table_schema = self.table_schema.clone(); + let reorder_predicates = self.scan_options.reorder_predicates; + let pushdown_filters = self.scan_options.pushdown_filters; Ok(Box::pin(async move { - let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let adapted_projections = schema_adapter.map_projections(builder.schema(), &projection)?; @@ -358,6 +397,21 @@ impl FileOpener for ParquetOpener { adapted_projections.iter().cloned(), ); + if let Some(predicate) = pushdown_filters + .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr())) + .flatten() + { + if let Ok(Some(filter)) = build_row_filter( + predicate.clone(), + builder.schema().as_ref(), + table_schema.as_ref(), + builder.metadata(), + reorder_predicates, + ) { + builder = builder.with_row_filter(filter); + } + }; + let groups = builder.metadata().row_groups(); let row_groups = prune_row_groups(groups, file_range, pruning_predicate, &metrics); @@ -839,6 +893,7 @@ mod tests { projection: Option>, schema: Option, predicate: Option, + pushdown_predicate: bool, ) -> Result> { let file_schema = match schema { Some(schema) => schema, @@ -851,7 +906,7 @@ mod tests { let file_groups = meta.into_iter().map(Into::into).collect(); // prepare the scan - let parquet_exec = ParquetExec::new( + let mut parquet_exec = ParquetExec::new( FileScanConfig { object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![file_groups], @@ -865,6 +920,14 @@ mod tests { None, ); + if pushdown_predicate { + parquet_exec = parquet_exec.with_scan_options( + ParquetScanOptions::default() + .with_pushdown_filters(true) + .with_reorder_predicates(true), + ); + } + let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); collect(Arc::new(parquet_exec), task_ctx).await @@ -912,9 +975,10 @@ mod tests { let batch3 = add_to_batch(&batch1, "c3", c3); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None) - .await - .unwrap(); + let read = + round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None, false) + .await + .unwrap(); let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", @@ -953,7 +1017,7 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None) + let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false) .await .unwrap(); let expected = vec![ @@ -987,7 +1051,7 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None) + let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false) .await .unwrap(); let expected = vec![ @@ -1020,24 +1084,60 @@ mod tests { // 
batch2: c3(int8), c2(int64) let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); - let filter = col("c2").eq(lit(0_i64)); + let filter = col("c2").eq(lit(2_i64)); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter)) - .await - .unwrap(); + let read = + round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false) + .await + .unwrap(); let expected = vec![ "+-----+----+----+", "| c1 | c3 | c2 |", "+-----+----+----+", - "| Foo | 10 | |", + "| | | |", + "| | 10 | 1 |", "| | 20 | |", + "| | 20 | 2 |", + "| Foo | 10 | |", "| bar | | |", "+-----+----+----+", ]; assert_batches_sorted_eq!(expected, &read); } + #[tokio::test] + async fn evolved_schema_intersection_filter_with_filter_pushdown() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + // batch1: c1(string), c3(int8) + let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]); + + // batch2: c3(int8), c2(int64) + let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); + + let filter = col("c2").eq(lit(2_i64)); + + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true) + .await + .unwrap(); + let expected = vec![ + "+----+----+----+", + "| c1 | c3 | c2 |", + "+----+----+----+", + "| | 20 | 2 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + #[tokio::test] async fn evolved_schema_projection() { let c1: ArrayRef = @@ -1061,10 +1161,15 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]); // read/write them files: - let read = - round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None, None) - .await - .unwrap(); + let read = round_trip_to_parquet( + vec![batch1, batch2], + Some(vec![0, 3]), + None, + None, + false, + ) + .await + .unwrap(); let expected = vec![ "+-----+-----+", "| c1 | c4 |", @@ -1102,9 +1207,10 @@ mod tests { let filter = col("c3").eq(lit(0_i8)); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter)) - .await - .unwrap(); + let read = + round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false) + .await + .unwrap(); // Predicate should prune all row groups assert_eq!(read.len(), 0); @@ -1123,12 +1229,13 @@ mod tests { // batch2: c2(int64) let batch2 = create_batch(vec![("c2", c2)]); - let filter = col("c2").eq(lit(0_i64)); + let filter = col("c2").eq(lit(1_i64)); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter)) - .await - .unwrap(); + let read = + round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false) + .await + .unwrap(); // This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0` // but parquet pruning is not exact. 
If the min/max values are not defined (which they are not in this case since the it is @@ -1139,14 +1246,48 @@ mod tests { "+-----+----+", "| c1 | c2 |", "+-----+----+", - "| Foo | |", "| | |", + "| | |", + "| | 1 |", + "| | 2 |", + "| Foo | |", "| bar | |", "+-----+----+", ]; assert_batches_sorted_eq!(expected, &read); } + #[tokio::test] + async fn evolved_schema_disjoint_schema_filter_with_pushdown() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + // batch1: c1(string) + let batch1 = create_batch(vec![("c1", c1.clone())]); + + // batch2: c2(int64) + let batch2 = create_batch(vec![("c2", c2)]); + + let filter = col("c2").eq(lit(1_i64)); + + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true) + .await + .unwrap(); + + let expected = vec![ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| | 1 |", + "+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + #[tokio::test] async fn evolved_schema_incompatible_types() { let c1: ArrayRef = @@ -1181,6 +1322,7 @@ mod tests { None, Some(Arc::new(schema)), None, + false, ) .await; assert_contains!(read.unwrap_err().to_string(), diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs new file mode 100644 index 000000000000..56bdba5577e9 --- /dev/null +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, BooleanArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Column, Result, ScalarValue, ToDFSchema}; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; + +use datafusion_expr::{uncombine_filter, Expr}; +use datafusion_physical_expr::execution_props::ExecutionProps; +use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; +use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; +use parquet::arrow::ProjectionMask; +use parquet::file::metadata::ParquetMetaData; +use std::sync::Arc; + +/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which +/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`. +/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es. +/// When constructing the `ParquetRecordBatchStream` you can provide a `RowFilter` which is itself just a vector +/// of `Box`. 
+/// During decoding, the predicates are evaluated to generate a mask which is used
+/// to avoid decoding rows in projected columns that are not selected, which can significantly reduce the amount
+/// of compute required for decoding.
+///
+/// Since the predicates are applied serially in the order defined in the `RowFilter`, the optimal ordering
+/// will depend on the exact filters. The best filters to execute first have two properties:
+/// 1. They are relatively inexpensive to evaluate (e.g. they read column chunks which are relatively small)
+/// 2. They filter a lot of rows, reducing the amount of decoding required for subsequent filters and projected columns
+///
+/// Given the metadata exposed by parquet, the selectivity of filters is not easy to estimate, so the heuristics used here
+/// primarily focus on the evaluation cost.
+///
+/// The basic algorithm for constructing the `RowFilter` is as follows:
+/// 1. Recursively break conjunctions into separate predicates. An expression like `a = 1 AND (b = 2 AND c = 3)` would be
+/// separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
+/// 2. Determine whether each predicate is suitable as an `ArrowPredicate`. As long as the predicate does not reference any projected columns
+/// or columns with non-primitive types, it is considered suitable.
+/// 3. Determine, for each predicate, the total compressed size of all columns required to evaluate the predicate.
+/// 4. Determine, for each predicate, whether all columns required to evaluate the expression are sorted.
+/// 5. Re-order the predicates by total size (from step 3).
+/// 6. Partition the predicates according to whether they are sorted (from step 4).
+/// 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+/// 8. Build the `RowFilter` with the sorted predicates followed by the unsorted predicates. Within each partition
+/// the predicates will still be sorted by size.
+
+/// A predicate which can be passed to `ParquetRecordBatchStream` to perform row-level
+/// filtering during parquet decoding.
+#[derive(Debug)]
+pub(crate) struct DatafusionArrowPredicate {
+    physical_expr: Arc<dyn PhysicalExpr>,
+    projection: ProjectionMask,
+}
+
+impl DatafusionArrowPredicate {
+    pub fn try_new(
+        candidate: FilterCandidate,
+        schema: &Schema,
+        metadata: &ParquetMetaData,
+    ) -> Result<Self> {
+        let props = ExecutionProps::default();
+
+        let schema = schema.project(&candidate.projection)?;
+        let df_schema = schema.clone().to_dfschema()?;
+
+        let physical_expr =
+            create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?;
+
+        Ok(Self {
+            physical_expr,
+            projection: ProjectionMask::roots(
+                metadata.file_metadata().schema_descr(),
+                candidate.projection,
+            ),
+        })
+    }
+}
+
+impl ArrowPredicate for DatafusionArrowPredicate {
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        match self
+            .physical_expr
+            .evaluate(&batch)
+            .map(|v| v.into_array(batch.num_rows()))
+        {
+            Ok(array) => {
+                if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
+                    Ok(BooleanArray::from(mask.data().clone()))
+                } else {
+                    Err(ArrowError::ComputeError(
+                        "Unexpected result of predicate evaluation, expected BooleanArray".to_owned(),
+                    ))
+                }
+            }
+            Err(e) => Err(ArrowError::ComputeError(format!(
+                "Error evaluating filter predicate: {:?}",
+                e
+            ))),
+        }
+    }
+}
+
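To make the `evaluate` contract above concrete, here is a self-contained sketch (not part of this change; the column name and data are invented) of compiling a logical predicate with `create_physical_expr` and producing the boolean selection mask that the parquet decoder consumes:

```rust
use std::sync::Arc;

use arrow::array::{Array, BooleanArray, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;

fn main() -> Result<()> {
    // A batch containing only the column the predicate needs, which is what the
    // ProjectionMask handed to the decoder will produce.
    let schema = Schema::new(vec![Field::new("c2", DataType::Int64, true)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int64Array::from(vec![Some(1), Some(2), None]))],
    )?;

    // Compile the logical predicate `c2 = 2`, as DatafusionArrowPredicate::try_new does.
    let expr = col("c2").eq(lit(2_i64));
    let df_schema = schema.clone().to_dfschema()?;
    let physical_expr =
        create_physical_expr(&expr, &df_schema, &schema, &ExecutionProps::default())?;

    // Evaluating the expression yields the mask used to skip rows during decoding.
    let mask = physical_expr.evaluate(&batch)?.into_array(batch.num_rows());
    let mask = mask.as_any().downcast_ref::<BooleanArray>().unwrap();
    assert!(!mask.value(0)); // 1 != 2
    assert!(mask.value(1)); // 2 == 2
    assert!(mask.is_null(2)); // null input propagates to a null mask entry
    Ok(())
}
```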
+/// A candidate expression for creating a `RowFilter`. It contains the
+/// expression as well as data used to estimate the cost of evaluating
+/// the resulting expression.
+pub(crate) struct FilterCandidate {
+    expr: Expr,
+    required_bytes: usize,
+    can_use_index: bool,
+    projection: Vec<usize>,
+}
+
+/// Helper to build a `FilterCandidate`. This will do several things:
+/// 1. Determine the columns required to evaluate the expression
+/// 2. Calculate data required to estimate the cost of evaluating the filter
+/// 3. Rewrite column expressions in the predicate which reference columns not present in the particular file schema.
+/// This is relevant in the case where we have determined the table schema by merging all individual file schemas
+/// and any given file may or may not contain all columns in the merged schema. If a particular column is not present,
+/// we replace the column expression with a literal expression that produces a null value.
+struct FilterCandidateBuilder<'a> {
+    expr: Expr,
+    file_schema: &'a Schema,
+    table_schema: &'a Schema,
+    required_column_indices: Vec<usize>,
+    non_primitive_columns: bool,
+    projected_columns: bool,
+}
+
+impl<'a> FilterCandidateBuilder<'a> {
+    pub fn new(expr: Expr, file_schema: &'a Schema, table_schema: &'a Schema) -> Self {
+        Self {
+            expr,
+            file_schema,
+            table_schema,
+            required_column_indices: vec![],
+            non_primitive_columns: false,
+            projected_columns: false,
+        }
+    }
+
+    pub fn build(
+        mut self,
+        metadata: &ParquetMetaData,
+    ) -> Result<Option<FilterCandidate>> {
+        let expr = self.expr.clone();
+        let expr = expr.rewrite(&mut self)?;
+
+        if self.non_primitive_columns || self.projected_columns {
+            Ok(None)
+        } else {
+            let required_bytes =
+                size_of_columns(&self.required_column_indices, metadata)?;
+            let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+            Ok(Some(FilterCandidate {
+                expr,
+                required_bytes,
+                can_use_index,
+                projection: self.required_column_indices,
+            }))
+        }
+    }
+}
+
+impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
+    fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
+        if let Expr::Column(column) = expr {
+            if let Ok(idx) = self.file_schema.index_of(&column.name) {
+                self.required_column_indices.push(idx);
+
+                if !is_primitive_field(self.file_schema.field(idx)) {
+                    self.non_primitive_columns = true;
+                }
+            } else if self.table_schema.index_of(&column.name).is_err() {
+                // If the column does not exist in the (un-projected) table schema then
+                // it must be a projected column.
+                self.projected_columns = true;
+            }
+        }
+        Ok(RewriteRecursion::Continue)
+    }
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        if let Expr::Column(Column { name, .. }) = &expr {
+            if self.file_schema.field_with_name(name).is_err() {
+                return Ok(Expr::Literal(ScalarValue::Null));
+            }
+        }
+
+        Ok(expr)
+    }
+}
+
+/// Calculate the total compressed size of all `Column`s required for the
+/// predicate `Expr`. This should represent the total amount of file IO
+/// required to evaluate the predicate.
+fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
+    let mut total_size = 0;
+    let row_groups = metadata.row_groups();
+    for idx in columns {
+        for rg in row_groups.iter() {
+            total_size += rg.column(*idx).compressed_size() as usize;
+        }
+    }
+
+    Ok(total_size)
+}
+
+/// For a given set of `Column`s required for the predicate `Expr`, determine whether all
+/// columns are sorted. Sorted columns may be queried more efficiently in the presence of
+/// a PageIndex.
+fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
+    // TODO How do we know this?
+    Ok(false)
+}
+
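Steps 5 and 6 of the algorithm are easiest to see in isolation. Below is a small self-contained sketch of the sort-then-partition ordering that `build_row_filter` applies further down; the `Candidate` struct here is a simplified stand-in for `FilterCandidate`, not the real type:

```rust
// Simplified stand-in for FilterCandidate: only the fields the ordering uses.
struct Candidate {
    name: &'static str,
    required_bytes: usize,
    can_use_index: bool,
}

fn order(mut candidates: Vec<Candidate>) -> Vec<&'static str> {
    // Step 5: cheapest predicates (smallest column I/O) first.
    candidates.sort_by_key(|c| c.required_bytes);
    // Step 6: predicates over sorted columns run before the rest,
    // preserving the size ordering within each partition.
    let (indexed, other): (Vec<_>, Vec<_>) =
        candidates.into_iter().partition(|c| c.can_use_index);
    indexed.into_iter().chain(other).map(|c| c.name).collect()
}

fn main() {
    let ordered = order(vec![
        Candidate { name: "c1 = 'x'", required_bytes: 800, can_use_index: false },
        Candidate { name: "c2 = 2", required_bytes: 100, can_use_index: false },
        Candidate { name: "c3 > 0", required_bytes: 500, can_use_index: true },
    ]);
    // The sorted-column predicate comes first, then the remaining ones by size.
    assert_eq!(ordered, vec!["c3 > 0", "c2 = 2", "c1 = 'x'"]);
}
```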
+/// Build a [`RowFilter`] from the given predicate `Expr`.
+pub fn build_row_filter(
+    expr: Expr,
+    file_schema: &Schema,
+    table_schema: &Schema,
+    metadata: &ParquetMetaData,
+    reorder_predicates: bool,
+) -> Result<Option<RowFilter>> {
+    let predicates = uncombine_filter(expr);
+
+    let mut candidates: Vec<FilterCandidate> = predicates
+        .into_iter()
+        .flat_map(|expr| {
+            if let Ok(candidate) =
+                FilterCandidateBuilder::new(expr, file_schema, table_schema)
+                    .build(metadata)
+            {
+                candidate
+            } else {
+                None
+            }
+        })
+        .collect();
+
+    if candidates.is_empty() {
+        Ok(None)
+    } else if reorder_predicates {
+        candidates.sort_by_key(|c| c.required_bytes);
+
+        let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
+            candidates.into_iter().partition(|c| c.can_use_index);
+
+        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
+
+        for candidate in indexed_candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        for candidate in other_candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        Ok(Some(RowFilter::new(filters)))
+    } else {
+        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
+        for candidate in candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        Ok(Some(RowFilter::new(filters)))
+    }
+}
+
+/// Return true if this is a non-nested type.
+// TODO remove after https://github.com/apache/arrow-rs/issues/2704 is done
+fn is_primitive_field(field: &Field) -> bool {
+    !matches!(
+        field.data_type(),
+        DataType::List(_)
+            | DataType::FixedSizeList(_, _)
+            | DataType::LargeList(_)
+            | DataType::Struct(_)
+            | DataType::Union(_, _, _)
+            | DataType::Map(_, _)
+    )
+}
+
+#[cfg(test)]
+mod test {
+    use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{col, lit};
+    use parquet::arrow::parquet_to_arrow_schema;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+
+    // Assume a column expression for a column not in the table schema is a projected column and ignore it
+    #[test]
+    fn test_filter_candidate_builder_ignore_projected_columns() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+                .expect("parsing schema");
+
+        let expr = col("projected_column").eq(lit("value"));
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // We should ignore predicates that read non-primitive columns
+    #[test]
+    fn test_filter_candidate_builder_ignore_complex_types() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/list_columns.parquet", testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+                .expect("parsing schema");
+
+        let expr = col("int64_list").is_not_null();
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // If a column exists in the table schema but not the file schema it should be rewritten to a null expression
+    #[test]
+    fn test_filter_candidate_builder_rewrite_missing_column() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+                .expect("parsing schema");
+
+        let file_schema = Schema::new(vec![
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+        ]);
+
+        let expr = col("bigint_col").eq(col("int_col"));
+        let expected_candidate_expr = col("bigint_col").eq(lit(ScalarValue::Null));
+
+        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_some());
+
+        assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
+    }
+}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 6c5cc0ecc40e..8d89f6f4144e 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -452,6 +452,37 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
     Some(combined_filter)
 }

+/// Take a combined filter (multiple boolean expressions ANDed together)
+/// and break it down into distinct filters. This should be the inverse of
+/// `combine_filters`.
+pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
+    match filter {
+        Expr::BinaryExpr {
+            left,
+            op: Operator::And,
+            right,
+        } => {
+            let mut exprs = uncombine_filter(*left);
+            exprs.extend(uncombine_filter(*right));
+            exprs
+        }
+        expr => {
+            vec![expr]
+        }
+    }
+}
+
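A short usage sketch for the new helper, assuming `uncombine_filter` and the existing `combine_filters` are re-exported from the `datafusion_expr` crate root (the `row_filter` module above already imports `uncombine_filter` that way):

```rust
use datafusion_expr::{col, combine_filters, lit, uncombine_filter};

fn main() {
    // `a = 1 AND (b = 2 AND c = 3)` breaks apart into its three conjuncts ...
    let filter = col("a")
        .eq(lit(1))
        .and(col("b").eq(lit(2)).and(col("c").eq(lit(3))));
    let parts = uncombine_filter(filter);
    assert_eq!(parts.len(), 3);

    // ... and `combine_filters` ANDs them back together (up to associativity).
    let recombined = combine_filters(&parts).unwrap();
    assert_eq!(uncombine_filter(recombined), parts);
}
```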
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical OR.
+/// Returns `None` if the filters array is empty.
+pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
+    if filters.is_empty() {
+        return None;
+    }
+
+    filters.iter().cloned().reduce(or)
+}
+
 /// Recursively un-alias an expressions
 #[inline]
 pub fn unalias(expr: Expr) -> Expr {
@@ -521,6 +552,7 @@ pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> {
 #[cfg(test)]
 mod test {
     use super::*;
+    use arrow::datatypes::{Field, Schema};

     #[test]
     fn filter_is_null_and_is_not_null() {
@@ -698,4 +730,56 @@ mod test {
         combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
         assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
     }
+
+    fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
+        assert_eq!(
+            actual.len(),
+            expected.len(),
+            "Predicates are not equal, found {} predicates but expected {}",
+            actual.len(),
+            expected.len()
+        );
+
+        for expr in expected.into_iter() {
+            assert!(
+                actual.contains(&expr),
+                "Predicates are not equal, predicate {:?} not found in {:?}",
+                expr,
+                actual
+            );
+        }
+    }
+
+    #[test]
+    fn test_uncombine_filter() {
+        let _schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Utf8, true),
+        ]);
+
+        let expr = col("a").eq(lit("s"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a").eq(lit("s"))]);
+    }
+
+    #[test]
+    fn test_uncombine_filter_recursively() {
+        let _schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Utf8, true),
+        ]);
+
+        let expr = and(col("a"), col("b"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a"), col("b")]);
+
+        let expr = col("a").and(col("b")).or(col("c"));
+        let actual = uncombine_filter(expr.clone());
+
+        assert_predicates(actual, vec![expr]);
+    }
 }
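And a matching sketch for the disjunctive variant. This assumes `combine_filters_disjunctive` and the existing `or` helper are re-exported from the crate root like the other `expr_fn` functions; that re-export is not shown in this diff:

```rust
use datafusion_expr::{col, combine_filters_disjunctive, lit, or};

fn main() {
    // An empty slice yields no filter at all.
    assert_eq!(combine_filters_disjunctive(&[]), None);

    // Multiple filters are OR'd together left to right.
    let a = col("a").eq(lit(1));
    let b = col("b").eq(lit(2));
    assert_eq!(
        combine_filters_disjunctive(&[a.clone(), b.clone()]),
        Some(or(a, b))
    );
}
```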