Skip to content

Commit

Permalink
Merge pull request #685 from jmilkiewicz/fix-env-variables-local
Browse files Browse the repository at this point in the history
setting required env variables for local mode
  • Loading branch information
pdambrauskas authored Oct 2, 2023
2 parents b1e8f12 + 5733d94 commit 7853e0c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
33 changes: 19 additions & 14 deletions server/src/main/java/com/exacaster/lighter/backend/SparkApp.java
Original file line number Diff line number Diff line change
@@ -1,41 +1,46 @@
package com.exacaster.lighter.backend;

import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY;

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.concurrency.EmptyWaitable;
import com.exacaster.lighter.concurrency.Waitable;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkLauncher;

import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY;

public class SparkApp {

private final Map<String, String> configDefaults;
private final Map<String, String> backendConfiguration;
private final Map<String, String> envVariables;
private final Application application;
private final SparkListener listener;

public SparkApp(Application application,
Map<String, String> configDefaults,
Map<String, String> backendConfiguration,
SparkListener listener) {
Consumer<Throwable> errorHandler) {
this(application, configDefaults, backendConfiguration, Collections.emptyMap(), new ClusterSparkListener(errorHandler));
}

public SparkApp(Application application,
Map<String, String> configDefaults,
Map<String, String> backendConfiguration,
Map<String, String> envVariables,
SparkListener listener) {
this.application = application;
this.configDefaults = configDefaults;
this.backendConfiguration = backendConfiguration;
this.envVariables = envVariables;
this.listener = listener;
}

public SparkApp(Application application,
Map<String, String> configDefaults,
Map<String, String> backendConfiguration,
Consumer<Throwable> errorHandler) {
this(application, configDefaults, backendConfiguration, new ClusterSparkListener(errorHandler));
}

public Waitable launch() {
try {
var launcher = buildLauncher();
Expand All @@ -50,7 +55,7 @@ public Waitable launch() {

private SparkLauncher buildLauncher() {
var submitParams = application.getSubmitParams();
var launcher = new SparkLauncher()
var launcher = new SparkLauncher(envVariables)
.setAppName(submitParams.getName())
.setAppResource(submitParams.getFile());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package com.exacaster.lighter.backend.local;

import static com.exacaster.lighter.backend.Constants.DEPLOY_MODE_CLIENT;
import static org.apache.spark.launcher.SparkLauncher.CHILD_PROCESS_LOGGER_NAME;
import static org.apache.spark.launcher.SparkLauncher.DEPLOY_MODE;
import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER;

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationInfo;
import com.exacaster.lighter.backend.Backend;
Expand All @@ -13,11 +8,17 @@
import com.exacaster.lighter.log.Log;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import static com.exacaster.lighter.backend.Constants.DEPLOY_MODE_CLIENT;
import static org.apache.spark.launcher.SparkLauncher.CHILD_PROCESS_LOGGER_NAME;
import static org.apache.spark.launcher.SparkLauncher.DEPLOY_MODE;
import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER;

public class LocalBackend implements Backend {

private final AppConfiguration conf;
Expand Down Expand Up @@ -69,6 +70,10 @@ public SparkApp prepareSparkApplication(Application application, Map<String, Str
SPARK_MASTER, "local[*]",
CHILD_PROCESS_LOGGER_NAME, localApp.getLoggerName()
),
Map.of("LIGHTER_SESSION_ID", application.getId(),
"PY_GATEWAY_PORT", conf.getPyGatewayPort().toString(),
"PY_GATEWAY_HOST", "localhost"
),
localApp
);
}
Expand Down

0 comments on commit 7853e0c

Please sign in to comment.