From 34229e89b0c116acf87c8509175e73adc0a677e1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 30 Oct 2022 08:56:07 +1300 Subject: [PATCH] Project columns within DatafusionArrowPredicate (#4005) (#4006) --- .../physical_plan/file_format/row_filter.rs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index dd9c8fb650fd..b8a8487247e4 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -65,7 +65,8 @@ use std::sync::Arc; #[derive(Debug)] pub(crate) struct DatafusionArrowPredicate { physical_expr: Arc, - projection: ProjectionMask, + projection_mask: ProjectionMask, + projection: Vec, } impl DatafusionArrowPredicate { @@ -82,22 +83,40 @@ impl DatafusionArrowPredicate { let physical_expr = create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?; + // ArrowPredicate::evaluate is passed columns in the order they appear in the file + // If the predicate has multiple columns, we therefore must project the columns based + // on the order they appear in the file + let projection = match candidate.projection.len() { + 0 | 1 => vec![], + len => { + let mut projection: Vec<_> = (0..len).collect(); + projection.sort_by_key(|x| candidate.projection[*x]); + projection + } + }; + Ok(Self { physical_expr, - projection: ProjectionMask::roots( + projection, + projection_mask: ProjectionMask::roots( metadata.file_metadata().schema_descr(), candidate.projection, - ), + ) }) } } impl ArrowPredicate for DatafusionArrowPredicate { fn projection(&self) -> &ProjectionMask { - &self.projection + &self.projection_mask } fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { + let batch = match self.projection.is_empty() { + true => batch, + false => batch.project(&self.projection)? + }; + match self .physical_expr .evaluate(&batch)