Skip to content

Commit

Permalink
Fix Scala conflicts by excluding kafka plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Aug 27, 2022
1 parent b7e5cab commit 2ba3254
Showing 1 changed file with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@
* Class to implement the LaunchSparkDataIngestionJob command.
*
*/
//TODO: Fix Scala version causing NoSuchMethodError at runtime with a few plugins
//TODO: Cleanup descriptions for options
@CommandLine.Command(name = "LaunchSparkDataIngestionJob")
public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand implements Command {
private static final Logger LOGGER = LoggerFactory.getLogger(LaunchSparkDataIngestionJobCommand.class);
// Fully qualified main class handed to the SparkLauncher via setMainClass() in execute().
public static final String MAIN_CLASS = "org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand";
// Name of the environment variable that execute() requires to be set (Spark installation path).
public static final String SPARK_HOME = "SPARK_HOME";
// Name of the JVM system property (read with System.getProperty) used when -pinotBaseDir is not provided.
public static final String BASEDIR = "basedir";
// Prefix prepended to the jar path or file name when setting the Spark app resource.
public static final String LOCAL_FILE_PREFIX = "local://";
// Marker identifying the main Pinot jar; note it is matched with String.contains(), not as a strict prefix.
public static final String PINOT_MAIN_JAR_PREFIX = "pinot-all";

@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print "
+ "this message.")
Expand All @@ -75,6 +77,14 @@ public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand
// Optional allow-list of plugin directory names, given on the command line as a ':'-separated list.
// When it is not set, every plugin that is not excluded is loaded (see shouldLoadPlugin).
@CommandLine.Option(names = {"-pluginsToLoad"}, required = false, arity = "1..*", split = ":", description = "List "
    + "of plugin names separated by : to load at runtime. e.g. pinot-s3:pinot-parquet")
private List<String> _pluginsToLoad;

// Kafka plugins are excluded by default because they bundle Scala dependencies that cause
// NoSuchMethodErrors when loaded alongside the Spark runtime.
// Excluding them is safe: Kafka plugins are not used in batch ingestion in any case.
@CommandLine.Option(names = {"-pluginsToExclude"}, defaultValue = "pinot-kafka-0.9:pinot-kafka-2.0",
    required = false, arity = "1..*", split = ":",
    description = "List of plugin names separated by : to not load at runtime. e.g. pinot-s3:pinot-parquet")
