From b111bbac544afdb956d8af18a8bfbd54fe391eeb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Sep 2024 12:18:23 -0700 Subject: [PATCH] fix: Use logical row count from RecordBatch --- .../org/apache/comet/vector/NativeUtil.scala | 15 ++++++++++++++- native/core/src/execution/operators/scan.rs | 4 ++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 33af8662fe..742d8e2285 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -92,11 +92,15 @@ class NativeUtil { arrayAddrs: Array[Long], schemaAddrs: Array[Long], batch: ColumnarBatch): Int = { + val numRows = mutable.ArrayBuffer.empty[Int] + (0 until batch.numCols()).foreach { index => batch.column(index) match { case a: CometVector => val valueVector = a.getValueVector + numRows += valueVector.getValueCount + val provider = if (valueVector.getField.getDictionary != null) { a.getDictionaryProvider } else { @@ -120,7 +124,16 @@ class NativeUtil { } } - batch.numRows() + if (numRows.distinct.length != 1) { + throw new SparkException( + s"Number of rows in each column should be the same, but got [${numRows.distinct}]") + } + + // `ColumnarBatch.numRows` might return a different number than the actual number of rows in + // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in + // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report + // logical number of rows which is less than actual number of rows due to row deletion. + numRows.headOption.getOrElse(batch.numRows()) } /** diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 7d75f7f1cd..3da430dd70 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -415,6 +415,10 @@ impl<'a> Stream for ScanStream<'a> { InputBatch::EOF => Poll::Ready(None), InputBatch::Batch(columns, num_rows) => { self.baseline_metrics.record_output(*num_rows); + + println!("columns: {:?}", columns); + println!("num_rows: {:?}", num_rows); + let maybe_batch = self.build_record_batch(columns, *num_rows); Poll::Ready(Some(maybe_batch)) }