Skip to content

Commit

Permalink
Add retry support to HostToGpuCoalesceIterator.concatAllAndPutOnGPU (
Browse files Browse the repository at this point in the history
…#9434)

Add retry support to HostToGpuCoalesceIterator.concatAllAndPutOnGPU


---------

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored Oct 17, 2023
1 parent b85e5df commit 0341a28
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public static abstract class GpuColumnarBatchBuilderBase implements AutoCloseabl

/**
 * Builds the column for the given builder index and returns it as a cudf ColumnVector.
 * NOTE(review): per the method name the result is expected to live on the device;
 * implementations are not visible here, so confirm against the subclasses.
 *
 * @param builderIndex index of the per-column builder to build from
 * @return the resulting cudf ColumnVector
 */
protected abstract ai.rapids.cudf.ColumnVector buildAndPutOnDevice(int builderIndex);

/**
 * Try to build a ColumnarBatch. Unlike a one-shot build, this can be called multiple
 * times in case of failures.
 *
 * @param rows the number of rows passed to the build
 * @return the built ColumnarBatch
 */
public abstract ColumnarBatch tryBuild(int rows);

/**
 * Builds a ColumnarBatch with the given number of rows by delegating to the
 * two-argument `build` overload, using this builder's `buildAndPutOnDevice`
 * to produce the column for each builder index.
 *
 * @param rows the number of rows passed through to the two-argument `build`
 * @return the ColumnarBatch returned by the two-argument `build`
 */
public ColumnarBatch build(int rows) {
  return build(rows, builderIndex -> buildAndPutOnDevice(builderIndex));
}
Expand Down Expand Up @@ -212,6 +215,12 @@ public ai.rapids.cudf.ArrowColumnBuilder builder(int i) {
return builders[i];
}

/**
 * Tries to build a ColumnarBatch with the given number of rows by delegating to the
 * two-argument `build` overload, producing each column by calling
 * `buildAndPutOnDevice()` on the Arrow column builder at that index.
 *
 * @param rows the number of rows passed through to the two-argument `build`
 * @return the ColumnarBatch returned by the two-argument `build`
 */
@Override
public ColumnarBatch tryBuild(int rows) {
  // Arrow data should not be released until close is called.
  return build(rows, columnIndex -> builders[columnIndex].buildAndPutOnDevice());
}

@Override
public void close() {
for (ai.rapids.cudf.ArrowColumnBuilder b: builders) {
Expand Down Expand Up @@ -300,6 +309,7 @@ public HostColumnVector[] buildHostColumns() {
* It is safe to call this multiple times, and data will be released
* after a call to `close`.
*/
@Override
public ColumnarBatch tryBuild(int rows) {
if (hostColumns == null) {
hostColumns = buildHostColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
// About to place data back on the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

val ret = batchBuilder.build(totalRows)
val ret = RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch]{
batchBuilder.tryBuild(totalRows)
}
val maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret)

// refine the estimate for number of rows based on this batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.{RetryOOM, RmmSpark, SplitAndRetryOOM}
import com.nvidia.spark.rapids.jni.{RmmSpark, SplitAndRetryOOM}
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar

Expand Down Expand Up @@ -118,15 +118,10 @@ class GpuCoalesceBatchesRetrySuite
}
}

// This is a placeholder test. The HostToGpuCoalesceIterator needs a change
// in cuDF to become retryable, so here we only assert that the exception we
// could handle (`RetryOOM`) is being thrown.
test("coalesce gpu batches with retry host iter") {
val iter = getHostIter(injectRetry = 1)
assertThrows[RetryOOM] {
withResource(iter.next()) { coalesced =>
assertResult(10)(coalesced.numRows())
}
withResource(iter.next()) { coalesced =>
assertResult(10)(coalesced.numRows())
}
// ensure that this iterator _did not close_ the incoming batches
// as that is the semantics of the HostToGpuCoalesceIterator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuColumnVector.GpuArrowColumnarBatchBuilder
import com.nvidia.spark.rapids.jni.RmmSpark
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.arrow.vector.IntVector

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}

/**
 * Checks that building a GPU batch from an Arrow-backed column through
 * `GpuArrowColumnarBatchBuilder.tryBuild` still produces the expected data when a
 * retry OOM is forced on the current thread before the build.
 */
class HostColumnToGpuRetrySuite extends RmmSparkRetrySuiteBase {

  // Single integer column named "a".
  private val schema = StructType(Seq(StructField("a", IntegerType)))
  // Number of rows written into every Arrow vector created by this suite.
  private val rowCount = 50

  /**
   * Creates an Arrow-backed Spark column holding `rowCount` integers, where the value
   * at position `pos` is `pos * 10`.
   *
   * @param allocator the Arrow allocator used to allocate the underlying vector
   * @return the Arrow vector wrapped as a Spark `ColumnVector`
   */
  private def buildArrowIntColumn(allocator: BufferAllocator): ColumnVector = {
    val vector = new IntVector("intVector", allocator)
    vector.allocateNew(rowCount)
    for (pos <- 0 until rowCount) {
      vector.set(pos, pos * 10)
    }
    vector.setValueCount(rowCount)
    new ArrowColumnVector(vector)
  }

  /**
   * Copies a freshly built Arrow column into a `GpuArrowColumnarBatchBuilder`, forces a
   * retry OOM on the current thread, and then builds the batch inside `withRetryNoSplit`.
   * The builder is closed before the batch is returned; the caller owns the batch.
   */
  private def buildBatchWithForcedRetry(allocator: BufferAllocator): ColumnarBatch = {
    withResource(new GpuArrowColumnarBatchBuilder(schema)) { batchBuilder =>
      withResource(buildArrowIntColumn(allocator)) { sourceColumn =>
        batchBuilder.copyColumnar(sourceColumn, 0, rowCount)
      }
      // Inject the retry OOM only after the copy, right before the retryable build.
      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
      RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
        batchBuilder.tryBuild(rowCount)
      }
    }
  }

  test("Arrow column builder with retry OOM") {
    withResource(new RootAllocator()) { allocator =>
      withResource(buildBatchWithForcedRetry(allocator)) { gpuBatch =>
        assertResult(rowCount)(gpuBatch.numRows())
        assertResult(1)(gpuBatch.numCols())
        val gpuColumn = gpuBatch.column(0).asInstanceOf[GpuColumnVector]
        withResource(gpuColumn.copyToHost()) { actual =>
          // Rebuild the same Arrow column to serve as the expected values.
          withResource(buildArrowIntColumn(allocator)) { expected =>
            for (pos <- 0 until rowCount) {
              assert(actual.getInt(pos) == expected.getInt(pos))
            }
          }
        }
      }
    }
  }

}

0 comments on commit 0341a28

Please sign in to comment.