Skip to content

Commit

Permalink
fix: Use the number of rows from underlying arrays instead of logical…
Browse files Browse the repository at this point in the history
… row count from RecordBatch (#972)
  • Loading branch information
viirya authored Sep 27, 2024
1 parent 22561c4 commit 317a534
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ class NativeUtil {
arrayAddrs: Array[Long],
schemaAddrs: Array[Long],
batch: ColumnarBatch): Int = {
val numRows = mutable.ArrayBuffer.empty[Int]

(0 until batch.numCols()).foreach { index =>
batch.column(index) match {
case a: CometVector =>
val valueVector = a.getValueVector

numRows += valueVector.getValueCount

val provider = if (valueVector.getField.getDictionary != null) {
a.getDictionaryProvider
} else {
Expand All @@ -120,7 +124,16 @@ class NativeUtil {
}
}

batch.numRows()
if (numRows.distinct.length > 1) {
throw new SparkException(
s"Number of rows in each column should be the same, but got [${numRows.distinct}]")
}

// `ColumnarBatch.numRows` might return a different number than the actual number of rows in
// the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
// its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
// logical number of rows which is less than actual number of rows due to row deletion.
numRows.headOption.getOrElse(batch.numRows())
}

/**
Expand Down

0 comments on commit 317a534

Please sign in to comment.