Add support for additional args, cleanup description
Kartik Khare authored and committed Aug 27, 2022
1 parent b678399 commit b7e5cab
Showing 1 changed file with 37 additions and 32 deletions.
@@ -56,10 +56,11 @@
*/
//TODO: Fix Scala version causing NoSuchMethodError at runtime with a few plugins
//TODO: Cleanup descriptions for options
//TODO: Add options for most popular spark confs such as numExecutors
@CommandLine.Command(name = "LaunchSparkDataIngestionJob")
public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand implements Command {
private static final Logger LOGGER = LoggerFactory.getLogger(LaunchSparkDataIngestionJobCommand.class);
public static final String MAIN_CLASS = "org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand";

@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print "
+ "this message.")
private boolean _help = false;
@@ -71,7 +72,8 @@ public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand
@CommandLine.Option(names = {"-propertyFile"}, required = false, description = "A property file contains context "
+ "values to set the job spec template")
private String _propertyFile;
@CommandLine.Option(names = {"-pluginsToLoad"}, required = false, arity = "1..*", split = ":", description = "Plugins to Load")
@CommandLine.Option(names = {"-pluginsToLoad"}, required = false, arity = "1..*", split = ":", description = "List "
+ "of plugin name separated by : to load at runtime. e.g. pinto-s3:pinot-parquet")
private List<String> _pluginsToLoad;
@CommandLine.Option(names = {"-pinotBaseDir"}, required = false, description = "Pinot binary installation directory")
private String _pinotBaseDir;
@@ -80,11 +82,14 @@ public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand
@CommandLine.Option(names = {"-master"}, required = false, defaultValue = "local", description = "Spark Master")
private String _sparkMaster;
@CommandLine.Option(names = {"-sparkVersion"}, required = false, defaultValue = "SPARK_3", description = "Spark "
+ "Type - can be one of Spark_2 or Spark_3")
+ "plugin to use - can be one of Spark_2 or Spark_3")
private SparkType _sparkVersion;
@CommandLine.Option(names = {"-verbose"}, required = false, defaultValue = "true", description = "Enable verbose logging")
@CommandLine.Option(names = {"-verbose"}, required = false, defaultValue = "true", description = "Enable verbose "
+ "logging from launcher")
private boolean _verbose;
@CommandLine.Option(names = {"-sparkConf"}, required = false, split = ":", mapFallbackValue = "", description = "Spark Conf")
@CommandLine.Option(names = {"-sparkConf"}, required = false, split = ":", mapFallbackValue = "", description =
"Additional Spark configuration values as key value pairs separated by : e.g. -sparkConf spark.executor"
+ ".cores=2:num-executors=3")
private Map<String, String> _sparkConf;
@CommandLine.Unmatched
private String[] _unmatchedArgs;
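
For context, a minimal sketch of how picocli parses the new -sparkConf map syntax. This is a hypothetical stand-alone class, not part of this commit, and it assumes a recent picocli 4.x where mapFallbackValue is available:

import java.util.Map;
import picocli.CommandLine;
import picocli.CommandLine.Option;

// Hypothetical demo class illustrating the option declaration above.
class SparkConfParseDemo {
    @Option(names = "-sparkConf", split = ":", mapFallbackValue = "")
    Map<String, String> _sparkConf;

    public static void main(String[] args) {
        SparkConfParseDemo demo = new SparkConfParseDemo();
        new CommandLine(demo).parseArgs("-sparkConf", "spark.executor.cores=2:num-executors=3");
        // Prints spark.executor.cores=2 and num-executors=3; a key passed
        // without '=value' falls back to "" via mapFallbackValue.
        demo._sparkConf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}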
@@ -136,8 +141,7 @@ public boolean execute()
if (baseDir != null) {
_pinotBaseDir = baseDir;
} else {
throw new RuntimeException("Either option -pinotBaseDir or env BASEDIR must be set. "
+ "Currently null");
throw new RuntimeException("Either option -pinotBaseDir or env BASEDIR must be set. " + "Currently null");
}
}

@@ -146,18 +150,17 @@ public boolean execute()

boolean isAppropriateJavaVersion = SystemUtils.isJavaVersionAtMost(_sparkVersion.getJavaVersion());
if (!isAppropriateJavaVersion) {
LOGGER.warn(
"Platform java version should be at most: {}, found: {}. "
LOGGER.warn("Platform java version should be at most: {}, found: {}. "
+ "Ignore this warning if you are running from different environment than your spark cluster",
_sparkVersion.getSparkVersion(), SystemUtils.JAVA_SPECIFICATION_VERSION);
_sparkVersion.getJavaVersion(), SystemUtils.JAVA_SPECIFICATION_VERSION);
}

SparkLauncher sparkLauncher = new SparkLauncher();
sparkLauncher.setMaster(_sparkMaster);
if (_deployMode != null) {
sparkLauncher.setDeployMode(_deployMode);
}
sparkLauncher.setMainClass("org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand");
sparkLauncher.setMainClass(MAIN_CLASS);
SegmentGenerationJobSpec spec;
try {
spec = IngestionJobLauncher.getSegmentGenerationJobSpec(_jobSpecFile, _propertyFile,
@@ -180,7 +183,7 @@ public boolean execute()
sparkLauncher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, extraClassPathsString);
sparkLauncher.setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, extraClassPathsString);

if(_deployMode != null && _deployMode.contentEquals("cluster")) {
if (isClusterDeployMode()) {
sparkLauncher.addFile(_jobSpecFile);
sparkLauncher.addAppArgs("-jobSpecFile", FilenameUtils.getName(_jobSpecFile));
} else {
@@ -197,7 +200,11 @@

if (_sparkConf != null) {
for (Map.Entry<String, String> conf : _sparkConf.entrySet()) {
sparkLauncher.setConf(conf.getKey(), conf.getValue());
if (conf.getKey().startsWith("spark")) {
sparkLauncher.setConf(conf.getKey(), conf.getValue());
} else {
sparkLauncher.addSparkArg("--" + conf.getKey(), conf.getValue());
}
}
}
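
To make the routing concrete, a small hypothetical demo using the values from the -sparkConf help text (same SparkLauncher API as this file; note that SparkLauncher.setConf only accepts keys prefixed with "spark."):

import java.util.Map;
import org.apache.spark.launcher.SparkLauncher;

// Hypothetical demo of the key dispatch above, not part of this commit.
class SparkConfRoutingDemo {
    public static void main(String[] args) {
        Map<String, String> sparkConf = Map.of(
            "spark.executor.cores", "2",  // starts with "spark" -> Spark property
            "num-executors", "3");        // anything else -> spark-submit flag
        SparkLauncher launcher = new SparkLauncher();
        for (Map.Entry<String, String> conf : sparkConf.entrySet()) {
            if (conf.getKey().startsWith("spark")) {
                launcher.setConf(conf.getKey(), conf.getValue());            // --conf spark.executor.cores=2
            } else {
                launcher.addSparkArg("--" + conf.getKey(), conf.getValue()); // --num-executors 3
            }
        }
    }
}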

@@ -206,7 +213,8 @@
}
sparkLauncher.setAppName("Pinot Spark Ingestion Job");
sparkLauncher.setVerbose(_verbose);
sparkLauncher.redirectToLog(LOGGER.getName());
sparkLauncher.redirectOutput(ProcessBuilder.Redirect.INHERIT);
sparkLauncher.redirectError(ProcessBuilder.Redirect.INHERIT);
Process process = sparkLauncher.launch();
process.waitFor();
return true;
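
A minimal sketch of the stream-handling change: redirectOutput/redirectError with ProcessBuilder.Redirect.INHERIT wire the child spark-submit process straight to this JVM's console, whereas the replaced redirectToLog routed both streams through a named logger. The app resource and main class below are placeholders:

import org.apache.spark.launcher.SparkLauncher;

// Hypothetical demo, not part of this commit.
class LauncherRedirectDemo {
    public static void main(String[] args) throws Exception {
        Process process = new SparkLauncher()
            .setMaster("local")
            .setAppResource("local:///tmp/app.jar")          // placeholder jar
            .setMainClass("com.example.Main")                // placeholder class
            .redirectOutput(ProcessBuilder.Redirect.INHERIT) // child stdout -> our stdout
            .redirectError(ProcessBuilder.Redirect.INHERIT)  // child stderr -> our stderr
            .launch();
        process.waitFor();
    }
}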
@@ -224,7 +232,6 @@ public void infoChanged(SparkAppHandle sparkAppHandle) {
}
}

//TODO: Handle DFS paths correctly
private void addAppResource(SparkLauncher sparkLauncher, String depsJarDir, List<String> extraClassPath)
throws IOException {
if (depsJarDir != null) {
@@ -240,18 +247,14 @@ private void addAppResource(SparkLauncher sparkLauncher, String depsJarDir, List
if (file.endsWith(".jar") && file.contains("pinot-all")) {
LOGGER.info("Adding jar: {} to appResource", file);
String fileName = FilenameUtils.getName(file);
if(_deployMode != null && _deployMode.contentEquals("cluster")) {
if (isClusterDeployMode() || !isLocalFileUri(fileUri)) {
sparkLauncher.setAppResource("local://" + fileName);
sparkLauncher.addJar(file);
extraClassPath.add(fileName);
} else if (fileUri.getScheme() == null || fileUri.getScheme().contentEquals("file")) {
} else {
sparkLauncher.setAppResource("local://" + file);
sparkLauncher.addJar(file);
extraClassPath.add(file);
} else {
sparkLauncher.setAppResource("local://" + fileName);
sparkLauncher.addJar(file);
extraClassPath.add(fileName);
}
}
}
@@ -273,8 +276,8 @@ private void addDepsJarToDistributedCache(SparkLauncher sparkLauncher, String de
if (file.endsWith(".jar")) {
String parentDir = FilenameUtils.getName(file.substring(0, file.lastIndexOf('/')));

if (_pluginsToLoad == null || _pluginsToLoad.contains(parentDir) ||
_sparkVersion.getPluginName().contentEquals(parentDir)) {
if (_pluginsToLoad == null || _pluginsToLoad.contains(parentDir) || _sparkVersion.getPluginName()
.contentEquals(parentDir)) {
addJarFilePath(sparkLauncher, extraClassPath, file);
}
}
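
For reference, how the plugin filter above derives a plugin name from a jar path (illustrative path; FilenameUtils comes from commons-io):

import org.apache.commons.io.FilenameUtils;

// Hypothetical demo, not part of this commit.
class PluginDirDemo {
    public static void main(String[] args) {
        String file = "/opt/pinot/plugins/pinot-s3/pinot-s3-shaded.jar";  // illustrative path
        // Strip the file name, then take the last remaining segment: the plugin directory.
        String parentDir = FilenameUtils.getName(file.substring(0, file.lastIndexOf('/')));
        System.out.println(parentDir);  // pinot-s3, matched against -pluginsToLoad entries
    }
}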
@@ -286,24 +289,26 @@ private void addDepsJarToDistributedCache(SparkLauncher sparkLauncher, String de
private void addJarFilePath(SparkLauncher sparkLauncher, List<String> extraClassPath, String file) {
URI fileUri = URI.create(file);
sparkLauncher.addJar(file);
if(_deployMode != null && _deployMode.contentEquals("cluster")) {
if (isClusterDeployMode() || !isLocalFileUri(fileUri)) {
LOGGER.info("Adding deps jar: {} to distributed cache", file);
String fileName = FilenameUtils.getName(file);
if(!fileName.isEmpty()) {
if (!fileName.isEmpty()) {
extraClassPath.add(fileName);
}
} else if (fileUri.getScheme() == null || fileUri.getScheme().contentEquals("file")) {
LOGGER.info("Adding deps jar: {} to distributed cache", file);
extraClassPath.add(file);
} else {
LOGGER.info("Adding deps jar: {} to distributed cache", file);
String fileName = FilenameUtils.getName(file);
if(!fileName.isEmpty()) {
extraClassPath.add(fileName);
}
extraClassPath.add(file);
}
}

private boolean isLocalFileUri(URI fileUri) {
return fileUri.getScheme() == null || fileUri.getScheme().contentEquals("file");
}

private boolean isClusterDeployMode() {
return _deployMode != null && _deployMode.contentEquals("cluster");
}
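
A quick check of the scheme dispatch behind isLocalFileUri (paths are illustrative): a bare path has no scheme and is treated as local, file: URIs are local, and anything else (s3:, hdfs:, ...) is shipped to the cluster by file name:

import java.net.URI;

// Hypothetical demo, not part of this commit.
class UriSchemeDemo {
    public static void main(String[] args) {
        System.out.println(URI.create("/opt/pinot/lib/pinot-all.jar").getScheme());        // null -> local
        System.out.println(URI.create("file:///opt/pinot/lib/pinot-all.jar").getScheme()); // file -> local
        System.out.println(URI.create("s3://bucket/plugins/pinot-s3.jar").getScheme());    // s3   -> remote
    }
}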

@Override
public String getName() {
return "LaunchSparkDataIngestionJob";
