From 04214329a37e7d7728f34a4a91c16e9eafdba711 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Sep 2024 12:18:23 -0700 Subject: [PATCH] fix: Use logical row count from RecordBatch --- .../org/apache/comet/vector/NativeUtil.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 33af8662fe..742d8e2285 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -92,11 +92,15 @@ class NativeUtil { arrayAddrs: Array[Long], schemaAddrs: Array[Long], batch: ColumnarBatch): Int = { + val numRows = mutable.ArrayBuffer.empty[Int] + (0 until batch.numCols()).foreach { index => batch.column(index) match { case a: CometVector => val valueVector = a.getValueVector + numRows += valueVector.getValueCount + val provider = if (valueVector.getField.getDictionary != null) { a.getDictionaryProvider } else { @@ -120,7 +124,16 @@ class NativeUtil { } } - batch.numRows() + if (numRows.distinct.length != 1) { + throw new SparkException( + s"Number of rows in each column should be the same, but got [${numRows.distinct}]") + } + + // `ColumnarBatch.numRows` might return a different number than the actual number of rows in + // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in + // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report + // logical number of rows which is less than actual number of rows due to row deletion. + numRows.headOption.getOrElse(batch.numRows()) } /**