private List<String> _pluginsToExclude;
// Pinot installation directory. When left unset, execute() falls back to the "basedir" system property
// and throws a RuntimeException if that is missing as well.
@CommandLine.Option(names = {"-pinotBaseDir"}, required = false, description = "Pinot binary installation directory")
private String _pinotBaseDir;
@CommandLine.Option(names = {"-deployMode"}, required = false, description = "Spark Deploy Mode")
Expand Down Expand Up @@ -137,15 +147,16 @@ public void setHelp(boolean help) {
public boolean execute()
throws Exception {
if (_pinotBaseDir == null) {
String baseDir = System.getProperty("basedir");
String baseDir = System.getProperty(BASEDIR);
if (baseDir != null) {
_pinotBaseDir = baseDir;
} else {
throw new RuntimeException("Either option -pinotBaseDir or env BASEDIR must be set. " + "Currently null");
throw new RuntimeException(
String.format("Either option -pinotBaseDir or env %s must be set. " + "Currently null", BASEDIR));
}
}

Preconditions.checkNotNull(System.getenv("SPARK_HOME"),
Preconditions.checkNotNull(System.getenv(SPARK_HOME),
"SPARK_HOME environment variable should be set to Spark installation path");

boolean isAppropriateJavaVersion = SystemUtils.isJavaVersionAtMost(_sparkVersion.getJavaVersion());
Expand All @@ -160,7 +171,9 @@ public boolean execute()
if (_deployMode != null) {
sparkLauncher.setDeployMode(_deployMode);
}

sparkLauncher.setMainClass(MAIN_CLASS);

SegmentGenerationJobSpec spec;
try {
spec = IngestionJobLauncher.getSegmentGenerationJobSpec(_jobSpecFile, _propertyFile,
Expand Down Expand Up @@ -220,18 +233,6 @@ public boolean execute()
return true;
}

/**
 * Listener that logs the state and info change callbacks received from a {@link SparkAppHandle}.
 */
class SparkAppListener implements SparkAppHandle.Listener {
  /**
   * Logs the new state reported by the handle.
   *
   * @param sparkAppHandle handle whose state changed
   */
  @Override
  public void stateChanged(SparkAppHandle sparkAppHandle) {
    // Pass the state object itself; the logger formats it and tolerates null.
    LOGGER.info("Spark Application State changed: {}", sparkAppHandle.getState());
  }

  /**
   * Logs the handle's non-state information. This callback fires for changes other than the state,
   * so the application id is logged alongside the current state.
   *
   * @param sparkAppHandle handle whose information changed
   */
  @Override
  public void infoChanged(SparkAppHandle sparkAppHandle) {
    LOGGER.info("Spark Info changed: appId={}, state={}", sparkAppHandle.getAppId(), sparkAppHandle.getState());
  }
}

private void addAppResource(SparkLauncher sparkLauncher, String depsJarDir, List<String> extraClassPath)
throws IOException {
if (depsJarDir != null) {
Expand All @@ -244,15 +245,15 @@ private void addAppResource(SparkLauncher sparkLauncher, String depsJarDir, List
for (String file : files) {
URI fileUri = URI.create(file);
if (!pinotFS.isDirectory(fileUri)) {
if (file.endsWith(".jar") && file.contains("pinot-all")) {
if (file.endsWith(".jar") && file.contains(PINOT_MAIN_JAR_PREFIX)) {
LOGGER.info("Adding jar: {} to appResource", file);
String fileName = FilenameUtils.getName(file);
if (isClusterDeployMode() || !isLocalFileUri(fileUri)) {
sparkLauncher.setAppResource("local://" + fileName);
sparkLauncher.setAppResource(LOCAL_FILE_PREFIX + fileName);
sparkLauncher.addJar(file);
extraClassPath.add(fileName);
} else {
sparkLauncher.setAppResource("local://" + file);
sparkLauncher.setAppResource(LOCAL_FILE_PREFIX + file);
sparkLauncher.addJar(file);
extraClassPath.add(file);
}
Expand All @@ -276,8 +277,7 @@ private void addDepsJarToDistributedCache(SparkLauncher sparkLauncher, String de
if (file.endsWith(".jar")) {
String parentDir = FilenameUtils.getName(file.substring(0, file.lastIndexOf('/')));

if (_pluginsToLoad == null || _pluginsToLoad.contains(parentDir) || _sparkVersion.getPluginName()
.contentEquals(parentDir)) {
if (shouldLoadPlugin(parentDir)) {
addJarFilePath(sparkLauncher, extraClassPath, file);
}
}
Expand All @@ -286,6 +286,13 @@ private void addDepsJarToDistributedCache(SparkLauncher sparkLauncher, String de
}
}

/**
 * Decides whether the jars located in the plugin directory {@code parentDir} should be added to the Spark job.
 *
 * <p>A plugin is loaded when any of the following holds:
 * <ul>
 *   <li>no {@code -pluginsToLoad} list was given and the plugin is not listed in {@code -pluginsToExclude};</li>
 *   <li>the plugin is explicitly listed in {@code -pluginsToLoad} (the exclude list is not consulted then);</li>
 *   <li>the plugin name equals {@code _sparkVersion.getPluginName()}.</li>
 * </ul>
 *
 * @param parentDir name of the directory containing the jar, compared against plugin names
 * @return {@code true} if the jars of this plugin should be loaded
 */
private boolean shouldLoadPlugin(String parentDir) {
  if (_pluginsToLoad == null) {
    // The exclude list may be null if the option default was never applied, e.g. when this command is
    // configured programmatically instead of being parsed from the command line; treat null as "exclude nothing".
    if (_pluginsToExclude == null || !_pluginsToExclude.contains(parentDir)) {
      return true;
    }
  } else if (_pluginsToLoad.contains(parentDir)) {
    return true;
  }
  // The plugin matching the selected Spark version is loaded regardless of the lists above.
  return _sparkVersion.getPluginName().contentEquals(parentDir);
}

private void addJarFilePath(SparkLauncher sparkLauncher, List<String> extraClassPath, String file) {
URI fileUri = URI.create(file);
sparkLauncher.addJar(file);
Expand Down

0 comments on commit 2ba3254

Please sign in to comment.