From 07c61638cb666e1308720fb461bc33bf48bb9448 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Tue, 5 Dec 2023 09:25:57 +0800 Subject: [PATCH] Fix a potential data corruption for Pandas UDF (#9942) This PR moves the BatchQueue into the DataProducer to share the same lock as the output iterator returned by asIterator, and make the batch movement from the input iterator to the batch queue be an atomic operation to eliminate the race when appending the batches to the queue. --- .../python/GpuAggregateInPandasExec.scala | 21 +- .../python/GpuArrowEvalPythonExec.scala | 203 +++++++++--------- .../python/GpuWindowInPandasExecBase.scala | 21 +- .../rapids/shims/GpuWindowInPandasExec.scala | 23 +- 4 files changed, 117 insertions(+), 151 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala index caf323ec053..3f3f2803f5c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala @@ -20,7 +20,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -194,23 +194,14 @@ case class GpuAggregateInPandasExec( } val batchProducer = new BatchProducer( - BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices)) - val queue = new BatchQueue(batchProducer, Some(keyConverter)) - val pyInputIter = batchProducer.asIterator.map { case (batch, isForPeek) => - val inputBatch = closeOnExcept(batch) { _ => + BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices), Some(keyConverter)) + val pyInputIter = batchProducer.asIterator.map { batch => + withResource(batch) { _ => val pyInputColumns = pyInputRefs.indices.safeMap { idx => batch.column(idx + groupingRefs.size).asInstanceOf[GpuColumnVector].incRefCount() } new ColumnarBatch(pyInputColumns.toArray, batch.numRows()) } - if (isForPeek) { - batch.close() - } else { - // When adding batch to the queue, queue will convert it to a key batch because this - // queue is constructed with the key converter. - queue.add(batch) - } - inputBatch } // Third, sends to Python to execute the aggregate and returns the result. @@ -232,8 +223,8 @@ case class GpuAggregateInPandasExec( val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs) // Gets the combined batch for each group and projects for the output. - new CombiningIterator(queue, pyOutputIterator, pyRunner, mNumOutputRows, - mNumOutputBatches).map { combinedBatch => + new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner, + mNumOutputRows, mNumOutputBatches).map { combinedBatch => withResource(combinedBatch) { batch => GpuProjectExec.project(batch, resultRefs) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 60b6b3929e1..5e588cae7bd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -171,29 +171,56 @@ class RebatchingRoundoffIterator( } /** - * Work with BatchQueue to support BatchQueue's peek operation by pulling - * in a batch from the input iterator on demand. + * A trait provides dedicated APIs for GPU reading batches from python. + * This is also for easy type declarations since it is implemented by an inner class + * of BatchProducer. + */ +trait BatchQueue { + /** Return and remove the first batch in the cache. Caller should close it. */ + def remove(): SpillableColumnarBatch + + /** Get the number of rows in the next batch, without actually getting the batch. */ + def peekBatchNumRows(): Int +} + +/** + * It accepts an iterator as input and will cache the batches when pulling them in from + * the input for later combination with batches coming back from python by the reader. + * It also supports an optional converter to convert input batches and put the converted + * result to the cache queue. This is for GpuAggregateInPandas to build and cache key + * batches. * - * It also supports accessing batches from the input by an iterator. Call - * "asIterator" to get the iterator. This iterator will return a tuple of - * ColumnarBatch and Boolean. And the boolean indicates whether the batch - * is pulled in for peak. + * Call "getBatchQueue" to get the internal cache queue and specify it to the output + * combination iterator. + * To access the batches from input, call "asIterator" to get the output iterator. */ -class BatchProducer(input: Iterator[ColumnarBatch]) extends AutoCloseable { producer => +class BatchProducer( + input: Iterator[ColumnarBatch], + converter: Option[ColumnarBatch => ColumnarBatch] = None +) extends AutoCloseable { producer => Option(TaskContext.get()).foreach(onTaskCompletion(_)(close())) - // Cache for batches pulled in by the "produce" call for the peek operation. - // In fact, there is usually only one batch. But using a queue here is because in + // A queue that holds the pending batches that need to line up with and combined + // with batches coming back from python. + private[this] val batchQueue = new BatchQueueImpl + + /** Get the internal BatchQueue */ + def getBatchQueue: BatchQueue = batchQueue + + // The cache that holds the pending batches pulled in by the "produce" call for + // the reader peeking the next rows number when the "batchQueue" is empty, and + // consumed by the iterator returned from "asIterator". + // (In fact, there is usually only ONE batch. But using a queue here is because in // theory "produce" can be called multiple times, then more than one batch can be - // pulled in. - private val pending = mutable.Queue[SpillableColumnarBatch]() + // pulled in.) + private[this] val pendingOutput = mutable.Queue[SpillableColumnarBatch]() - private[rapids] def produce(): ColumnarBatch = producer.synchronized { + private def produce(): ColumnarBatch = { if (input.hasNext) { val cb = input.next() // Need to duplicate this batch for "next" - pending.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb), + pendingOutput.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb), SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) cb } else { @@ -201,113 +228,94 @@ class BatchProducer(input: Iterator[ColumnarBatch]) extends AutoCloseable { prod } } - def asIterator: Iterator[(ColumnarBatch, Boolean)] = { - new Iterator[(ColumnarBatch, Boolean)] { + /** Return an iterator to access the batches from the input */ + def asIterator: Iterator[ColumnarBatch] = { + new Iterator[ColumnarBatch] { override def hasNext: Boolean = producer.synchronized { - pending.nonEmpty || input.hasNext + pendingOutput.nonEmpty || input.hasNext } - override def next(): (ColumnarBatch, Boolean) = producer.synchronized { + override def next(): ColumnarBatch = producer.synchronized { if (!hasNext) { throw new NoSuchElementException() } - if (pending.nonEmpty) { - withResource(pending.dequeue()) { scb => - (scb.getColumnarBatch(), true) + if (pendingOutput.nonEmpty) { + withResource(pendingOutput.dequeue()) { scb => + scb.getColumnarBatch() } } else { - (input.next(), false) + closeOnExcept(input.next()) { cb => + // Need to duplicate it for later combination with Python output + batchQueue.add(GpuColumnVector.incRefCounts(cb)) + cb + } } } } } - override def close(): Unit = synchronized { - while(pending.nonEmpty) { - pending.dequeue().close() + override def close(): Unit = producer.synchronized { + batchQueue.close() + while (pendingOutput.nonEmpty) { + pendingOutput.dequeue().close() } } -} - -/** - * A simple queue that holds the pending batches that need to line up with - * and combined with batches coming back from python. - * - * It will ask for a batch from "batchProducer" when peeking the rows number - * and the queue is empty. - * It also supports an optional converter to convert the input batch and save - * the converted batch. This is design for the GpuAggregateInPandasExec to save - * the group key instead of the original input batch. - */ -class BatchQueue( - batchProducer: BatchProducer, - converter: Option[ColumnarBatch => ColumnarBatch] = None -) extends AutoCloseable { - - assert(batchProducer != null, "BatchQueue requires a BatchProducer") - Option(TaskContext.get()).foreach(onTaskCompletion(_)(close())) - - private val queue = mutable.ArrayBuffer[SpillableColumnarBatch]() - - private[this] def convertIfAny(batch: ColumnarBatch): ColumnarBatch = { - converter.map { convert => - withResource(batch)(convert) - }.getOrElse(batch) - } - /** Add a batch to the queue, the input batch will be taken over, do not use it anymore */ - def add(batch: ColumnarBatch): Unit = { - val cb = convertIfAny(batch) - this.synchronized { - queue.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + // Put this batch queue inside the BatchProducer to share the same lock with the + // output iterator returned by "asIterator" and make sure the batch movement from + // input iterator to this queue is an atomic operation. + // In a two-threaded Python runner, using two locks to protect the batch pulling + // from the input and the batch queue separately can not ensure batches in the + // queue has the same order as they are pulled in from the input. Because there is + // a race when the reader and the writer append batches to the queue. + // One possible case is: + // 1) the writer thread gets a batch A, but next it pauses. + // 2) then the reader thread gets the next Batch B, and appends it to the queue. + // 3) the writer thread restores and appends batch A to the queue. + // Therefore, batch A and B have the reversed order in the queue now, leading to data + // corruption when doing the combination. + private class BatchQueueImpl extends BatchQueue with AutoCloseable { + private val queue = mutable.Queue[SpillableColumnarBatch]() + + /** Add a batch to the queue, the input batch will be taken over, do not use it anymore */ + private[python] def add(batch: ColumnarBatch): Unit = { + val cb = converter.map { convert => + withResource(batch)(convert) + }.getOrElse(batch) + queue.enqueue(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) } - } - /** Return and remove the first batch in the cache. */ - def remove(): SpillableColumnarBatch = synchronized { - if (queue.isEmpty) { - null - } else { - queue.remove(0) + /** Return and remove the first batch in the cache. Caller should close it */ + override def remove(): SpillableColumnarBatch = producer.synchronized { + if (queue.isEmpty) { + null + } else { + queue.dequeue() + } } - } - /** Get the number of rows in the next batch, without actually getting the batch. */ - def peekBatchNumRows(): Int = { - val isEmpty = this.synchronized { - queue.isEmpty - } - if (isEmpty) { - // Try to ask for the next batch instead of waiting for inserting a - // batch by the python runner's writing. Because the writing may - // happen after this peak in the single threaded python runner, leading - // to a hang. - // Do not call it inside a lock to avoid any dead lock. - val nextBatch = batchProducer.produce() - if (nextBatch != null) { - val cb = convertIfAny(nextBatch) - this.synchronized { - // Since we release the lock for some time, it is possible some batches - // have been added into the queue. Then we need to make sure this batch - // is the first one. - queue.insert(0, SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + /** Get the number of rows in the next batch, without actually getting the batch. */ + override def peekBatchNumRows(): Int = producer.synchronized { + // Try to pull in the next batch for peek + if (queue.isEmpty) { + val cb = produce() + if (cb != null) { + add(cb) } } - } - this.synchronized { if (queue.nonEmpty) { queue.head.numRows() } else { 0 // Should not go here but just in case. } } - } - override def close(): Unit = synchronized { - while (queue.nonEmpty) { - queue.remove(0).close() + override def close(): Unit = producer.synchronized { + while (queue.nonEmpty) { + queue.dequeue().close() + } } } } @@ -399,19 +407,8 @@ case class GpuArrowEvalPythonExec( val batchProducer = new BatchProducer( new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We have to do the project before we add the batch because the batch might be closed - // when it is added - val ret = closeOnExcept(batch)(GpuProjectExec.project(_, boundReferences)) - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. - queue.add(batch) - } - ret + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch)(GpuProjectExec.project(_, boundReferences)) } if (isPythonOnGpuEnabled) { @@ -431,7 +428,7 @@ case class GpuArrowEvalPythonExec( pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, numOutputRows, numOutputBatches) } else { // Empty partition, return it directly diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala index 12e2258aaaf..ab56a0b24b5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf import ai.rapids.cudf.{GroupByAggregation, NullPolicy, OrderByArg} import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.python.PythonWorkerSemaphore @@ -505,24 +505,13 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase val boundPartitionRefs = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, childOutput) val batchProducer = new BatchProducer( new GroupingIterator(inputIter, boundPartitionRefs, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We have to do the project before we add the batch because the batch might be closed - // when it is added - val inputBatch = closeOnExcept(batch) { _ => + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch) { _ => withResource(GpuProjectExec.project(batch, boundDataRefs)) { projectedCb => // Compute the window bounds and insert to the head of each row for one batch insertWindowBounds(projectedCb) } } - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. - queue.add(batch) - } - inputBatch } if (isPythonOnGpuEnabled) { @@ -543,8 +532,8 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, - numOutputBatches).map(projectResult) + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, + numOutputRows, numOutputBatches).map(projectResult) } else { // Empty partition, return the input iterator directly inputIter diff --git a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala index 3d0d3450320..988cbe2521c 100644 --- a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala +++ b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala @@ -25,7 +25,7 @@ package com.nvidia.spark.rapids.shims import scala.collection.mutable.ArrayBuffer import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.spark.TaskContext @@ -33,7 +33,7 @@ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.python.{BatchProducer, BatchQueue, CombiningIterator, GpuPythonHelper, GpuWindowInPandasExecBase, GroupingIterator} +import org.apache.spark.sql.rapids.execution.python.{BatchProducer, CombiningIterator, GpuPythonHelper, GpuWindowInPandasExecBase, GroupingIterator} import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -199,24 +199,13 @@ case class GpuWindowInPandasExec( val boundPartitionRefs = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, childOutput) val batchProducer = new BatchProducer( new GroupingIterator(inputIter, boundPartitionRefs, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We have to do the project before we add the batch because the batch might be closed - // when it is added - val inputBatch = closeOnExcept(batch) { _ => + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch) { _ => withResource(GpuProjectExec.project(batch, boundDataRefs)) { projectedCb => // Compute the window bounds and insert to the head of each row for one batch insertWindowBounds(projectedCb) } } - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. - queue.add(batch) - } - inputBatch } if (isPythonOnGpuEnabled) { @@ -237,8 +226,8 @@ case class GpuWindowInPandasExec( pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, - numOutputBatches).map(projectResult) + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, + numOutputRows, numOutputBatches).map(projectResult) } else { // Empty partition, return the input iterator directly inputIter