From 218446d54c74bbe07b8abda3592afafc7aeabf66 Mon Sep 17 00:00:00 2001 From: paulius Date: Fri, 11 Feb 2022 08:55:31 +0200 Subject: [PATCH] Launch scheduled jobs right after job completion --- .../application/ApplicationStatusHandler.java | 17 ++++++++------- .../application/batch/BatchHandler.java | 13 ++++++++++-- .../ApplicationStatusHandlerTest.groovy | 9 +++++--- .../application/batch/BatchHandlerTest.groovy | 21 +++++++++++++++++++ 4 files changed, 48 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java index e5470040..6e66bd9d 100644 --- a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java @@ -45,11 +45,10 @@ public void processApplicationIdle(Application application) { ); } - public void processApplicationRunning(Application app) { - backend.getInfo(app).ifPresentOrElse( - info -> trackStatus(app, info), - () -> checkZombie(app) - ); + public ApplicationState processApplicationRunning(Application app) { + return backend.getInfo(app) + .map(info -> trackStatus(app, info)) + .orElseGet(() -> checkZombie(app)); } public void processApplicationError(Application application, Throwable error) { @@ -69,7 +68,7 @@ public void processApplicationError(Application application, Throwable error) { ); } - private void trackStatus(Application app, ApplicationInfo info) { + private ApplicationState trackStatus(Application app, ApplicationInfo info) { LOG.info("Tracking {}, info: {}", app, info); applicationStorage.saveApplication(ApplicationBuilder.builder(app) .setState(info.getState()) @@ -80,9 +79,11 @@ private void trackStatus(Application app, ApplicationInfo info) { if (info.getState().isComplete()) { backend.getLogs(app).ifPresent(logService::save); } + + return info.getState(); } - private void checkZombie(Application app) { + private ApplicationState checkZombie(Application app) { LOG.info("No info for {}", app); if (app.getContactedAt() != null && app.getContactedAt().isBefore(LocalDateTime.now().minusMinutes(30))) { LOG.info("Assuming zombie ({})", app.getId()); @@ -91,6 +92,8 @@ private void checkZombie(Application app) { .build()); logService.save(new Log(app.getId(), "Application was not reachable for 10 minutes, so we assume something went wrong")); + return ApplicationState.ERROR; } + return app.getState(); } } diff --git a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java index dc6f8b0f..fb0699f7 100644 --- a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java @@ -58,10 +58,19 @@ private Integer countEmptySlots() { } @SchedulerLock(name = "trackRunning") - @Scheduled(fixedRate = "2m") + @Scheduled(fixedRate = "1m") public void trackRunning() { assertLocked(); - batchService.fetchRunning().forEach(statusTracker::processApplicationRunning); + var completedCount = batchService.fetchRunning().stream() + .map(statusTracker::processApplicationRunning) + .filter(ApplicationState::isComplete) + .count(); + LOG.info("Completed {} jobs", completedCount); + + // If there are completed jobs, we can launch scheduled jobs immediately + if (completedCount > 0) { + this.processScheduledBatches(); + } } } diff --git a/server/src/test/groovy/com/exacaster/lighter/application/ApplicationStatusHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/ApplicationStatusHandlerTest.groovy index 4ee9bada..16015466 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/ApplicationStatusHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/ApplicationStatusHandlerTest.groovy @@ -85,27 +85,30 @@ class ApplicationStatusHandlerTest extends Specification { when: 1 * backend.getInfo(application) >> Optional.of(new ApplicationInfo(ApplicationState.BUSY, "we")) - handler.processApplicationRunning(application) + def returnedState = handler.processApplicationRunning(application) def updated = storage.findApplication(application.id).get() then: + returnedState == updated.state updated.state == ApplicationState.BUSY when: 1 * backend.getInfo(updated) >> Optional.empty() - handler.processApplicationRunning(updated) + returnedState = handler.processApplicationRunning(updated) updated = storage.findApplication(application.id).get() then: + returnedState == updated.state updated.state == ApplicationState.BUSY when: "No info for more than 30 mins" updated = storage.saveApplication(ApplicationBuilder.builder(updated).setContactedAt(LocalDateTime.now().minusHours(1)).build()) 1 * backend.getInfo(updated) >> Optional.empty() - handler.processApplicationRunning(updated) + returnedState = handler.processApplicationRunning(updated) updated = storage.findApplication(application.id).get() then: + returnedState == updated.state updated.state == ApplicationState.ERROR } } diff --git a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy index 6678bb17..7578f63c 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy @@ -52,4 +52,25 @@ class BatchHandlerTest extends Specification { _ * service.fetchByState(ApplicationState.NOT_STARTED, config.getMaxRunningJobs() - 1) >> [] 0 * handler.launch(app, _) >> { } } + + def "tracks running jobs"() { + given: + def app = newApplication() + service.fetchRunning() >> [app] + service.fetchByState(*_) >> [] + + when: + handler.trackRunning() + + then: + 1 * statusHandler.processApplicationRunning(app) >> ApplicationState.BUSY + 0 * handler.processScheduledBatches() + + when: + handler.trackRunning() + + then: + 1 * statusHandler.processApplicationRunning(app) >> ApplicationState.SUCCESS + 1 * handler.processScheduledBatches() + } }