-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33084][CORE][SQL] Add jar support ivy path #29966
Changes from all commits
afaf7bd
51daf9a
3579de0
d6e8caf
169e1f8
0e589ec
b3e3211
9161340
0e3c1ec
300ca56
63e877b
733e62c
b60ba1e
883b9d3
208afc2
ba9ea29
10b3737
7f878c2
d2c1950
2200076
5a9cc30
875d8a7
e921245
614a865
8c5cb7c
f460974
1f7dc01
050c410
ff611a6
03aca3b
653b919
6e48275
bdc5035
9c22882
9c88f8d
8220e5a
49ac62c
b69a62e
273a5ac
ebe1c9c
6034fb2
e22e398
afea73f
13000f2
bce3d40
d53f302
57c351d
8c53b83
4048c5b
aa53482
2ffb431
8c18cdf
6bd41cd
fbc236c
90491d5
75ff3ce
4c44dae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging { | |
} | ||
|
||
private def addJar(path: String, addedOnSubmit: Boolean): Unit = { | ||
def addLocalJarFile(file: File): String = { | ||
def addLocalJarFile(file: File): Seq[String] = { | ||
try { | ||
if (!file.exists()) { | ||
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found") | ||
|
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging { | |
throw new IllegalArgumentException( | ||
s"Directory ${file.getAbsoluteFile} is not allowed for addJar") | ||
} | ||
env.rpcEnv.fileServer.addJar(file) | ||
Seq(env.rpcEnv.fileServer.addJar(file)) | ||
} catch { | ||
case NonFatal(e) => | ||
logError(s"Failed to add $path to Spark environment", e) | ||
null | ||
Nil | ||
} | ||
} | ||
|
||
def checkRemoteJarFile(path: String): String = { | ||
def checkRemoteJarFile(path: String): Seq[String] = { | ||
val hadoopPath = new Path(path) | ||
val scheme = hadoopPath.toUri.getScheme | ||
if (!Array("http", "https", "ftp").contains(scheme)) { | ||
|
@@ -1959,47 +1959,58 @@ class SparkContext(config: SparkConf) extends Logging { | |
throw new IllegalArgumentException( | ||
s"Directory ${path} is not allowed for addJar") | ||
} | ||
path | ||
Seq(path) | ||
} catch { | ||
case NonFatal(e) => | ||
logError(s"Failed to add $path to Spark environment", e) | ||
null | ||
Nil | ||
} | ||
} else { | ||
path | ||
Seq(path) | ||
} | ||
} | ||
|
||
if (path == null || path.isEmpty) { | ||
logWarning("null or empty path specified as parameter to addJar") | ||
} else { | ||
val key = if (path.contains("\\") && Utils.isWindows) { | ||
val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) { | ||
// For local paths with backslashes on Windows, URI throws an exception | ||
addLocalJarFile(new File(path)) | ||
(addLocalJarFile(new File(path)), "local") | ||
} else { | ||
val uri = new Path(path).toUri | ||
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies | ||
Utils.validateURL(uri) | ||
uri.getScheme match { | ||
val uriScheme = uri.getScheme | ||
val jarPaths = uriScheme match { | ||
// A JAR file which exists only on the driver node | ||
case null => | ||
// SPARK-22585 path without schema is not url encoded | ||
addLocalJarFile(new File(uri.getPath)) | ||
// A JAR file which exists only on the driver node | ||
case "file" => addLocalJarFile(new File(uri.getPath)) | ||
// A JAR file which exists locally on every worker node | ||
case "local" => "file:" + uri.getPath | ||
case "local" => Seq("file:" + uri.getPath) | ||
case "ivy" => | ||
// Since `new Path(path).toUri` will lose query information, | ||
// so here we use `URI.create(path)` | ||
DependencyUtils.resolveMavenDependencies(URI.create(path)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if two added jars have the same dependency with different versions? e.g.,
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
.flatMap(jar => addLocalJarFile(new File(jar))) | ||
case _ => checkRemoteJarFile(path) | ||
} | ||
(jarPaths, uriScheme) | ||
} | ||
if (key != null) { | ||
if (keys.nonEmpty) { | ||
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis | ||
if (addedJars.putIfAbsent(key, timestamp).isEmpty) { | ||
logInfo(s"Added JAR $path at $key with timestamp $timestamp") | ||
val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty) | ||
if (added.nonEmpty) { | ||
val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI" | ||
logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp") | ||
postEnvironmentUpdate() | ||
} else { | ||
logWarning(s"The jar $path has been added already. Overwriting of added jars " + | ||
"is not supported in the current version.") | ||
} | ||
if (existed.nonEmpty) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add tests to check if this warning message is shown only once by using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sure |
||
val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI" | ||
logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." + | ||
" Overwriting of added jar is not supported in the current version.") | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.deploy | ||
package org.apache.spark.util | ||
|
||
import java.io.File | ||
import java.net.URI | ||
|
@@ -25,12 +25,140 @@ import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.{FileSystem, Path} | ||
|
||
import org.apache.spark.{SecurityManager, SparkConf, SparkException} | ||
import org.apache.spark.deploy.SparkSubmitUtils | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.util.{MutableURLClassLoader, Utils} | ||
|
||
private[deploy] object DependencyUtils extends Logging { | ||
case class IvyProperties( | ||
packagesExclusions: String, | ||
packages: String, | ||
repositories: String, | ||
ivyRepoPath: String, | ||
ivySettingsPath: String) | ||
|
||
private[spark] object DependencyUtils extends Logging { | ||
|
||
def getIvyProperties(): IvyProperties = { | ||
val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq( | ||
"spark.jars.excludes", | ||
"spark.jars.packages", | ||
"spark.jars.repositories", | ||
"spark.jars.ivy", | ||
"spark.jars.ivySettings" | ||
).map(sys.props.get(_).orNull) | ||
IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) | ||
} | ||
|
||
AngersZhuuuu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private def isInvalidQueryString(tokens: Array[String]): Boolean = { | ||
tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) | ||
} | ||
|
||
/** | ||
* Parse URI query string's parameter value of `transitive` and `exclude`. | ||
* Other invalid parameters will be ignored. | ||
* | ||
* @param uri Ivy URI need to be downloaded. | ||
* @return Tuple value of parameter `transitive` and `exclude` value. | ||
* | ||
* 1. transitive: whether to download dependency jar of Ivy URI, default value is false | ||
* and this parameter value is case-sensitive. Invalid value will be treat as false. | ||
* Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true | ||
* Output: true | ||
* | ||
* 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, | ||
* consists of `group:module` pairs separated by commas. | ||
* Example: Input: excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http | ||
* Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] | ||
*/ | ||
private def parseQueryParams(uri: URI): (Boolean, String) = { | ||
val uriQuery = uri.getQuery | ||
if (uriQuery == null) { | ||
(false, "") | ||
} else { | ||
val mapTokens = uriQuery.split("&").map(_.split("=")) | ||
if (mapTokens.exists(isInvalidQueryString)) { | ||
throw new IllegalArgumentException( | ||
s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery") | ||
} | ||
val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1) | ||
|
||
// Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is false | ||
val transitiveParams = groupedParams.get("transitive") | ||
if (transitiveParams.map(_.size).getOrElse(0) > 1) { | ||
logWarning("It's best to specify `transitive` parameter in ivy URI query only once." + | ||
" If there are multiple `transitive` parameter, we will select the last one") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hive has the same behaviour with this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, but we can have this warning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If so, what's the hive behaviour? it will throw an exception instead selecting the last one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Select the last one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, ok. |
||
} | ||
val transitive = | ||
transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false) | ||
AngersZhuuuu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http) | ||
// in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar | ||
// in a excluded list. | ||
val exclusionList = groupedParams.get("exclude").map { params => | ||
params.map(_._2).flatMap { excludeString => | ||
val excludes = excludeString.split(",") | ||
if (excludes.map(_.split(":")).exists(isInvalidQueryString)) { | ||
throw new IllegalArgumentException( | ||
s"Invalid exclude string in Ivy URI ${uri.toString}:" + | ||
" expected 'org:module,org:module,..', found " + excludeString) | ||
} | ||
excludes | ||
}.mkString(",") | ||
}.getOrElse("") | ||
|
||
val validParams = Set("transitive", "exclude") | ||
val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq | ||
if (invalidParams.nonEmpty) { | ||
logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " + | ||
s"in Ivy URI query `$uriQuery`.") | ||
} | ||
|
||
(transitive, exclusionList) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if an invalid param is given in hive, e.g., There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like we should at least warn on invalid param? |
||
} | ||
} | ||
|
||
/** | ||
* Download Ivy URI's dependency jars. | ||
* | ||
* @param uri Ivy URI need to be downloaded. The URI format should be: | ||
* `ivy://group:module:version[?query]` | ||
* Ivy URI query part format should be: | ||
* `parameter=value¶meter=value...` | ||
* Note that currently Ivy URI query part support two parameters: | ||
* 1. transitive: whether to download dependent jars related to your Ivy URI. | ||
* transitive=false or `transitive=true`, if not set, the default value is false. | ||
* 2. exclude: exclusion list when download Ivy URI jar and dependency jars. | ||
* The `exclude` parameter content is a ',' separated `group:module` pair string : | ||
* `exclude=group:module,group:module...` | ||
* @return Comma separated string list of jars downloaded. | ||
*/ | ||
def resolveMavenDependencies(uri: URI): Seq[String] = { | ||
val ivyProperties = DependencyUtils.getIvyProperties() | ||
val authority = uri.getAuthority | ||
if (authority == null) { | ||
throw new IllegalArgumentException( | ||
s"Invalid Ivy URI authority in uri ${uri.toString}:" + | ||
" Expected 'org:module:version', found null.") | ||
} | ||
if (authority.split(":").length != 3) { | ||
throw new IllegalArgumentException( | ||
s"Invalid Ivy URI authority in uri ${uri.toString}:" + | ||
s" Expected 'org:module:version', found $authority.") | ||
} | ||
|
||
val (transitive, exclusionList) = parseQueryParams(uri) | ||
|
||
resolveMavenDependencies( | ||
transitive, | ||
exclusionList, | ||
authority, | ||
ivyProperties.repositories, | ||
ivyProperties.ivyRepoPath, | ||
Option(ivyProperties.ivySettingsPath) | ||
).split(",") | ||
} | ||
|
||
def resolveMavenDependencies( | ||
packagesTransitive: Boolean, | ||
packagesExclusions: String, | ||
packages: String, | ||
repositories: String, | ||
|
@@ -51,7 +179,8 @@ private[deploy] object DependencyUtils extends Logging { | |
SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath)) | ||
} | ||
|
||
SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions) | ||
SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, | ||
transitive = packagesTransitive, exclusions = exclusions) | ||
} | ||
|
||
def resolveAndDownloadJars( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AngersZhuuuu, out of curiosity, is the Ivy URI the standard form documented somewhere? or something specific to Spark that you came up with?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From hive https://issues.apache.org/jira/browse/HIVE-9664, since it download jar use
ivy
then use schema asivy
? I think this useful for a lot of companies that have standard package management, so I implemented it in Spark