diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index f9ec72ab00678..7573a263bee9c 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1676,6 +1676,34 @@ mod tests { assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5); } + #[tokio::test] + async fn multi_column_predicate_pushdown() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]); + + // Columns in different order to schema + let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar"))); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true) + .await + .unwrap(); + + let expected = vec![ + "+-----+----+", + "| c1 | c2 |", + "+-----+----+", + "| Foo | 1 |", + "| bar | |", + "+-----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + #[tokio::test] async fn evolved_schema_incompatible_types() { let c1: ArrayRef = diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index 49ec6b5caf3a3..54bf4bb8fa42f 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -67,7 +67,8 @@ use crate::physical_plan::metrics; #[derive(Debug)] pub(crate) struct DatafusionArrowPredicate { physical_expr: Arc<dyn PhysicalExpr>, - projection: ProjectionMask, + projection_mask: ProjectionMask, + projection: Vec<usize>, /// how many rows were filtered out by this predicate rows_filtered: metrics::Count, /// how long was spent evaluating this predicate @@ -90,9 +91,22 @@ impl DatafusionArrowPredicate { let physical_expr = create_physical_expr(&candidate.expr, 
&df_schema, &schema, &props)?; + // ArrowPredicate::evaluate is passed columns in the order they appear in the file + // If the predicate has multiple columns, we therefore must project the columns based + // on the order they appear in the file + let projection = match candidate.projection.len() { + 0 | 1 => vec![], + len => { + let mut projection: Vec<_> = (0..len).collect(); + projection.sort_unstable_by_key(|x| candidate.projection[*x]); + projection + } + }; + Ok(Self { physical_expr, - projection: ProjectionMask::roots( + projection, + projection_mask: ProjectionMask::roots( metadata.file_metadata().schema_descr(), candidate.projection, ), @@ -104,10 +118,15 @@ impl ArrowPredicate for DatafusionArrowPredicate { fn projection(&self) -> &ProjectionMask { - &self.projection + &self.projection_mask } fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> { + let batch = match self.projection.is_empty() { + true => batch, + false => batch.project(&self.projection)?, + }; + // scoped timer updates on drop let mut timer = self.time.timer(); match self