Skip to content

Commit

Permalink
Executing LocalLimitExec with no column should not return an Err (#5709)
Browse files Browse the repository at this point in the history
* fix limit with no column

* test

* fmt
  • Loading branch information
kazuyukitanimura authored Mar 24, 2023
1 parent 74c3955 commit 7ddbfcc
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
43 changes: 41 additions & 2 deletions datafusion/core/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::physical_plan::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

use super::expressions::PhysicalSortExpr;
use super::{
Expand Down Expand Up @@ -462,7 +462,16 @@ impl LimitStream {
.iter()
.map(|col| col.slice(0, col.len().min(batch_rows)))
.collect();
Some(RecordBatch::try_new(batch.schema(), limited_columns).unwrap())
let options =
RecordBatchOptions::new().with_row_count(Option::from(batch_rows));
Some(
RecordBatch::try_new_with_options(
batch.schema(),
limited_columns,
&options,
)
.unwrap(),
)
}
}
}
Expand Down Expand Up @@ -601,6 +610,36 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn limit_no_column() -> Result<()> {
let batches = vec![
test::make_batch_no_column(6),
test::make_batch_no_column(6),
test::make_batch_no_column(6),
];
let input = test::exec::TestStream::new(batches);

let index = input.index();
assert_eq!(index.value(), 0);

// limit of six needs to consume the entire first record batch
// (6 rows) and stop immediately
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let limit_stream =
LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
assert_eq!(index.value(), 0);

let results = collect(Box::pin(limit_stream)).await.unwrap();
let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
// Only 6 rows should have been produced
assert_eq!(num_rows, 6);

// Only the first batch should be consumed
assert_eq!(index.value(), 1);

Ok(())
}

// test cases for "skip"
async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
let session_ctx = SessionContext::new();
Expand Down
10 changes: 9 additions & 1 deletion datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
use array::ArrayRef;
use arrow::array::{self, Array, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
Expand Down Expand Up @@ -275,6 +275,14 @@ pub fn make_partition(sz: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Return a RecordBatch with a single array with row_count sz
pub fn make_batch_no_column(sz: usize) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![]));

let options = RecordBatchOptions::new().with_row_count(Option::from(sz));
RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
}

/// Return a new table which provide this decimal column
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
let batch_decimal = make_decimal();
Expand Down

0 comments on commit 7ddbfcc

Please sign in to comment.