Support default conf props for Spark applications #41

Merged (1 commit, Apr 8, 2022)
docs/configuration.md — 45 changes: 27 additions, 18 deletions

@@ -4,19 +4,23 @@ Lighter can be configured by using environment variables. Currently, Lighter sup

## Global properties

| Property | Description | Default |
| -------------------------------------- | ------------------------------------------------------ | -------------------------------- |
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHETR_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes | 90 |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |

| Property | Description | Default |
|----------------------------------------|----------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHETR_URL                            | URL which can be used to access Lighter from Spark Job         | http://lighter.spark:8080       |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes | 90 |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |

<sup>*</sup> Default confs are merged with the confs provided in the submit request; if a property is defined in the submit request, the default is ignored.
Example of `LIGHTER_BATCH_DEFAULT_CONF`: `{"spark.kubernetes.driverEnv.TEST1":"test1"}`.

## Kubernetes configuration

@@ -32,8 +36,13 @@ Lighter can be configured by using environment variables. Currently, Lighter sup

## YARN configuration

| Property | Description | Default |
| ---------------------------------- | ----------------------------------------------------------------- | --------------------------------- |
| LIGHTER_YARN_ENABLED | Yarn enabled (Kubernetes should be disabled) | false |
| LIGHTER_YARN_URL | Yarn API URL, `/ws/v1/cluster/` will be appended | |
| HADOOP_CONF_DIR | Path to `core-site.xml`,`hdfs-site.xml` and `yarn-site.xml` files | |
| Property | Description | Default |
|---------------------------------|-------------------------------------------------------------------| --------------------------------- |
| LIGHTER_YARN_ENABLED | Yarn enabled (Kubernetes should be disabled) | false |
| LIGHTER_YARN_URL | Yarn API URL, `/ws/v1/cluster/` will be appended | |
| HADOOP_CONF_DIR                 | Path to `core-site.xml`, `hdfs-site.xml` and `yarn-site.xml` files |                                   |
| LIGHTER_YARN_KERBEROS_PRINCIPAL | Kerberos principal used for job management<sup>*</sup> | |
| LIGHTER_YARN_KERBEROS_KEYTAB | Kerberos keytab used for job management | |

<sup>*</sup> The principal & keytab provided in `LIGHTER_YARN_KERBEROS_PRINCIPAL` and `LIGHTER_YARN_KERBEROS_KEYTAB` will be used by the Spark job
as well, if `spark.kerberos.keytab` is not explicitly declared in `LIGHTER_BATCH_DEFAULT_CONF` or provided in the submit request.
@@ -0,0 +1,18 @@
package com.exacaster.lighter.application;

import java.util.HashMap;
import java.util.Map;

public final class Utils {

    private Utils() {
    }

    public static Map<String, String> merge(Map<String, String> current, Map<String, String> other) {
        var result = new HashMap<>(current);
        other.forEach((key, val) -> {
            result.computeIfAbsent(key, (k) -> val);
        });
        return result;
    }
}
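The helper above keeps entries from `current` when both maps define a key, which is what makes submit-request confs win over defaults. A minimal standalone sketch of that behavior (the `merge` body is inlined here so it runs without the Lighter classpath; `putIfAbsent` is equivalent to the `computeIfAbsent` loop above):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeDemo {
    // Mirrors com.exacaster.lighter.application.Utils.merge:
    // values already present in `current` win over `other`.
    static Map<String, String> merge(Map<String, String> current, Map<String, String> other) {
        var result = new HashMap<>(current);
        other.forEach(result::putIfAbsent);
        return result;
    }

    public static void main(String[] args) {
        var submitConf = Map.of("spark.kerberos.keytab", "/from/request.keytab");
        var defaultConf = Map.of(
                "spark.kerberos.keytab", "/default.keytab",
                "spark.kubernetes.driverEnv.TEST1", "test1");

        var merged = merge(submitConf, defaultConf);
        // The submit-request value is kept; the default only fills the gap.
        System.out.println(merged.get("spark.kerberos.keytab"));            // /from/request.keytab
        System.out.println(merged.get("spark.kubernetes.driverEnv.TEST1")); // test1
    }
}
```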
@@ -6,14 +6,16 @@
import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationStatusHandler;
import com.exacaster.lighter.application.Utils;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.concurrency.Waitable;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.spark.ConfigModifier;
import com.exacaster.lighter.spark.SparkApp;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.function.Consumer;

import java.util.stream.Collectors;
import net.javacrumbs.shedlock.micronaut.SchedulerLock;
import org.slf4j.Logger;
@@ -38,8 +40,12 @@ public BatchHandler(Backend backend, BatchService batchService, AppConfiguration
}

public Waitable launch(Application application, Consumer<Throwable> errorHandler) {
var app = new SparkApp(application.getSubmitParams(), errorHandler);
return app.launch(backend.getSubmitConfiguration(application));
List<ConfigModifier> configModifiers = List.of(
(current) -> Utils.merge(current, appConfiguration.getBatchDefaultConf()),
(current) -> backend.getSubmitConfiguration(application, current)
);
var app = new SparkApp(application.getSubmitParams(), errorHandler, configModifiers);
return app.launch();
}

@SchedulerLock(name = "processScheduledBatches")
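The modifiers in `launch` run in order, so the backend's `getSubmitConfiguration` sees (and can build on) the already-merged defaults. `ConfigModifier` itself is not shown in this diff; the sketch below assumes it is a `Map<String, String> -> Map<String, String>` functional interface, which is consistent with the lambdas passed to `List.of` above:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class ConfigChainDemo {
    // Assumed shape of com.exacaster.lighter.spark.ConfigModifier (not part of this diff).
    interface ConfigModifier extends UnaryOperator<Map<String, String>> {}

    // Folds each modifier over the accumulated configuration, mirroring
    // how SparkApp would apply the list passed to its constructor.
    static Map<String, String> applyAll(List<ConfigModifier> modifiers) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (ConfigModifier modifier : modifiers) {
            conf = modifier.apply(conf);
        }
        return conf;
    }

    public static void main(String[] args) {
        List<ConfigModifier> modifiers = List.of(
                current -> { // defaults first
                    var next = new LinkedHashMap<>(current);
                    next.putIfAbsent("spark.kubernetes.driverEnv.TEST1", "test1");
                    return next;
                },
                current -> { // backend sees the defaults and layers on top
                    var next = new LinkedHashMap<>(current);
                    next.put("spark.master", "k8s://https://example:6443");
                    return next;
                });
        System.out.println(applyAll(modifiers));
    }
}
```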
@@ -8,11 +8,12 @@
import com.exacaster.lighter.application.ApplicationInfo;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationStatusHandler;
import com.exacaster.lighter.application.Utils;
import com.exacaster.lighter.application.sessions.processors.StatementHandler;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.concurrency.Waitable;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.configuration.AppConfiguration.SessionConfiguration;
import com.exacaster.lighter.spark.ConfigModifier;
import com.exacaster.lighter.spark.SparkApp;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
@@ -32,7 +33,7 @@ public class SessionHandler {
private final Backend backend;
private final StatementHandler statementStatusChecker;
private final ApplicationStatusHandler statusTracker;
private final SessionConfiguration sessionConfiguration;
private final AppConfiguration appConfiguration;

public SessionHandler(SessionService sessionService,
Backend backend,
@@ -43,19 +44,23 @@ public SessionHandler(SessionService sessionService,
this.backend = backend;
this.statementStatusChecker = statementStatusChecker;
this.statusTracker = statusTracker;
this.sessionConfiguration = appConfiguration.getSessionConfiguration();
this.appConfiguration = appConfiguration;
}

public Waitable launch(Application application, Consumer<Throwable> errorHandler) {
var app = new SparkApp(application.getSubmitParams(), errorHandler);
return app.launch(backend.getSubmitConfiguration(application));
List<ConfigModifier> configModifiers = List.of(
(current) -> Utils.merge(current, appConfiguration.getSessionDefaultConf()),
(current) -> backend.getSubmitConfiguration(application, current)
);
var app = new SparkApp(application.getSubmitParams(), errorHandler, configModifiers);
return app.launch();
}

@SchedulerLock(name = "keepPermanentSession")
@Scheduled(fixedRate = "1m")
public void keepPermanentSessions() {
assertLocked();
sessionConfiguration.getPermanentSessions().forEach(sessionConf -> {
appConfiguration.getSessionConfiguration().getPermanentSessions().forEach(sessionConf -> {
var session = sessionService.fetchOne(sessionConf.getId());
if (session.map(Application::getState).filter(this::running).isEmpty() ||
session.flatMap(backend::getInfo).map(ApplicationInfo::getState).filter(this::running).isEmpty()) {
@@ -101,6 +106,7 @@ public void trackRunning() {
@Scheduled(fixedRate = "10m")
public void handleTimeout() {
assertLocked();
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeout = sessionConfiguration.getTimeoutMinutes();
if (timeout != null) {
sessionService.fetchRunning()
@@ -16,7 +16,8 @@ public interface Backend {

void kill(Application application);

default Map<String, String> getSubmitConfiguration(Application application) {
return Map.of();
default Map<String, String> getSubmitConfiguration(Application application,
Map<String, String> current) {
return current;
}
}
@@ -37,10 +37,12 @@ public KubernetesBackend(KubernetesProperties properties, AppConfiguration conf,
}

@Override
public Map<String, String> getSubmitConfiguration(Application application) {
public Map<String, String> getSubmitConfiguration(Application application,
Map<String, String> current) {
URI uri = URI.create(conf.getUrl());
var host = uri.getHost();
var props = new HashMap<>(Map.of(
var props = new HashMap<>(current);
props.putAll(Map.of(
"spark.master", properties.getMaster(),
"spark.kubernetes.driver.label." + SPARK_APP_TAG_LABEL, application.getId(),
"spark.kubernetes.executor.label." + SPARK_APP_TAG_LABEL, application.getId(),
@@ -100,20 +100,22 @@ public void kill(Application application) {
}

@Override
public Map<String, String> getSubmitConfiguration(Application application) {
public Map<String, String> getSubmitConfiguration(Application application,
Map<String, String> current) {
URI uri = URI.create(conf.getUrl());
var host = uri.getHost();
var props = new HashMap<>(Map.of(
var props = new HashMap<>(current);
props.putAll(Map.of(
"spark.master", "yarn",
"spark.yarn.tags", "lighter," + application.getId(),
"spark.yarn.submit.waitAppCompletion", "false",
"spark.yarn.appMasterEnv.PY_GATEWAY_PORT", String.valueOf(conf.getPyGatewayPort()),
"spark.yarn.appMasterEnv.PY_GATEWAY_HOST", host,
"spark.yarn.appMasterEnv.LIGHTER_SESSION_ID", application.getId()
));
if (yarnProperties.getKerberosKeytab() != null && yarnProperties.getKerberosPrincipal() != null) {
props.put("spark.kerberos.keytab", yarnProperties.getKerberosKeytab());
props.put("spark.kerberos.principal", yarnProperties.getKerberosPrincipal());
if (!props.containsKey("spark.kerberos.keytab") && yarnProperties.getKerberos() != null) {
props.put("spark.kerberos.keytab", yarnProperties.getKerberos().getKeytab());
props.put("spark.kerberos.principal", yarnProperties.getKerberos().getPrincipal());
}
return props;
}
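The `containsKey` guard above means Kerberos credentials coming from `LIGHTER_BATCH_DEFAULT_CONF` or the submit request take precedence over `LIGHTER_YARN_KERBEROS_PRINCIPAL`/`LIGHTER_YARN_KERBEROS_KEYTAB`. A standalone sketch of that precedence rule (class and method names here are illustrative, not Lighter's API):

```java
import java.util.HashMap;
import java.util.Map;

public class KerberosPrecedenceDemo {
    // Mirrors the YarnBackend guard: only inject the Lighter-level Kerberos
    // credentials when the submitted/default conf did not already set a keytab.
    static Map<String, String> withKerberos(Map<String, String> current,
                                            String principal, String keytab) {
        var props = new HashMap<>(current);
        if (!props.containsKey("spark.kerberos.keytab") && principal != null && keytab != null) {
            props.put("spark.kerberos.keytab", keytab);
            props.put("spark.kerberos.principal", principal);
        }
        return props;
    }

    public static void main(String[] args) {
        // Explicit keytab in the conf wins; Lighter's credentials are ignored.
        var explicit = withKerberos(
                Map.of("spark.kerberos.keytab", "/conf/user.keytab"),
                "lighter@EXAMPLE.COM", "/etc/lighter.keytab");
        System.out.println(explicit.get("spark.kerberos.keytab"));    // /conf/user.keytab

        // No keytab in the conf: Lighter's credentials are injected.
        var injected = withKerberos(Map.of(),
                "lighter@EXAMPLE.COM", "/etc/lighter.keytab");
        System.out.println(injected.get("spark.kerberos.principal")); // lighter@EXAMPLE.COM
    }
}
```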
@@ -24,10 +24,11 @@ public YarnBackend backend(YarnProperties yarnProperties, AppConfiguration conf,
var yarnConfiguration = new Configuration(false);
yarnConfiguration.addResource(new Path(hadoopConfDir, "core-site.xml"));
yarnConfiguration.addResource(new Path(hadoopConfDir, "yarn-site.xml"));
if (yarnProperties.getKerberosKeytab() != null && yarnProperties.getKerberosPrincipal() != null) {
var kerberos = yarnProperties.getKerberos();
if (kerberos != null) {
yarnConfiguration.setBoolean(HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, true);
UserGroupInformation.setConfiguration(yarnConfiguration);
loginUserFromKeytab(yarnProperties.getKerberosPrincipal(), yarnProperties.getKerberosKeytab());
loginUserFromKeytab(kerberos.getPrincipal(), kerberos.getKeytab());
}
var yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
@@ -4,34 +4,58 @@
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import java.util.StringJoiner;
import javax.validation.constraints.NotBlank;
import org.apache.commons.lang3.builder.ToStringBuilder;

@ConfigurationProperties("lighter.yarn")
@Requires(property="lighter.yarn.enabled", value = "true")
public class YarnProperties {

private final String kerberosPrincipal;
private final String kerberosKeytab;
private final KerberosProperties kerberos;

@ConfigurationInject
public YarnProperties(@Nullable String kerberosPrincipal, @Nullable String kerberosKeytab) {
this.kerberosPrincipal = kerberosPrincipal;
this.kerberosKeytab = kerberosKeytab;
public YarnProperties(@Nullable KerberosProperties kerberos) {
this.kerberos = kerberos;
}

public String getKerberosPrincipal() {
return kerberosPrincipal;
}

public String getKerberosKeytab() {
return kerberosKeytab;
@ConfigurationProperties("kerberos")
public KerberosProperties getKerberos() {
return kerberos;
}

@Override
public String toString() {
return new StringJoiner(", ", YarnProperties.class.getSimpleName() + "[", "]")
.add("kerberosPrincipal='" + kerberosPrincipal + "'")
.add("kerberosKeytab='" + kerberosKeytab + "'")
return new ToStringBuilder(this)
.append("kerberos", kerberos)
.toString();
}

@ConfigurationProperties("kerberos")
public static class KerberosProperties {

private final String principal;
private final String keytab;

@ConfigurationInject
public KerberosProperties(@NotBlank String principal, @NotBlank String keytab) {
this.principal = principal;
this.keytab = keytab;
}

public String getPrincipal() {
return principal;
}

public String getKeytab() {
return keytab;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("principal", principal)
.append("keytab", keytab)
.toString();
}
}
}
@@ -1,5 +1,9 @@
package com.exacaster.lighter.configuration;

import static io.micronaut.core.convert.format.MapFormat.MapTransformation.FLAT;
import static io.micronaut.core.naming.conventions.StringConvention.RAW;
import static java.util.Optional.ofNullable;

import com.exacaster.lighter.spark.SubmitParams;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -9,7 +13,9 @@
import io.micronaut.context.annotation.Primary;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

@ConfigurationProperties("lighter")
@@ -25,20 +31,27 @@ public class AppConfiguration {
@JsonProperty(access = Access.WRITE_ONLY)
private final String url;
private final SessionConfiguration sessionConfiguration;
private final Map<String, String> batchDefaultConf;
private final Map<String, String> sessionDefaultConf;

@ConfigurationInject
public AppConfiguration(Integer maxRunningJobs,
@Nullable String sparkHistoryServerUrl,
@Nullable String externalLogsUrlTemplate,
Integer pyGatewayPort,
String url,
SessionConfiguration sessionConfiguration) {
SessionConfiguration sessionConfiguration,
@MapFormat(transformation = FLAT, keyFormat = RAW)
@Nullable Map<String, String> batchDefaultConf,
@Nullable Map<String, String> sessionDefaultConf) {
this.maxRunningJobs = maxRunningJobs;
this.sparkHistoryServerUrl = sparkHistoryServerUrl;
this.externalLogsUrlTemplate = externalLogsUrlTemplate;
this.pyGatewayPort = pyGatewayPort;
this.url = url;
this.sessionConfiguration = sessionConfiguration;
this.batchDefaultConf = ofNullable(batchDefaultConf).orElse(Map.of());
this.sessionDefaultConf = ofNullable(sessionDefaultConf).orElse(Map.of());
}

public Integer getMaxRunningJobs() {
@@ -65,12 +78,22 @@ public SessionConfiguration getSessionConfiguration() {
return sessionConfiguration;
}

public Map<String, String> getBatchDefaultConf() {
return batchDefaultConf;
}

public Map<String, String> getSessionDefaultConf() {
return sessionDefaultConf;
}

@Override
public String toString() {
return new StringJoiner(", ", AppConfiguration.class.getSimpleName() + "[", "]")
.add("maxRunningJobs=" + maxRunningJobs)
.add("sparkHistoryServerUrl=" + sparkHistoryServerUrl)
.add("sessionConfiguration=" + sessionConfiguration)
.add("batchDefaultConf=" + batchDefaultConf)
.add("sessionDefaultConf=" + sessionDefaultConf)
.toString();
}
