Skip to content

Commit

Permalink
Improve session timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pdambrauskas authored and Paulius Dambrauskas committed Aug 7, 2023
1 parent 438a369 commit 74df75b
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 19 deletions.
31 changes: 16 additions & 15 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ Lighter can be configured by using environment variables. Currently, Lighter sup

## Global properties

| Property | Description | Default |
|----------------------------------------|----------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter from Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes | 90 |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |
| Property | Description | Default |
|----------------------------------------|-------------------------------------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter from Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes (from last statement creation). Use zero or a negative value to disable | 90 |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Whether Lighter should also kill timed-out sessions that still have waiting statements | false |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |

<sup>*</sup> Default confs are merged with the confs provided in the submit request; if a property is defined in the submit request, its default is ignored.
Example of `LIGHTER_BATCH_DEFAULT_CONF`: `{"spark.kubernetes.driverEnv.TEST1":"test1"}`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,23 @@ public void handleTimeout() {
assertLocked();
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeout = sessionConfiguration.getTimeoutMinutes();
if (timeout != null) {
if (timeout != null && timeout > 0) {
sessionService.fetchRunning()
.stream()
.filter(s -> sessionConfiguration.getPermanentSessions().stream()
.noneMatch(conf -> conf.getId().equals(s.getId())))
.filter(s -> isNotPermanent(sessionConfiguration, s))
.filter(s -> sessionConfiguration.shouldTimeoutActive() || !sessionService.isActive(s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minusMinutes(timeout)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeout, s))
.forEach(sessionService::killOne);
}

}

/**
 * Checks that the given session is not one of the configured permanent sessions.
 *
 * @param sessionConfiguration configuration holding the permanent session definitions
 * @param session session to look up by id
 * @return {@code true} when no permanent session definition has the same id as {@code session}
 */
private boolean isNotPermanent(AppConfiguration.SessionConfiguration sessionConfiguration, Application session) {
    for (var permanent : sessionConfiguration.getPermanentSessions()) {
        // Ids are compared per entry, permanent id first, so an empty list never touches the session.
        if (permanent.getId().equals(session.getId())) {
            return false;
        }
    }
    return true;
}

/**
 * Normalizes a possibly-null list.
 *
 * @param list list that may be {@code null}
 * @param <T> element type
 * @return {@code list} itself when it is non-null, otherwise an empty immutable list
 */
private <T> List<T> selfOrEmpty(List<T> list) {
    return list == null ? List.of() : list;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,8 @@ public List<Statement> getStatements(String id, Integer from, Integer size) {
.limit(size)
.collect(Collectors.toList());
}

/**
 * Reports whether the given application currently has a waiting statement.
 *
 * @param application application to check
 * @return the result of {@code statementHandler.hasWaitingStatement(application)}
 */
public boolean isActive(Application application) {
    final boolean hasWaitingStatement = statementHandler.hasWaitingStatement(application);
    return hasWaitingStatement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,26 @@ public String toString() {
public static class SessionConfiguration {

private final Integer timeoutMinutes;
private final Boolean timeoutActive;
private final List<PermanentSession> permanentSessions;

/**
 * Creates the session configuration from injected configuration values.
 *
 * @param timeoutMinutes session timeout in minutes; may be {@code null}
 * @param timeoutActive whether sessions with waiting statements may be timed out; may be
 *        {@code null}, which {@link #shouldTimeoutActive()} treats as {@code false}
 * @param permanentSessions definitions of the permanent sessions
 */
@ConfigurationInject
public SessionConfiguration(@Nullable Integer timeoutMinutes,
        @Nullable Boolean timeoutActive,
        List<PermanentSession> permanentSessions) {
    this.timeoutMinutes = timeoutMinutes;
    // Stored as-is (possibly null); shouldTimeoutActive() performs the null-safe conversion.
    this.timeoutActive = timeoutActive;
    this.permanentSessions = permanentSessions;
}

/**
 * Returns the configured session timeout.
 *
 * @return timeout in minutes, or {@code null} when none was injected
 */
public Integer getTimeoutMinutes() {
    return this.timeoutMinutes;
}

/**
 * Null-safe view of the {@code timeoutActive} flag.
 *
 * @return {@code true} only when the flag is set and true; {@code false} when it is
 *         {@code null} or false
 */
public boolean shouldTimeoutActive() {
    if (timeoutActive == null) {
        return false;
    }
    return timeoutActive;
}

/**
 * Returns the permanent session definitions exactly as they were injected.
 *
 * @return the stored list of permanent sessions (not copied)
 */
public List<PermanentSession> getPermanentSessions() {
    return this.permanentSessions;
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ lighter:
url: http://lighter.spark:8080
session:
timeout-minutes: 90
timeout-active: false
permanent-sessions: []
kubernetes:
enabled: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ class SessionHandlerTest extends Specification {
0 * service.killOne(permanentSession)
}

// Verifies that handleTimeout() does not kill a session that is past the timeout
// while the service reports it as active.
// NOTE(review): relies on `conf` having timeout-active disabled — confirm against the spec setup.
def "preserves active timeouted sessions"() {
given:
// newSession() is a helper defined elsewhere in the spec; presumably builds a non-permanent session — verify.
def oldSession = newSession()
// Last use is one minute older than the configured timeout, so the session counts as timed out.
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
// The service reports the session as active.
service.isActive(oldSession) >> true

// The handler is expected to fetch the running sessions exactly once.
1 * service.fetchRunning() >> [
oldSession,
]

when:
handler.handleTimeout()

then:
// The session must not be killed.
0 * service.killOne(oldSession)
}

def "tracks running"() {
given:
def session = app()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class Factories {
null,
5432,
"http://lighter:8080",
new AppConfiguration.SessionConfiguration(20, [new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
new AppConfiguration.SessionConfiguration(20, false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
["spark.kubernetes.driverEnv.TEST": "test"],
["spark.kubernetes.driverEnv.TEST": "test"]
)
Expand Down

0 comments on commit 74df75b

Please sign in to comment.