From 7ddbfcc886d7aaf45290df439756fff1a6526b81 Mon Sep 17 00:00:00 2001
From: KAZUYUKI TANIMURA
Date: Fri, 24 Mar 2023 11:25:31 -0700
Subject: [PATCH] Executing LocalLimitExec with no column should not return an
 Err (#5709)

* fix limit with no column

* test

* fmt
---
 datafusion/core/src/physical_plan/limit.rs | 43 +++++++++++++++++++++-
 datafusion/core/src/test/mod.rs            | 10 ++++-
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index e633c32920fb..bfeb9c65b9d5 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -31,7 +31,7 @@ use crate::physical_plan::{
 };
 use arrow::array::ArrayRef;
 use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 
 use super::expressions::PhysicalSortExpr;
 use super::{
@@ -462,7 +462,16 @@ impl LimitStream {
                 .iter()
                 .map(|col| col.slice(0, col.len().min(batch_rows)))
                 .collect();
-            Some(RecordBatch::try_new(batch.schema(), limited_columns).unwrap())
+            let options =
+                RecordBatchOptions::new().with_row_count(Option::from(batch_rows));
+            Some(
+                RecordBatch::try_new_with_options(
+                    batch.schema(),
+                    limited_columns,
+                    &options,
+                )
+                .unwrap(),
+            )
         }
     }
 }
@@ -601,6 +610,36 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn limit_no_column() -> Result<()> {
+        let batches = vec![
+            test::make_batch_no_column(6),
+            test::make_batch_no_column(6),
+            test::make_batch_no_column(6),
+        ];
+        let input = test::exec::TestStream::new(batches);
+
+        let index = input.index();
+        assert_eq!(index.value(), 0);
+
+        // limit of six needs to consume the entire first record batch
+        // (6 rows) and stop immediately
+        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let limit_stream =
+            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
+        assert_eq!(index.value(), 0);
+
+        let results = collect(Box::pin(limit_stream)).await.unwrap();
+        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+        // Only 6 rows should have been produced
+        assert_eq!(num_rows, 6);
+
+        // Only the first batch should be consumed
+        assert_eq!(index.value(), 1);
+
+        Ok(())
+    }
+
     // test cases for "skip"
     async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
         let session_ctx = SessionContext::new();
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f7113123cf98..54a8272c5f8d 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -33,7 +33,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
 use array::ArrayRef;
 use arrow::array::{self, Array, Decimal128Builder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 #[cfg(feature = "compression")]
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
@@ -275,6 +275,14 @@ pub fn make_partition(sz: i32) -> RecordBatch {
     RecordBatch::try_new(schema, vec![arr]).unwrap()
 }
 
+/// Return a RecordBatch with no columns and a row count of `sz`
+pub fn make_batch_no_column(sz: usize) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![]));
+
+    let options = RecordBatchOptions::new().with_row_count(Option::from(sz));
+    RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
+}
+
 /// Return a new table which provide this decimal column
 pub fn table_with_decimal() -> Arc<dyn TableProvider> {
     let batch_decimal = make_decimal();