From 37cf00a7274f8e5dc844563df47f543d368fa8ab Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 22 Mar 2023 12:38:49 -0700 Subject: [PATCH 1/3] fix limit with no column --- datafusion/core/src/physical_plan/limit.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index e633c32920fb..3c0e6343c45a 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -31,7 +31,7 @@ use crate::physical_plan::{ }; use arrow::array::ArrayRef; use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use super::expressions::PhysicalSortExpr; use super::{ @@ -462,7 +462,8 @@ impl LimitStream { .iter() .map(|col| col.slice(0, col.len().min(batch_rows))) .collect(); - Some(RecordBatch::try_new(batch.schema(), limited_columns).unwrap()) + let options = RecordBatchOptions::new().with_row_count(Option::from(batch_rows)); + Some(RecordBatch::try_new_with_options(batch.schema(), limited_columns, &options).unwrap()) } } } From a9b726b0fb3a5af5610ad2de25886218a55e36f8 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 23 Mar 2023 02:04:35 -0700 Subject: [PATCH 2/3] test --- datafusion/core/src/physical_plan/limit.rs | 30 ++++++++++++++++++++++ datafusion/core/src/test/mod.rs | 10 +++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 3c0e6343c45a..dafcedf5f6fd 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -602,6 +602,36 @@ mod tests { Ok(()) } + #[tokio::test] + async fn limit_no_column() -> Result<()> { + let batches = vec![ + test::make_batch_no_column(6), + test::make_batch_no_column(6), + test::make_batch_no_column(6), + ]; + let input = 
test::exec::TestStream::new(batches); + + let index = input.index(); + assert_eq!(index.value(), 0); + + // limit of six needs to consume the entire first record batch + // (6 rows) and stop immediately + let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let limit_stream = + LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics); + assert_eq!(index.value(), 0); + + let results = collect(Box::pin(limit_stream)).await.unwrap(); + let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); + // Only 6 rows should have been produced + assert_eq!(num_rows, 6); + + // Only the first batch should be consumed + assert_eq!(index.value(), 1); + + Ok(()) + } + // test cases for "skip" async fn skip_and_fetch(skip: usize, fetch: Option) -> Result { let session_ctx = SessionContext::new(); diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index f7113123cf98..54a8272c5f8d 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -33,7 +33,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data}; use array::ArrayRef; use arrow::array::{self, Array, Decimal128Builder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; #[cfg(feature = "compression")] use bzip2::write::BzEncoder; #[cfg(feature = "compression")] @@ -275,6 +275,14 @@ pub fn make_partition(sz: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![arr]).unwrap() } +/// Return a RecordBatch with no columns and a row count of sz +pub fn make_batch_no_column(sz: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![])); + + let options = RecordBatchOptions::new().with_row_count(Option::from(sz)); + RecordBatch::try_new_with_options(schema, vec![], &options).unwrap() +} + /// Return a new table which provide this decimal column pub fn table_with_decimal() -> Arc { let 
batch_decimal = make_decimal(); From 706c83502995bc566d421d02fc58a259a741c84c Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 23 Mar 2023 13:22:48 -0700 Subject: [PATCH 3/3] fmt --- datafusion/core/src/physical_plan/limit.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index dafcedf5f6fd..bfeb9c65b9d5 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -462,8 +462,16 @@ impl LimitStream { .iter() .map(|col| col.slice(0, col.len().min(batch_rows))) .collect(); - let options = RecordBatchOptions::new().with_row_count(Option::from(batch_rows)); - Some(RecordBatch::try_new_with_options(batch.schema(), limited_columns, &options).unwrap()) + let options = + RecordBatchOptions::new().with_row_count(Option::from(batch_rows)); + Some( + RecordBatch::try_new_with_options( + batch.schema(), + limited_columns, + &options, + ) + .unwrap(), + ) } } }