Skip to content

Commit

Permalink
Merge pull request #26 from exacaster/launch_schedules_after_job_comp…
Browse files Browse the repository at this point in the history
…letion

Launch scheduled jobs right after job completion
  • Loading branch information
pdambrauskas authored Feb 11, 2022
2 parents d54b678 + 218446d commit 2bbe429
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ public void processApplicationIdle(Application application) {
);
}

public void processApplicationRunning(Application app) {
backend.getInfo(app).ifPresentOrElse(
info -> trackStatus(app, info),
() -> checkZombie(app)
);
public ApplicationState processApplicationRunning(Application app) {
return backend.getInfo(app)
.map(info -> trackStatus(app, info))
.orElseGet(() -> checkZombie(app));
}

public void processApplicationError(Application application, Throwable error) {
Expand All @@ -69,7 +68,7 @@ public void processApplicationError(Application application, Throwable error) {
);
}

private void trackStatus(Application app, ApplicationInfo info) {
private ApplicationState trackStatus(Application app, ApplicationInfo info) {
LOG.info("Tracking {}, info: {}", app, info);
applicationStorage.saveApplication(ApplicationBuilder.builder(app)
.setState(info.getState())
Expand All @@ -80,9 +79,11 @@ private void trackStatus(Application app, ApplicationInfo info) {
if (info.getState().isComplete()) {
backend.getLogs(app).ifPresent(logService::save);
}

return info.getState();
}

private void checkZombie(Application app) {
private ApplicationState checkZombie(Application app) {
LOG.info("No info for {}", app);
if (app.getContactedAt() != null && app.getContactedAt().isBefore(LocalDateTime.now().minusMinutes(30))) {
LOG.info("Assuming zombie ({})", app.getId());
Expand All @@ -91,6 +92,8 @@ private void checkZombie(Application app) {
.build());
logService.save(new Log(app.getId(),
"Application was not reachable for 10 minutes, so we assume something went wrong"));
return ApplicationState.ERROR;
}
return app.getState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,19 @@ private Integer countEmptySlots() {
}

@SchedulerLock(name = "trackRunning")
@Scheduled(fixedRate = "2m")
@Scheduled(fixedRate = "1m")
public void trackRunning() {
assertLocked();
batchService.fetchRunning().forEach(statusTracker::processApplicationRunning);
var completedCount = batchService.fetchRunning().stream()
.map(statusTracker::processApplicationRunning)
.filter(ApplicationState::isComplete)
.count();
LOG.info("Completed {} jobs", completedCount);

// If there are completed jobs, we can launch scheduled jobs immediately
if (completedCount > 0) {
this.processScheduledBatches();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,30 @@ class ApplicationStatusHandlerTest extends Specification {

when:
1 * backend.getInfo(application) >> Optional.of(new ApplicationInfo(ApplicationState.BUSY, "we"))
handler.processApplicationRunning(application)
def returnedState = handler.processApplicationRunning(application)
def updated = storage.findApplication(application.id).get()

then:
returnedState == updated.state
updated.state == ApplicationState.BUSY

when:
1 * backend.getInfo(updated) >> Optional.empty()
handler.processApplicationRunning(updated)
returnedState = handler.processApplicationRunning(updated)
updated = storage.findApplication(application.id).get()

then:
returnedState == updated.state
updated.state == ApplicationState.BUSY

when: "No info for more than 30 mins"
updated = storage.saveApplication(ApplicationBuilder.builder(updated).setContactedAt(LocalDateTime.now().minusHours(1)).build())
1 * backend.getInfo(updated) >> Optional.empty()
handler.processApplicationRunning(updated)
returnedState = handler.processApplicationRunning(updated)
updated = storage.findApplication(application.id).get()

then:
returnedState == updated.state
updated.state == ApplicationState.ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,25 @@ class BatchHandlerTest extends Specification {
_ * service.fetchByState(ApplicationState.NOT_STARTED, config.getMaxRunningJobs() - 1) >> []
0 * handler.launch(app, _) >> { }
}

def "tracks running jobs"() {
given:
def app = newApplication()
service.fetchRunning() >> [app]
service.fetchByState(*_) >> []

when:
handler.trackRunning()

then:
1 * statusHandler.processApplicationRunning(app) >> ApplicationState.BUSY
0 * handler.processScheduledBatches()

when:
handler.trackRunning()

then:
1 * statusHandler.processApplicationRunning(app) >> ApplicationState.SUCCESS
1 * handler.processScheduledBatches()
}
}

0 comments on commit 2bbe429

Please sign in to comment.