Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve session timeout handling #614

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ Lighter can be configured by using environment variables. Currently, Lighter sup

## Global properties

| Property | Description | Default |
|----------------------------------------|----------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes | 90 |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |
| Property | Description | Default |
|----------------------------------------|--------------------------------------------------------------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes (from last statement creation). Use negative value to disable | 90 |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_MINUTES` is negative) | false |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |

<sup>*</sup> default confs will be merged with confs provided in submit request, if property is defined in submit request, default will be ignored.
Example of `LIGHTER_BATCH_DEFAULT_CONF`: `{"spark.kubernetes.driverEnv.TEST1":"test1"}`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,23 @@ public void handleTimeout() {
assertLocked();
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeout = sessionConfiguration.getTimeoutMinutes();
if (timeout != null) {
if (timeout != null && timeout > 0) {
sessionService.fetchRunning()
.stream()
.filter(s -> sessionConfiguration.getPermanentSessions().stream()
.noneMatch(conf -> conf.getId().equals(s.getId())))
.filter(s -> isNotPermanent(sessionConfiguration, s))
.filter(s -> sessionConfiguration.shouldTimeoutActive() || !sessionService.isActive(s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minusMinutes(timeout)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeout, s))
.forEach(sessionService::killOne);
}

}

private boolean isNotPermanent(AppConfiguration.SessionConfiguration sessionConfiguration, Application session) {
return sessionConfiguration.getPermanentSessions().stream()
.noneMatch(conf -> conf.getId().equals(session.getId()));
}

private <T> List<T> selfOrEmpty(List<T> list) {
return ofNullable(list).orElse(List.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,8 @@ public List<Statement> getStatements(String id, Integer from, Integer size) {
.limit(size)
.collect(Collectors.toList());
}

public boolean isActive(Application application) {
return statementHandler.hasWaitingStatement(application);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,26 @@ public String toString() {
public static class SessionConfiguration {

private final Integer timeoutMinutes;
private final Boolean timeoutActive;
private final List<PermanentSession> permanentSessions;

@ConfigurationInject
public SessionConfiguration(@Nullable Integer timeoutMinutes,
Boolean timeoutActive,
List<PermanentSession> permanentSessions) {
this.timeoutMinutes = timeoutMinutes;
this.timeoutActive = timeoutActive;
this.permanentSessions = permanentSessions;
}

public Integer getTimeoutMinutes() {
return timeoutMinutes;
}

public boolean shouldTimeoutActive() {
return Boolean.TRUE.equals(timeoutActive);
}

public List<PermanentSession> getPermanentSessions() {
return permanentSessions;
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ lighter:
url: http://lighter.spark:8080
session:
timeout-minutes: 90
timeout-active: false
permanent-sessions: []
kubernetes:
enabled: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ class SessionHandlerTest extends Specification {
0 * service.killOne(permanentSession)
}

def "preserves active timeouted sessions"() {
given:
def oldSession = newSession()
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
service.isActive(oldSession) >> true

1 * service.fetchRunning() >> [
oldSession,
]

when:
handler.handleTimeout()

then:
0 * service.killOne(oldSession)
}

def "tracks running"() {
given:
def session = app()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class Factories {
null,
5432,
"http://lighter:8080",
new AppConfiguration.SessionConfiguration(20, [new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
new AppConfiguration.SessionConfiguration(20, false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
["spark.kubernetes.driverEnv.TEST": "test"],
["spark.kubernetes.driverEnv.TEST": "test"]
)
Expand Down