diff --git a/server/src/main/java/com/exacaster/lighter/backend/SparkApp.java b/server/src/main/java/com/exacaster/lighter/backend/SparkApp.java index 2e93c77a..03d38278 100644 --- a/server/src/main/java/com/exacaster/lighter/backend/SparkApp.java +++ b/server/src/main/java/com/exacaster/lighter/backend/SparkApp.java @@ -1,41 +1,46 @@ package com.exacaster.lighter.backend; -import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY; -import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES; -import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY; - import com.exacaster.lighter.application.Application; import com.exacaster.lighter.concurrency.EmptyWaitable; import com.exacaster.lighter.concurrency.Waitable; +import org.apache.spark.launcher.SparkLauncher; + import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.function.Consumer; -import org.apache.spark.launcher.SparkLauncher; + +import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY; +import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES; +import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY; public class SparkApp { private final Map configDefaults; private final Map backendConfiguration; + private final Map envVariables; private final Application application; private final SparkListener listener; public SparkApp(Application application, Map configDefaults, Map backendConfiguration, - SparkListener listener) { + Consumer errorHandler) { + this(application, configDefaults, backendConfiguration, Collections.emptyMap(), new ClusterSparkListener(errorHandler)); + } + + public SparkApp(Application application, + Map configDefaults, + Map backendConfiguration, + Map envVariables, + SparkListener listener) { this.application = application; this.configDefaults = configDefaults; this.backendConfiguration = backendConfiguration; + this.envVariables = envVariables; this.listener = listener; } - public SparkApp(Application application, - Map configDefaults, - Map backendConfiguration, - Consumer errorHandler) { - this(application, configDefaults, backendConfiguration, new ClusterSparkListener(errorHandler)); - } - public Waitable launch() { try { var launcher = buildLauncher(); @@ -50,7 +55,7 @@ public Waitable launch() { private SparkLauncher buildLauncher() { var submitParams = application.getSubmitParams(); - var launcher = new SparkLauncher() + var launcher = new SparkLauncher(envVariables) .setAppName(submitParams.getName()) .setAppResource(submitParams.getFile()); diff --git a/server/src/main/java/com/exacaster/lighter/backend/local/LocalBackend.java b/server/src/main/java/com/exacaster/lighter/backend/local/LocalBackend.java index f5fbad39..5c3f01ad 100644 --- a/server/src/main/java/com/exacaster/lighter/backend/local/LocalBackend.java +++ b/server/src/main/java/com/exacaster/lighter/backend/local/LocalBackend.java @@ -1,10 +1,5 @@ package com.exacaster.lighter.backend.local; -import static com.exacaster.lighter.backend.Constants.DEPLOY_MODE_CLIENT; -import static org.apache.spark.launcher.SparkLauncher.CHILD_PROCESS_LOGGER_NAME; -import static org.apache.spark.launcher.SparkLauncher.DEPLOY_MODE; -import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER; - import com.exacaster.lighter.application.Application; import com.exacaster.lighter.application.ApplicationInfo; import com.exacaster.lighter.backend.Backend; @@ -13,11 +8,17 @@ import com.exacaster.lighter.log.Log; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; + import java.time.Duration; import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import static com.exacaster.lighter.backend.Constants.DEPLOY_MODE_CLIENT; +import static org.apache.spark.launcher.SparkLauncher.CHILD_PROCESS_LOGGER_NAME; +import static org.apache.spark.launcher.SparkLauncher.DEPLOY_MODE; +import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER; + public class LocalBackend implements Backend { private final AppConfiguration conf; @@ -69,6 +70,10 @@ public SparkApp prepareSparkApplication(Application application, Map