From fa8ad401bed4df04c6d9e6745cd528b56d5820ec Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 15 Jan 2023 13:12:39 -0800 Subject: [PATCH] Executing ProjectionExec with no column should not return an Err (#4912) * Fix projection with no column * Fix clippy --- .../core/src/physical_plan/projection.rs | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 14431b88951b..e418a8f93444 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -33,7 +33,7 @@ use crate::physical_plan::{ }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use log::debug; use super::expressions::{Column, PhysicalSortExpr}; @@ -328,7 +328,13 @@ impl ProjectionStream { .map(|r| r.map(|v| v.into_array(batch.num_rows()))) .collect::>>()?; - RecordBatch::try_new(self.schema.clone(), arrays) + if arrays.is_empty() { + let options = + RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + RecordBatch::try_new_with_options(self.schema.clone(), arrays, &options) + } else { + RecordBatch::try_new(self.schema.clone(), arrays) + } } } @@ -372,6 +378,7 @@ impl RecordBatchStream for ProjectionStream { mod tests { use super::*; + use crate::physical_plan::common::collect; use crate::physical_plan::expressions::{self, col}; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; @@ -418,6 +425,22 @@ mod tests { Ok(()) } + #[tokio::test] + async fn project_no_column() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let csv = test::scan_partitioned_csv(1)?; + let expected = collect(csv.execute(0, task_ctx.clone())?).await.unwrap(); + + let projection = ProjectionExec::try_new(vec![], csv)?; + let stream = projection.execute(0, task_ctx.clone())?; + let output = collect(stream).await.unwrap(); + assert_eq!(output.len(), expected.len()); + + Ok(()) + } + #[tokio::test] async fn test_stats_projection_columns_only() { let source = Statistics {