Detect multiple jars on the classpath when init plugin [databricks] (#9654)

* Detect multiple jars on the classpath when init plugin

Signed-off-by: Haoyang Li <[email protected]>

* clean up

Signed-off-by: Haoyang Li <[email protected]>

* Apply suggestions from code review

Co-authored-by: Gera Shegalov <[email protected]>

* print version info and also check jni/cudf

Signed-off-by: Haoyang Li <[email protected]>

* add config for allowing multiple jars

Signed-off-by: Haoyang Li <[email protected]>

* keep jar path in error messages

Signed-off-by: Haoyang Li <[email protected]>

* address comments

Signed-off-by: Haoyang Li <[email protected]>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala

Co-authored-by: Gera Shegalov <[email protected]>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala

Co-authored-by: Gera Shegalov <[email protected]>

* address comments

Signed-off-by: Haoyang Li <[email protected]>

* Use unique properties for intermediate jars

Signed-off-by: Haoyang Li <[email protected]>

* clean up

Signed-off-by: Haoyang Li <[email protected]>

* address comment

Signed-off-by: Haoyang Li <[email protected]>

* Apply suggestions from code review

Co-authored-by: Gera Shegalov <[email protected]>

* address comments

Signed-off-by: Haoyang Li <[email protected]>

* add the project.artifactId to build-info and check it

Signed-off-by: Haoyang Li <[email protected]>

* remove unnecessary copyright update

Signed-off-by: Haoyang Li <[email protected]>

* remove log

Signed-off-by: Haoyang Li <[email protected]>

* Add 2.13 support

Signed-off-by: Haoyang Li <[email protected]>

* use revision to check duplicate jars

Signed-off-by: Haoyang Li <[email protected]>

* fix 2.13 build

Signed-off-by: Haoyang Li <[email protected]>

* support both SAME_REVISION and NEVER mode

Signed-off-by: Haoyang Li <[email protected]>

* Avoid CI change and filter out test

Signed-off-by: Haoyang Li <[email protected]>

* check values for config

Signed-off-by: Haoyang Li <[email protected]>

* use enum

Signed-off-by: Haoyang Li <[email protected]>

* fix two nits

Signed-off-by: Haoyang Li <[email protected]>

* Do not print log if no multiple jar

Signed-off-by: Haoyang Li <[email protected]>

* ignore subdir when checking multiple jars

Signed-off-by: Haoyang Li <[email protected]>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala

Co-authored-by: Gera Shegalov <[email protected]>

* wip ut

* address comment

Signed-off-by: Haoyang Li <[email protected]>

---------

Signed-off-by: Haoyang Li <[email protected]>
Co-authored-by: Gera Shegalov <[email protected]>
thirtiseven and gerashegalov authored Nov 28, 2023
1 parent c65c5b8 commit 6f96048
Showing 2 changed files with 94 additions and 0 deletions.
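Before the diff itself, a minimal sketch of how the new startup-only config introduced here might be set. The key and its values (ALWAYS, SAME_REVISION, NEVER) come from RapidsConf.ALLOW_MULTIPLE_JARS in this diff; the plugin class name and app name are assumptions outside this commit:

import org.apache.spark.sql.SparkSession

object MultiJarConfigExample {
  def main(args: Array[String]): Unit = {
    // Sketch: opt into the strictest mode at startup. The value is
    // case-insensitive because the conf entry upper-cases it before checking.
    // "com.nvidia.spark.SQLPlugin" and the app name are assumptions, not part
    // of this diff.
    val spark = SparkSession.builder()
      .appName("rapids-multijar-example")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.allowMultipleJars", "never")
      .getOrCreate()
    spark.stop()
  }
}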
67 changes: 67 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import java.lang.reflect.InvocationTargetException
import java.net.URL
import java.time.ZoneId
import java.util.Properties

@@ -25,6 +26,7 @@ import scala.sys.process._
import scala.util.Try

import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner}
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -112,6 +114,67 @@ object RapidsPluginUtils extends Logging {
}
}

private def detectMultipleJar(propName: String, jarName: String, conf: RapidsConf): Unit = {
val classloader = ShimLoader.getShimClassLoader()
val possibleRapidsJarURLs = classloader.getResources(propName).asScala.toSet.toSeq.filter {
url => {
val urlPath = url.toString
// Filter out submodule jars, e.g. rapids-4-spark-aggregator_2.12-23.12.0-spark341.jar,
// and files stored under subdirs of '!/', e.g.
// rapids-4-spark_2.12-23.12.0-cuda11.jar!/spark330/rapids4spark-version-info.properties
// We only want to find the main jar, e.g.
// rapids-4-spark_2.12-23.12.0-cuda11.jar!/rapids4spark-version-info.properties
!urlPath.contains("rapids-4-spark-") && urlPath.endsWith("!/" + propName)
}
}
val revisionRegex = "revision=(.*)".r
val revisionMap: Map[String, Seq[URL]] = possibleRapidsJarURLs.map { url =>
val versionInfo = scala.io.Source.fromURL(url).getLines().toSeq
val revision = versionInfo
.collect {
case revisionRegex(revision) => revision
}
.headOption
.getOrElse("UNKNOWN")
(revision, url)
}.groupBy(_._1).mapValues(_.map(_._2)).toMap
lazy val rapidsJarsVersMsg = revisionMap.map {
case (revision, urls) => {
s"revison: $revision" + urls.map {
url => "\n\tjar URL: " + url.toString.split("!").head + "\n\t" +
scala.io.Source.fromURL(url).getLines().toSeq.mkString("\n\t")
}.mkString + "\n"
}
}.mkString
// scalastyle:off line.size.limit
lazy val msg = s"""Multiple $jarName jars found in the classpath:
|$rapidsJarsVersMsg
|Please make sure there is only one $jarName jar in the classpath.
|If it is impossible to fix the classpath you can suppress the error by setting ${RapidsConf.ALLOW_MULTIPLE_JARS.key} to SAME_REVISION or ALWAYS.
""".stripMargin
// scalastyle:on line.size.limit

conf.allowMultipleJars match {
case AllowMultipleJars.ALWAYS =>
if (revisionMap.size != 1 || revisionMap.values.exists(_.size != 1)) {
logWarning(msg)
}
case AllowMultipleJars.SAME_REVISION =>
require(revisionMap.size == 1, msg)
if (revisionMap.values.exists(_.size != 1)) {
logWarning(msg)
}
case AllowMultipleJars.NEVER =>
require(revisionMap.size == 1 && revisionMap.values.forall(_.size == 1), msg)
}
}

def detectMultipleJars(conf: RapidsConf): Unit = {
detectMultipleJar(PLUGIN_PROPS_FILENAME, "rapids-4-spark", conf)
detectMultipleJar(JNI_PROPS_FILENAME, "spark-rapids-jni", conf)
detectMultipleJar(CUDF_PROPS_FILENAME, "cudf", conf)
}

// This assumes Apache Spark logic; if CSPs set defaults differently, we may need
// to handle that.
def estimateCoresOnExec(conf: SparkConf): Int = {
@@ -310,6 +373,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
val sparkConf = pluginContext.conf
RapidsPluginUtils.fixupConfigsOnDriver(sparkConf)
val conf = new RapidsConf(sparkConf)
RapidsPluginUtils.detectMultipleJars(conf)
RapidsPluginUtils.logPluginMode(conf)
GpuCoreDumpHandler.driverInit(sc, conf)

@@ -364,6 +428,9 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
val numCores = RapidsPluginUtils.estimateCoresOnExec(sparkConf)
val conf = new RapidsConf(extraConf.asScala.toMap)

// Fail if there are multiple plugin jars in the classpath.
RapidsPluginUtils.detectMultipleJars(conf)

// Compare if the cudf version mentioned in the classpath is equal to the version which
// plugin expects. If there is a version mismatch, throw error. This check can be disabled
// by setting this config spark.rapids.cudfVersionOverride=true
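As an aside, here is a self-contained sketch of the revision-grouping idea at the heart of detectMultipleJar above, runnable outside Spark. The jar URLs and revision strings are invented sample data, not real paths:

object RevisionGroupingSketch {
  def main(args: Array[String]): Unit = {
    val revisionRegex = "revision=(.*)".r
    // Each entry stands in for one rapids4spark-version-info.properties file:
    // (URL it was loaded from, its lines).
    val versionInfos: Seq[(String, Seq[String])] = Seq(
      ("jar:file:/a/rapids-4-spark_2.12-23.12.0.jar!/x.properties", Seq("revision=abc123")),
      ("jar:file:/b/rapids-4-spark_2.12-23.12.0.jar!/x.properties", Seq("revision=abc123")),
      ("jar:file:/c/rapids-4-spark_2.12-24.02.0.jar!/x.properties", Seq("revision=def456"))
    )
    // Extract each file's revision (UNKNOWN if absent), then group URLs by it,
    // mirroring the revisionMap construction in the diff.
    val revisionMap: Map[String, Seq[String]] = versionInfos.map { case (url, lines) =>
      val revision = lines.collect { case revisionRegex(r) => r }
        .headOption.getOrElse("UNKNOWN")
      (revision, url)
    }.groupBy(_._1).mapValues(_.map(_._2)).toMap
    // Two distinct revisions -> SAME_REVISION mode would fail here;
    // one revision with two jars -> it would only warn.
    revisionMap.foreach { case (rev, urls) =>
      println(s"revision $rev -> ${urls.size} jar(s)")
    }
  }
}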
27 changes: 27 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1840,6 +1840,22 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

object AllowMultipleJars extends Enumeration {
val ALWAYS, SAME_REVISION, NEVER = Value
}

val ALLOW_MULTIPLE_JARS = conf("spark.rapids.sql.allowMultipleJars")
.internal()
.startupOnly()
.doc("Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. " +
"Spark will take the first one it finds, so the version may not be expected. Possisble " +
"values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same " +
"revision, NEVER: do not allow multiple jars at all.")
.stringConf
.transform(_.toUpperCase(java.util.Locale.ROOT))
.checkValues(AllowMultipleJars.values.map(_.toString))
.createWithDefault(AllowMultipleJars.SAME_REVISION.toString)

val ALLOW_DISABLE_ENTIRE_PLAN = conf("spark.rapids.allowDisableEntirePlan")
.internal()
.doc("The plugin has the ability to detect possibe incompatibility with some specific " +
@@ -2641,6 +2657,17 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val cudfVersionOverride: Boolean = get(CUDF_VERSION_OVERRIDE)

lazy val allowMultipleJars: AllowMultipleJars.Value = {
get(ALLOW_MULTIPLE_JARS) match {
case "ALWAYS" => AllowMultipleJars.ALWAYS
case "NEVER" => AllowMultipleJars.NEVER
case "SAME_REVISION" => AllowMultipleJars.SAME_REVISION
case other =>
throw new IllegalArgumentException(s"Internal Error: $other is not supported for " +
s"${ALLOW_MULTIPLE_JARS.key}")
}
}

lazy val allowDisableEntirePlan: Boolean = get(ALLOW_DISABLE_ENTIRE_PLAN)

lazy val useArrowCopyOptimization: Boolean = get(USE_ARROW_OPT)
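Finally, a small sketch of the normalize-then-validate behavior that the ALLOW_MULTIPLE_JARS entry gets from its .transform and .checkValues calls above. parseAllowMultipleJars is a hypothetical helper for illustration, not plugin API:

object AllowMultipleJarsSketch extends Enumeration {
  val ALWAYS, SAME_REVISION, NEVER = Value

  def parseAllowMultipleJars(raw: String): Value = {
    // Mirrors .transform(_.toUpperCase(java.util.Locale.ROOT)).
    val normalized = raw.toUpperCase(java.util.Locale.ROOT)
    // Mirrors .checkValues: reject anything outside the enum.
    values.find(_.toString == normalized).getOrElse {
      throw new IllegalArgumentException(
        s"$raw is not supported for spark.rapids.sql.allowMultipleJars")
    }
  }
}

// e.g. parseAllowMultipleJars("same_revision") returns SAME_REVISION,
// while parseAllowMultipleJars("foo") throws IllegalArgumentException.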
