Skip to content

Commit

Permalink
Extended configuration of OOM injection mode (#10013)
Browse files Browse the repository at this point in the history
This PR enables specifying OOM injection mode with additional config

such as
- num_ooms to inject
- skip=N to trigger the first OOM on the (N+1)-th allocation
- type with the current default mix `CPU_OR_GPU`,  `CPU` (just host allocations), `GPU` (just device allocations)
- split=bool whether to inject SplitAndRetryOOM

Enables IT invocations such as :

```
PYSP_TEST_spark_rapids_memory_gpu_state_debug=STDERR \
TEST_PARALLEL=0 \
SPARK_HOME=~/dist/spark-3.3.0-bin-hadoop3 \
./integration_tests/run_pyspark_from_build.sh \
  -k array_exists --test_oom_injection_mode=always:type=CPU,num_ooms=1,skip=4,split=true
``` 

This PR requires, and is stacked on, NVIDIA/spark-rapids-jni#1637
---------

Signed-off-by: Gera Shegalov <[email protected]>
Co-authored-by: Jim Brennan <[email protected]>
  • Loading branch information
gerashegalov and jbrennan333 authored Dec 15, 2023
1 parent afe66ca commit ea43d89
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 30 deletions.
20 changes: 13 additions & 7 deletions integration_tests/src/main/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ def is_parquet_testing_tests_forced():

_inject_oom = None

def should_inject_oom():
    """Return True when the module-level ``_inject_oom`` holds a value, False when it is None."""
    # Identity check: `!= None` goes through __ne__/__eq__, which an arbitrary
    # marker object could override; `is not None` cannot be fooled.
    return _inject_oom is not None

def get_inject_oom_conf():
    """Return the current module-level ``_inject_oom`` value unchanged.

    ``_inject_oom`` is initialised to ``None`` at module level; a ``None``
    result therefore means nothing has assigned an injection config.
    """
    current_conf = _inject_oom
    return current_conf


# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
_test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {_test_datagen_random_seed}. "
print(f"Starting with datagen test seed: {_test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def get_datagen_seed():
Expand Down Expand Up @@ -296,7 +298,7 @@ def pytest_configure(config):
# pytest expects this starting list to match for all workers, it is important that the same seed
# is set for all, either from the environment or as a constant.
oom_random_injection_seed = int(os.getenv("SPARK_RAPIDS_TEST_INJECT_OOM_SEED", 1))
print(f"Starting with OOM injection seed: {oom_random_injection_seed}. "
print(f"Starting with OOM injection seed: {oom_random_injection_seed}. "
"Set env variable SPARK_RAPIDS_TEST_INJECT_OOM_SEED to override.")

def pytest_collection_modifyitems(config, items):
Expand All @@ -305,7 +307,9 @@ def pytest_collection_modifyitems(config, items):
extras = []
order = item.get_closest_marker('ignore_order')
# decide if OOMs should be injected, and when
injection_mode = config.getoption('test_oom_injection_mode').lower()
injection_mode_and_conf = config.getoption('test_oom_injection_mode').split(":")
injection_mode = injection_mode_and_conf[0].lower()
injection_conf = injection_mode_and_conf[1] if len(injection_mode_and_conf) == 2 else None
inject_choice = False
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
Expand All @@ -322,8 +326,10 @@ def pytest_collection_modifyitems(config, items):
elif injection_mode == 'always':
inject_choice = True
if inject_choice:
extras.append('INJECT_OOM')
item.add_marker('inject_oom', append=True)
extras.append('INJECT_OOM_%s' % injection_conf if injection_conf else 'INJECT_OOM')
item.add_marker(
pytest.mark.inject_oom(injection_conf) if injection_conf else 'inject_oom',
append=True)
if order:
if order.kwargs:
extras.append('IGNORE_ORDER(' + str(order.kwargs) + ')')
Expand Down
10 changes: 5 additions & 5 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import os
from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime, is_at_least_precommit_run, should_inject_oom
from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime, is_at_least_precommit_run, get_inject_oom_conf
from pyspark.sql import DataFrame
from spark_init_internal import get_spark_i_know_what_i_am_doing, spark_version

Expand Down Expand Up @@ -56,10 +56,10 @@ def _from_scala_map(scala_map):

def _set_all_confs(conf):
newconf = _default_conf.copy()
if (should_inject_oom()):
_spark.conf.set("spark.rapids.sql.test.injectRetryOOM", "true")
else:
_spark.conf.set("spark.rapids.sql.test.injectRetryOOM", "false")
inject_oom = get_inject_oom_conf()
if inject_oom:
_spark.conf.set("spark.rapids.sql.test.injectRetryOOM",
inject_oom.args[0] if len(inject_oom.args) > 0 else True)
newconf.update(conf)
for key, value in newconf.items():
if _spark.conf.get(key, None) != value:
Expand Down
83 changes: 71 additions & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}

import ai.rapids.cudf.Cuda
import com.nvidia.spark.rapids.jni.RmmSpark.OomInjectionType

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -117,7 +118,7 @@ object ConfHelper {
}
}

abstract class ConfEntry[T](val key: String, val converter: String => T, val doc: String,
abstract class ConfEntry[T](val key: String, val converter: String => T, val doc: String,
val isInternal: Boolean, val isStartUpOnly: Boolean, val isCommonlyUsed: Boolean) {

def get(conf: Map[String, String]): T
Expand All @@ -127,7 +128,7 @@ abstract class ConfEntry[T](val key: String, val converter: String => T, val doc
override def toString: String = key
}

class ConfEntryWithDefault[T](key: String, converter: String => T, doc: String,
class ConfEntryWithDefault[T](key: String, converter: String => T, doc: String,
isInternal: Boolean, isStartupOnly: Boolean, isCommonlyUsed: Boolean = false,
val defaultValue: T)
extends ConfEntry[T](key, converter, doc, isInternal, isStartupOnly, isCommonlyUsed) {
Expand Down Expand Up @@ -164,7 +165,7 @@ class ConfEntryWithDefault[T](key: String, converter: String => T, doc: String,

class OptionalConfEntry[T](key: String, val rawConverter: String => T, doc: String,
isInternal: Boolean, isStartupOnly: Boolean, isCommonlyUsed: Boolean = false)
extends ConfEntry[Option[T]](key, s => Some(rawConverter(s)), doc, isInternal,
extends ConfEntry[Option[T]](key, s => Some(rawConverter(s)), doc, isInternal,
isStartupOnly, isCommonlyUsed) {

override def get(conf: Map[String, String]): Option[T] = {
Expand Down Expand Up @@ -420,7 +421,7 @@ object RapidsConf {
.stringConf
.createOptional

val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.doc("The pattern to use to generate the named pipe path. Occurrences of %p in the pattern " +
"will be replaced with the process ID of the executor.")
.internal
Expand Down Expand Up @@ -1380,12 +1381,17 @@ object RapidsConf {

// INTERNAL TEST AND DEBUG CONFIGS

val TEST_RETRY_OOM_INJECTION_ENABLED = conf("spark.rapids.sql.test.injectRetryOOM")
.doc("Only to be used in tests. If enabled the retry iterator will inject a GpuRetryOOM " +
"or CpuRetryOOM once per invocation.")
// Test-only switch for OOM injection in the retry iterator. Accepts a plain boolean or the
// extended `key=value` list described in the doc string below.
val TEST_RETRY_OOM_INJECTION_MODE = conf("spark.rapids.sql.test.injectRetryOOM")
  .doc("Only to be used in tests. If `true` the retry iterator will inject a GpuRetryOOM " +
    "or CpuRetryOOM once per invocation. Furthermore an extended config " +
    "`num_ooms=<int>,skip=<int>,type=CPU|GPU|CPU_OR_GPU,split=<bool>` can be provided to " +
    "specify the number of OOMs to generate; how many to skip before doing so; whether to " +
    "filter allocation events by host (CPU), device (GPU), or both (CPU_OR_GPU); " +
    "whether to inject *SplitAndRetryOOM instead of plain *RetryOOM exceptions. " +
    "Note *SplitAndRetryOOM exceptions are not always handled - use with care.")
  .internal()
.booleanConf
.createWithDefault(false)
.stringConf
.createWithDefault(false.toString)

val TEST_CONF = conf("spark.rapids.sql.test.enabled")
.doc("Intended to be used by unit tests, if enabled all operations must run on the " +
Expand Down Expand Up @@ -1652,7 +1658,7 @@ object RapidsConf {
.stringConf
.createWithDefault("none")

val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.lz4.chunkSize")
val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.lz4.chunkSize")
.doc("A configurable chunk size to use when compressing with LZ4.")
.internal()
.startupOnly()
Expand Down Expand Up @@ -2061,7 +2067,7 @@ object RapidsConf {
"The gpu to disk spill bounce buffer must have a positive size")
.createWithDefault(128L * 1024 * 1024)

val NON_UTC_TIME_ZONE_ENABLED =
val NON_UTC_TIME_ZONE_ENABLED =
conf("spark.rapids.sql.nonUTC.enabled")
.doc("An option to enable/disable non-UTC time zone support.")
.internal()
Expand Down Expand Up @@ -2302,7 +2308,53 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isTestEnabled: Boolean = get(TEST_CONF)

lazy val testRetryOOMInjectionEnabled : Boolean = get(TEST_RETRY_OOM_INJECTION_ENABLED)
/**
 * Convert the string value of `spark.rapids.sql.test.injectRetryOOM` to the injection
 * configuration OomInjectionConf.
 *
 * The extended format is a CSV of `key=value` pairs in any order
 * "num_ooms=<integer>,skip=<integer>,type=<string value of OomInjectionType>,split=<boolean>"
 * Matching is case-insensitive and whitespace around keys and values is ignored.
 *
 * "type" maps to oomInjectionFilter (default CPU_OR_GPU), the OomInjectionType that
 *   num_ooms and skip are counted against
 * "num_ooms" maps to numOoms (default 1), the number of allocations resulting in an OOM
 * "skip" maps to skipCount (default 0), the number of matching allocations to skip before
 *   injecting an OOM at the skip+1st allocation.
 * "split" maps to withSplit (default false), determining whether to inject
 *   *SplitAndRetryOOM instead of plain *RetryOOM exceptions
 *
 * For backwards compatibility the existing binary configuration is supported:
 * "false" disables injection, i.e. numOoms=0, skipCount=0, oomInjectionFilter=CPU_OR_GPU,
 *   withSplit=false
 * "true", or any other value without a recognized `key=value` pair, yields the default
 *   numOoms=1, skipCount=0, oomInjectionFilter=CPU_OR_GPU, withSplit=false
 *
 * Entries that are not of the form `key=value`, or whose key is not one of the four above,
 * do not affect the result; they are reported with a warning so typos are visible.
 *
 * A non-integer "num_ooms"/"skip" value fails with NumberFormatException; a non-boolean
 * "split" value or an unknown "type" name fails with IllegalArgumentException.
 */
lazy val testRetryOOMInjectionMode : OomInjectionConf = {
  // Locale.ROOT keeps key/enum-name matching independent of the JVM default locale
  // (e.g. a Turkish locale would lower-case "SKIP" to "skıp").
  get(TEST_RETRY_OOM_INJECTION_MODE).trim.toLowerCase(java.util.Locale.ROOT) match {
    case "false" =>
      OomInjectionConf(numOoms = 0, skipCount = 0,
        oomInjectionFilter = OomInjectionType.CPU_OR_GPU, withSplit = false)
    case "true" =>
      OomInjectionConf(numOoms = 1, skipCount = 0,
        oomInjectionFilter = OomInjectionType.CPU_OR_GPU, withSplit = false)
    case injectConfStr =>
      val knownKeys = Set("num_ooms", "skip", "type", "split")
      // Keep the raw token next to its trimmed key/value parts for diagnostics.
      val tokens = injectConfStr.split(',').map(_.trim).filter(_.nonEmpty)
      val parsed = tokens.map(token => token -> token.split('=').map(_.trim))
      val injectConfMap = parsed.collect {
        case (_, Array(k, v)) => k -> v
      }.toMap
      val ignored = parsed.collect {
        case (raw, Array(k, _)) if !knownKeys.contains(k) => raw
        case (raw, parts) if parts.length != 2 => raw
      }
      if (ignored.nonEmpty) {
        val ignoredStr = ignored.mkString(", ")
        logWarning(s"Ignoring unrecognized entries in ${TEST_RETRY_OOM_INJECTION_MODE.key}: " +
          ignoredStr)
      }
      val numOoms = injectConfMap.getOrElse("num_ooms", 1.toString)
      val skipCount = injectConfMap.getOrElse("skip", 0.toString)
      val oomFilterStr = injectConfMap
        .getOrElse("type", OomInjectionType.CPU_OR_GPU.toString)
        .toUpperCase(java.util.Locale.ROOT)
      val oomFilter = OomInjectionType.valueOf(oomFilterStr)
      val withSplit = injectConfMap.getOrElse("split", false.toString)
      val ret = OomInjectionConf(
        numOoms = numOoms.toInt,
        skipCount = skipCount.toInt,
        oomInjectionFilter = oomFilter,
        withSplit = withSplit.toBoolean
      )
      logDebug(s"Parsed ${ret} from ${injectConfStr} via injectConfMap=${injectConfMap}")
      ret
  }
}

lazy val testingAllowedNonGpu: Seq[String] = get(TEST_ALLOWED_NONGPU)

Expand Down Expand Up @@ -2849,3 +2901,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
conf.contains(key)
}
}

/**
 * Parsed form of the `spark.rapids.sql.test.injectRetryOOM` test configuration.
 *
 * @param numOoms the number of allocations that should result in an injected OOM;
 *                0 is used for the disabled ("false") setting
 * @param skipCount the number of matching allocations to skip before the first injected OOM
 * @param withSplit whether to inject *SplitAndRetryOOM instead of plain *RetryOOM exceptions
 * @param oomInjectionFilter which allocations (CPU, GPU, or CPU_OR_GPU) the counts above
 *                           are run against
 */
case class OomInjectionConf(
numOoms: Int,
skipCount: Int,
withSplit: Boolean,
oomInjectionFilter: OomInjectionType
)
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,25 @@ object RmmRapidsRetryIterator extends Logging {
doSplit = false
try {
// call the user's function
if (config.exists(_.testRetryOOMInjectionEnabled) && !injectedOOM) {
injectedOOM = true
// ensure we have associated our thread with the running task, as
// `forceRetryOOM` requires a prior association.
RmmSpark.currentThreadIsDedicatedToTask(TaskContext.get().taskAttemptId())
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
config.foreach {
case rapidsConf if !injectedOOM && rapidsConf.testRetryOOMInjectionMode.numOoms > 0 =>
injectedOOM = true
// ensure we have associated our thread with the running task, as
// `forceRetryOOM` requires a prior association.
RmmSpark.currentThreadIsDedicatedToTask(TaskContext.get().taskAttemptId())
val injectConf = rapidsConf.testRetryOOMInjectionMode
if (injectConf.withSplit) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId,
injectConf.numOoms,
injectConf.oomInjectionFilter.ordinal,
injectConf.skipCount)
} else {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId,
injectConf.numOoms,
injectConf.oomInjectionFilter.ordinal,
injectConf.skipCount)
}
case _ => ()
}
result = Some(attemptIter.next())
clearInjectedOOMIfNeeded()
Expand Down

0 comments on commit ea43d89

Please sign in to comment.