Skip to content

Commit

Permalink
fix: Use logical row count from RecordBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Sep 26, 2024
1 parent b4de8e0 commit b111bba
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
15 changes: 14 additions & 1 deletion common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ class NativeUtil {
arrayAddrs: Array[Long],
schemaAddrs: Array[Long],
batch: ColumnarBatch): Int = {
val numRows = mutable.ArrayBuffer.empty[Int]

(0 until batch.numCols()).foreach { index =>
batch.column(index) match {
case a: CometVector =>
val valueVector = a.getValueVector

numRows += valueVector.getValueCount

val provider = if (valueVector.getField.getDictionary != null) {
a.getDictionaryProvider
} else {
Expand All @@ -120,7 +124,16 @@ class NativeUtil {
}
}

batch.numRows()
if (numRows.distinct.length != 1) {
throw new SparkException(
s"Number of rows in each column should be the same, but got [${numRows.distinct}]")
}

// `ColumnarBatch.numRows` might return a different number than the actual number of rows in
// the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
// its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
// logical number of rows which is less than actual number of rows due to row deletion.
numRows.headOption.getOrElse(batch.numRows())
}

/**
Expand Down
4 changes: 4 additions & 0 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ impl<'a> Stream for ScanStream<'a> {
InputBatch::EOF => Poll::Ready(None),
InputBatch::Batch(columns, num_rows) => {
self.baseline_metrics.record_output(*num_rows);

println!("columns: {:?}", columns);
println!("num_rows: {:?}", num_rows);

let maybe_batch = self.build_record_batch(columns, *num_rows);
Poll::Ready(Some(maybe_batch))
}
Expand Down

0 comments on commit b111bba

Please sign in to comment.