Skip to content

Commit

Permalink
Merge pull request #44 from exacaster/wait_completion_when_launching_…
Browse files Browse the repository at this point in the history
…permanent

Bugfix: Wait for permanent session submit completion
  • Loading branch information
pdambrauskas authored May 11, 2022
2 parents d534b19 + 863014b commit a8f5824
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,17 @@ public Waitable launch(Application application, Consumer<Throwable> errorHandler

@SchedulerLock(name = "keepPermanentSession")
@Scheduled(fixedRate = "1m")
public void keepPermanentSessions() {
public void keepPermanentSessions() throws InterruptedException {
assertLocked();
appConfiguration.getSessionConfiguration().getPermanentSessions().forEach(sessionConf -> {
for (var sessionConf : appConfiguration.getSessionConfiguration().getPermanentSessions()) {
var session = sessionService.fetchOne(sessionConf.getId());
if (session.map(Application::getState).filter(this::running).isEmpty() ||
session.flatMap(backend::getInfo).map(ApplicationInfo::getState).filter(this::running).isEmpty()) {
sessionService.deleteOne(sessionConf.getId());
launchSession(sessionService.createSession(sessionConf.getSubmitParams(), sessionConf.getId()));
launchSession(sessionService.createSession(sessionConf.getSubmitParams(), sessionConf.getId()))
.waitCompletion();
}
});
}
}

@SchedulerLock(name = "processScheduledSessions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.ApplicationStatusHandler
import com.exacaster.lighter.application.sessions.processors.StatementHandler
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.concurrency.EmptyWaitable
import com.exacaster.lighter.configuration.AppConfiguration
import net.javacrumbs.shedlock.core.LockAssert
import spock.lang.Specification
Expand Down Expand Up @@ -111,7 +112,7 @@ class SessionHandlerTest extends Specification {
then: "restart permanent session"
1 * service.deleteOne(session.id)
1 * service.createSession(session.submitParams, session.id) >> permanentSession
1 * handler.launch(permanentSession, _) >> {}
1 * handler.launch(permanentSession, _) >> EmptyWaitable.INSTANCE
}

def app() {
Expand Down

0 comments on commit a8f5824

Please sign in to comment.