diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerHeartbeatRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerHeartbeatRepository.java index 6b111d6dc4b..1e5559e1e76 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerHeartbeatRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerHeartbeatRepository.java @@ -5,10 +5,7 @@ import io.micronaut.context.annotation.Value; import jakarta.inject.Singleton; import lombok.Getter; -import org.jooq.Record1; -import org.jooq.SelectConditionStep; -import org.jooq.SelectForUpdateOfStep; -import org.jooq.SelectJoinStep; +import org.jooq.*; import org.jooq.impl.DSL; import java.time.Instant; @@ -47,15 +44,12 @@ public Optional heartbeatCheckUp(String workerUuid) { .transactionResult(configuration -> { SelectForUpdateOfStep> select = this.heartbeatSelectQuery(configuration, workerUuid) - .forUpdate(); + .forUpdate(); Optional workerHeartbeat = this.jdbcRepository.fetchOne(select); if (workerHeartbeat.isPresent()) { - if (workerHeartbeat.get().getStatus().equals(WorkerHeartbeat.Status.DEAD)) { - return Optional.empty(); - } - this.save(workerHeartbeat.get().toBuilder().heartbeatDate(Instant.now()).build()); + this.save(workerHeartbeat.get().toBuilder().status(WorkerHeartbeat.Status.UP).heartbeatDate(Instant.now()).build()); return workerHeartbeat; } return Optional.empty(); @@ -68,22 +62,22 @@ public void heartbeatStatusUpdate(String workerUuid) { .transaction(configuration -> { SelectForUpdateOfStep> select = this.heartbeatSelectQuery(configuration, workerUuid) - .forUpdate(); + .and(field("status").eq(WorkerHeartbeat.Status.UP.toString())) + // We consider a heartbeat late if it's older than heartbeat missed times the frequency + .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency()))) + .forUpdate(); Optional workerHeartbeat = this.jdbcRepository.fetchOne(select); workerHeartbeat.ifPresent(heartbeat -> { - // We consider a heartbeat late if it's older than heartbeat missed times the frequency - if (heartbeat.getHeartbeatDate().isBefore(Instant.now().minusSeconds(getNbMissed() * getFrequency()))) { - heartbeat.setStatus(WorkerHeartbeat.Status.DEAD); - this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat)); - } + heartbeat.setStatus(WorkerHeartbeat.Status.DEAD); + this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat)); }); }); } public void heartbeatsStatusUpdate() { - this.findAll().forEach(heartbeat -> { + this.findAllAlive().forEach(heartbeat -> { this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString()); } ); @@ -95,22 +89,19 @@ public void heartbeatCleanUp(String workerUuid) { .transaction(configuration -> { SelectForUpdateOfStep> select = this.heartbeatSelectQuery(configuration, workerUuid) - .forUpdate(); + // we delete worker that have dead status more than two times the times considered to be dead + .and(field("status").eq(WorkerHeartbeat.Status.DEAD.toString())) + .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency()))) + .forUpdate(); Optional workerHeartbeat = this.jdbcRepository.fetchOne(select); - workerHeartbeat.ifPresent(heartbeat -> { - if (heartbeat.getStatus().equals(WorkerHeartbeat.Status.DEAD) - // we delete worker that have dead status more than two times the times considered to be dead - && heartbeat.getHeartbeatDate().isBefore(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency()))) { - this.delete(heartbeat); - } - }); + workerHeartbeat.ifPresent(this::delete); }); } public void heartbeatsCleanup() { - this.findAll().forEach(heartbeat -> { + this.findAllDead().forEach(heartbeat -> { this.heartbeatCleanUp(heartbeat.getWorkerUuid().toString()); } ); @@ -122,10 +113,32 @@ public List findAll() { return this.jdbcRepository .getDslContextWrapper() .transactionResult(configuration -> { - SelectJoinStep> select = DSL - .using(configuration) - .select(field("value")) - .from(this.jdbcRepository.getTable()); + SelectWhereStep> select = + this.heartbeatSelectAllQuery(configuration); + + return this.jdbcRepository.fetch(select); + }); + } + + public List findAllAlive() { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + SelectConditionStep> select = + this.heartbeatSelectAllQuery(configuration) + .where(field("status").eq(WorkerHeartbeat.Status.UP.toString())); + + return this.jdbcRepository.fetch(select); + }); + } + + public List findAllDead() { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + SelectConditionStep> select = + this.heartbeatSelectAllQuery(configuration) + .where(field("status").eq(WorkerHeartbeat.Status.DEAD.toString())); return this.jdbcRepository.fetch(select); }); @@ -151,4 +164,11 @@ private SelectConditionStep> heartbeatSelectQuery(org.jooq.Confi ); } + private SelectWhereStep> heartbeatSelectAllQuery(org.jooq.Configuration configuration) { + return DSL + .using(configuration) + .select(field("value")) + .from(this.jdbcRepository.getTable()); + } + } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java index 854459b4325..790e8c46d15 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java @@ -1,6 +1,7 @@ package io.kestra.jdbc.runner; +import io.kestra.core.runners.Worker; import io.kestra.core.runners.WorkerHeartbeat; import io.kestra.jdbc.repository.AbstractJdbcWorkerHeartbeatRepository; import io.micronaut.context.ApplicationContext; @@ -23,21 +24,27 @@ public class JdbcHeartbeat { @Inject AbstractJdbcWorkerHeartbeatRepository workerHeartbeatRepository; - private final WorkerHeartbeat workerHeartbeat; + private WorkerHeartbeat workerHeartbeat; + private final ApplicationContext applicationContext; + + + public JdbcHeartbeat(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + @PostConstruct + public void initializedWorkerHeartbeat() throws UnknownHostException { + Worker worker = applicationContext.getBean(Worker.class); - public JdbcHeartbeat(ApplicationContext applicationContext) throws UnknownHostException { -// Worker worker = applicationContext.getBean(Worker.class); this.workerHeartbeat = WorkerHeartbeat.builder() .workerUuid(UUID.randomUUID()) .hostname(InetAddress.getLocalHost().getHostName()) .port(applicationContext.getEnvironment().getProperty("micronaut.server.port", Integer.class).orElse(8080)) .managementPort(applicationContext.getEnvironment().getProperty("endpoints.all.port", Integer.class).orElse(null)) + .workerGroup(worker.getWorkerGroup()) .build(); - } - @PostConstruct - public void initializedWorkerHeartbeat() { log.trace("Initialized heartbeat of: {}", workerHeartbeat.getWorkerUuid()); workerHeartbeatRepository.save( @@ -45,7 +52,7 @@ public void initializedWorkerHeartbeat() { ); } - @Scheduled(fixedDelay = "${kestra.heartbeat.frequency}" + "s") + @Scheduled(initialDelay = "${kestra.heartbeat.frequency}" + "s", fixedDelay = "${kestra.heartbeat.frequency}" + "s") public void updateHeartbeat() { log.trace("Heartbeat of: {}", workerHeartbeat.getWorkerUuid()); if (workerHeartbeatRepository.heartbeatCheckUp(workerHeartbeat.getWorkerUuid().toString()).isEmpty()) {