diff --git a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java index 4fc54eaa..dd0ac896 100644 --- a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java @@ -56,19 +56,27 @@ public Waitable launch(Application application, Consumer errorHandler return app.launch(); } - @SchedulerLock(name = "keepPermanentSession") + @SchedulerLock(name = "keepPermanentSession", lockAtLeastFor = "1m") @Scheduled(fixedRate = "1m") public void keepPermanentSessions() throws InterruptedException { assertLocked(); + LOG.info("Start provisioning permanent sessions."); for (var sessionConf : appConfiguration.getSessionConfiguration().getPermanentSessions()) { var session = sessionService.fetchOne(sessionConf.getId()); if (session.map(Application::getState).filter(this::running).isEmpty() || session.flatMap(backend::getInfo).map(ApplicationInfo::getState).filter(this::running).isEmpty()) { - sessionService.deleteOne(sessionConf.getId()); - launchSession(sessionService.createSession(sessionConf.getSubmitParams(), sessionConf.getId())) - .waitCompletion(); + LOG.info("Permanent session {} needs to be (re)started.", sessionConf.getId()); + var sessionToLaunch = sessionService.createSession( + sessionConf.getSubmitParams(), + sessionConf.getId() + ); + + sessionService.deleteOne(sessionToLaunch); + launchSession(sessionToLaunch).waitCompletion(); + LOG.info("Permanent session {} (re)started.", sessionConf.getId()); } } + LOG.info("End provisioning permanent sessions."); } @SchedulerLock(name = "processScheduledSessions") diff --git a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java index aed34675..13decb1b 100644 --- a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java +++ b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java @@ -19,6 +19,7 @@ @Singleton public class SessionService { + private final ApplicationStorage applicationStorage; private final StatementStorage statementStorage; private final Backend backend; @@ -86,10 +87,12 @@ public Optional fetchOne(String id) { } public void deleteOne(String id) { - this.fetchOne(id).ifPresent(app -> { - backend.kill(app); - applicationStorage.deleteApplication(id); - }); + this.fetchOne(id).ifPresent(this::deleteOne); + } + + public void deleteOne(Application app) { + backend.kill(app); + applicationStorage.deleteApplication(app.getId()); } public void killOne(Application app) { diff --git a/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java index 637a14ff..0d161d85 100644 --- a/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java +++ b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java @@ -7,6 +7,7 @@ import java.util.function.Consumer; import org.apache.spark.launcher.SparkAppHandle; import org.apache.spark.launcher.SparkAppHandle.Listener; +import org.apache.spark.launcher.SparkAppHandle.State; import org.slf4j.Logger; public class SparkListener implements Listener, Waitable { @@ -22,9 +23,12 @@ public SparkListener(Consumer errorHandler) { @Override public void stateChanged(SparkAppHandle handle) { - LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState()); + var state = handle.getState(); + LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), state); handle.getError().ifPresent(errorHandler); - if (handle.getState() != null && handle.getState().isFinal()) { + // Disconnect when final or running. + // In case app fails after detach, status will be retrieved by ApplicationStatusHandler. + if (state != null && (state.isFinal() || State.RUNNING.equals(state))) { handle.disconnect(); latch.countDown(); } diff --git a/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionHandlerTest.groovy index 3b328771..8355c0eb 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionHandlerTest.groovy @@ -110,7 +110,7 @@ class SessionHandlerTest extends Specification { handler.keepPermanentSessions() then: "restart permanent session" - 1 * service.deleteOne(session.id) + 1 * service.deleteOne({ it -> it.getId() == session.getId() }) 1 * service.createSession(session.submitParams, session.id) >> permanentSession 1 * handler.launch(permanentSession, _) >> EmptyWaitable.INSTANCE }