Allow using a custom Spark Resource Name for a GPU #4024

Merged 9 commits on Nov 4, 2021
1 change: 1 addition & 0 deletions docs/configs.md
@@ -31,6 +31,7 @@ Name | Description | Default Value
-----|-------------|--------------
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with the corresponding alluxio scheme. E.g., when the config is set to "s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar", s3:/foo/a.csv will be replaced with alluxio://0.1.2.3:19998/foo/a.csv and gcs:/bar/b.csv will be replaced with alluxio://0.1.2.3:19998/bar/b.csv|None
<a name="cloudSchemes"></a>spark.rapids.cloudSchemes|Comma-separated list of additional URI schemes that are to be considered cloud-based filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud-based stores are generally completely separate from the executors and likely have a higher I/O read cost. Cloud filesystems also often get better throughput when multiple readers operate in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None
<a name="gpu.resourceName"></a>spark.rapids.gpu.resourceName|The name of the Spark resource that represents a GPU that you want the plugin to use if using custom resources with Spark.|gpu
<a name="memory.gpu.allocFraction"></a>spark.rapids.memory.gpu.allocFraction|The fraction of available (free) GPU memory that should be allocated for pooled memory. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction, and greater than or equal to the minimum limit configured via spark.rapids.memory.gpu.minAllocFraction.|1.0
<a name="memory.gpu.debug"></a>spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE
<a name="memory.gpu.direct.storage.spill.batchWriteBuffer.size"></a>spark.rapids.memory.gpu.direct.storage.spill.batchWriteBuffer.size|The size of the GPU memory buffer used to batch small buffers when spilling to GDS. Note that this buffer is mapped to the PCI Base Address Register (BAR) space, which may be very limited on some GPUs (e.g. the NVIDIA T4 only has 256 MiB), and it is also used by UCX bounce buffers.|8388608
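As a sketch of how the new `spark.rapids.gpu.resourceName` config fits together with Spark's own resource scheduling: when a cluster manager advertises GPUs under a custom resource name (here `nvidia/gpu`, the name used in this PR's tests), the plugin must be told to look up that name instead of the default `gpu`. The discovery-script path and the exact `spark.executor.resource.*` keys below are illustrative assumptions that depend on your cluster manager setup, not part of this change:

```shell
# Hypothetical submission: Spark schedules the custom "nvidia/gpu" resource,
# and spark.rapids.gpu.resourceName tells the plugin to look it up by that name.
spark-submit \
  --conf spark.executor.resource.nvidia/gpu.amount=1 \
  --conf spark.executor.resource.nvidia/gpu.discoveryScript=/opt/sparkRapidsPlugin/getGpusResources.sh \
  --conf spark.task.resource.nvidia/gpu.amount=1 \
  --conf spark.rapids.gpu.resourceName=nvidia/gpu \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  my-app.jar
```

Without the last `spark.rapids.gpu.resourceName` setting the plugin would look for a resource named `gpu`, find nothing, and fall back to the default GPU rather than the one Spark assigned.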
10 changes: 9 additions & 1 deletion sql-plugin/pom.xml
Expand Up @@ -44,6 +44,11 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
@@ -368,7 +373,10 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -100,9 +100,12 @@ object GpuDeviceManager extends Logging {
addr
}

def getGPUAddrFromResources(resources: Map[String, ResourceInformation]): Option[Int] = {
if (resources.contains("gpu")) {
val addrs = resources("gpu").addresses
def getGPUAddrFromResources(resources: Map[String, ResourceInformation],
conf: RapidsConf): Option[Int] = {
val sparkGpuResourceName = conf.getSparkGpuResourceName
if (resources.contains(sparkGpuResourceName)) {
logDebug(s"Spark resources contain: $sparkGpuResourceName")
val addrs = resources(sparkGpuResourceName).addresses
if (addrs.length > 1) {
// Throw an exception since we assume one GPU per executor.
// If multiple GPUs are allocated by spark, then different tasks could get assigned
@@ -118,16 +121,17 @@

// Initializes the GPU if Spark assigned one.
// Returns either the GPU addr Spark assigned or None if Spark didn't assign one.
def initializeGpu(resources: Map[String, ResourceInformation]): Option[Int] = {
getGPUAddrFromResources(resources).map(setGpuDeviceAndAcquire(_))
def initializeGpu(resources: Map[String, ResourceInformation], conf: RapidsConf): Option[Int] = {
getGPUAddrFromResources(resources, conf).map(setGpuDeviceAndAcquire(_))
}

def initializeGpuAndMemory(resources: Map[String, ResourceInformation]): Unit = {
def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
conf: RapidsConf): Unit = {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
// uses that GPU. We only need to initialize RMM once per Executor because we are relying on
// only 1 GPU per executor.
// If Spark didn't provide the address we just use the default GPU.
val addr = initializeGpu(resources)
val addr = initializeGpu(resources, conf)
initializeMemory(addr)
}

@@ -153,11 +157,12 @@
def initializeFromTask(): Unit = {
if (threadGpuInitialized.get() == false) {
val resources = getResourcesFromTaskContext
val conf = new RapidsConf(SparkEnv.get.conf)
if (rmmTaskInitEnabled) {
initializeGpuAndMemory(resources)
initializeGpuAndMemory(resources, conf)
} else {
// just set the device if provided so task thread uses right GPU
initializeGpu(resources)
initializeGpu(resources, conf)
}
threadGpuInitialized.set(true)
}
@@ -183,7 +183,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
// on executor startup.
if (!GpuDeviceManager.rmmTaskInitEnabled) {
logInfo("Initializing memory from Executor Plugin")
GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap)
GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap, conf)
if (GpuShuffleEnv.isRapidsShuffleAvailable) {
GpuShuffleEnv.initShuffleManager()
if (conf.shuffleTransportEarlyStart) {
@@ -1302,6 +1302,12 @@ object RapidsConf {
.booleanConf
.createWithDefault(value = true)

val SPARK_GPU_RESOURCE_NAME = conf("spark.rapids.gpu.resourceName")
.doc("The name of the Spark resource that represents a GPU that you want the plugin to use " +
"if using custom resources with Spark.")
.stringConf
.createWithDefault("gpu")

val SUPPRESS_PLANNING_FAILURE = conf("spark.rapids.sql.suppressPlanningFailure")
.doc("Option to fallback an individual query to CPU if an unexpected condition prevents the " +
"query plan from being converted to a GPU-enabled one. Note this is different from " +
@@ -1719,6 +1725,8 @@

lazy val isRangeWindowLongEnabled: Boolean = get(ENABLE_RANGE_WINDOW_LONG)

lazy val getSparkGpuResourceName: String = get(SPARK_GPU_RESOURCE_NAME)

lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF)

private val optimizerDefaults = Map(
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import org.scalatest.{Assertion, FunSuite}

import org.apache.spark.SparkConf
import org.apache.spark.resource.ResourceInformation

class GpuDeviceManagerSuite extends FunSuite {

test("Test Spark gpu resource") {
val sparkConf = new SparkConf()
val conf = new RapidsConf(sparkConf)
val gpu = new ResourceInformation("gpu", Array("3"))
val resources = Map("gpu" -> gpu)
val gpuAddr = GpuDeviceManager.getGPUAddrFromResources(resources, conf)
assert(gpuAddr.nonEmpty)
assert(gpuAddr.get == 3)
}

test("Test Spark custom resource missed") {
val sparkConf = new SparkConf()
val conf = new RapidsConf(sparkConf)
val gpu = new ResourceInformation("nvidia/gpu", Array("2"))
val resources = Map("nvidia/gpu" -> gpu)
val gpuAddr = GpuDeviceManager.getGPUAddrFromResources(resources, conf)
assert(gpuAddr.isEmpty)
}

test("Test Spark multiple GPUs throws") {
val sparkConf = new SparkConf()
val conf = new RapidsConf(sparkConf)
val gpu = new ResourceInformation("gpu", Array("2", "3"))
val resources = Map("gpu" -> gpu)
assertThrows[IllegalArgumentException](
GpuDeviceManager.getGPUAddrFromResources(resources, conf))
}

test("Test Spark custom resource") {
val sparkConf = new SparkConf()
sparkConf.set(RapidsConf.SPARK_GPU_RESOURCE_NAME.toString, "nvidia/gpu")
val conf = new RapidsConf(sparkConf)
val gpu = new ResourceInformation("nvidia/gpu", Array("1"))
val resources = Map("nvidia/gpu" -> gpu)
val gpuAddr = GpuDeviceManager.getGPUAddrFromResources(resources, conf)
assert(gpuAddr.nonEmpty)
assert(gpuAddr.get == 1)
}
}