diff --git a/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java index 1dbb640109c..8b6b1c9a363 100644 --- a/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java @@ -3,7 +3,6 @@ import io.kestra.core.runners.WorkerJobRunning; -import java.util.List; import java.util.Optional; public interface WorkerJobRunningRepositoryInterface { @@ -11,5 +10,4 @@ public interface WorkerJobRunningRepositoryInterface { void deleteByTaskRunId(String taskRunId); - List getWorkerJobWithWorkerDead(List workersAlive); } diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java index ea84bb1140e..bcec06b24b8 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java @@ -4,7 +4,7 @@ import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; import io.kestra.core.utils.Either; -import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService; +import io.kestra.jdbc.JdbcWorkerJobQueueService; import io.micronaut.context.ApplicationContext; import lombok.extern.slf4j.Slf4j; @@ -12,29 +12,29 @@ @Slf4j public class H2WorkerJobQueue implements WorkerJobQueueInterface { - private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService; + private final JdbcWorkerJobQueueService jdbcworkerjobQueueService; public H2WorkerJobQueue(ApplicationContext applicationContext) { - this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class); + this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class); } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer); + return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer); } @Override public void pause() { - jdbcPostgresWorkerJobQueueService.pause(); + jdbcworkerjobQueueService.pause(); } @Override public void cleanup() { - jdbcPostgresWorkerJobQueueService.cleanup(); + jdbcworkerjobQueueService.cleanup(); } @Override public void close() { - jdbcPostgresWorkerJobQueueService.close(); + jdbcworkerjobQueueService.close(); } } diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java index 73a93a60eee..e50e543e362 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java @@ -4,7 +4,7 @@ import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; import io.kestra.core.utils.Either; -import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService; +import io.kestra.jdbc.JdbcWorkerJobQueueService; import io.micronaut.context.ApplicationContext; import lombok.extern.slf4j.Slf4j; @@ -12,29 +12,29 @@ @Slf4j public class MysqlWorkerJobQueue implements WorkerJobQueueInterface { - private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService; + private final JdbcWorkerJobQueueService jdbcworkerjobQueueService; public MysqlWorkerJobQueue(ApplicationContext applicationContext) { - this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class); + this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class); } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer); + return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer); } @Override public void pause() { - jdbcPostgresWorkerJobQueueService.pause(); + jdbcworkerjobQueueService.pause(); } @Override public void cleanup() { - jdbcPostgresWorkerJobQueueService.cleanup(); + jdbcworkerjobQueueService.cleanup(); } @Override public void close() { - jdbcPostgresWorkerJobQueueService.close(); + jdbcworkerjobQueueService.close(); } } diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java index 1de20d880f7..39a86e251f3 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java @@ -4,7 +4,7 @@ import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; import io.kestra.core.utils.Either; -import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService; +import io.kestra.jdbc.JdbcWorkerJobQueueService; import io.micronaut.context.ApplicationContext; import lombok.extern.slf4j.Slf4j; @@ -12,29 +12,29 @@ @Slf4j public class PostgresWorkerJobQueue implements WorkerJobQueueInterface { - private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService; + private final JdbcWorkerJobQueueService jdbcworkerjobQueueService; public PostgresWorkerJobQueue(ApplicationContext applicationContext) { - this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class); + this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class); } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer); + return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer); } @Override public void pause() { - jdbcPostgresWorkerJobQueueService.pause(); + jdbcworkerjobQueueService.pause(); } @Override public void cleanup() { - jdbcPostgresWorkerJobQueueService.cleanup(); + jdbcworkerjobQueueService.cleanup(); } @Override public void close() { - jdbcPostgresWorkerJobQueueService.close(); + jdbcworkerjobQueueService.close(); } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/JdbcPostgresWorkerJobQueueService.java b/jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerJobQueueService.java similarity index 96% rename from jdbc/src/main/java/io/kestra/jdbc/JdbcPostgresWorkerJobQueueService.java rename to jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerJobQueueService.java index 49a5eb04986..f995a12d443 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/JdbcPostgresWorkerJobQueueService.java +++ b/jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerJobQueueService.java @@ -18,7 +18,7 @@ @Singleton @Slf4j -public class JdbcPostgresWorkerJobQueueService { +public class JdbcWorkerJobQueueService { private final JdbcQueue workerTaskQueue; private final JdbcHeartbeat jdbcHeartbeat; private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository; @@ -27,7 +27,7 @@ public class JdbcPostgresWorkerJobQueueService { private WorkerInstance workerInstance; @SuppressWarnings("unchecked") - public JdbcPostgresWorkerJobQueueService(ApplicationContext applicationContext) { + public JdbcWorkerJobQueueService(ApplicationContext applicationContext) { this.workerTaskQueue = (JdbcQueue) applicationContext.getBean( QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED) diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepository.java index b46b3e0c4ce..51559568836 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepository.java @@ -6,17 +6,14 @@ import jakarta.inject.Singleton; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.jooq.Record1; -import org.jooq.SelectConditionStep; -import org.jooq.SelectForUpdateOfStep; -import org.jooq.SelectWhereStep; +import org.jooq.*; import org.jooq.impl.DSL; import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; @Singleton @Getter @@ -39,7 +36,7 @@ public Optional findByWorkerUuid(String workerUuid) { return this.jdbcRepository .getDslContextWrapper() .transactionResult(configuration -> { - SelectConditionStep> select = this.heartbeatSelectQuery(configuration, workerUuid); + SelectConditionStep> select = this.heartbeatSelectQuery(DSL.using(configuration), workerUuid); return this.jdbcRepository.fetchOne(select); }); @@ -50,7 +47,7 @@ public Optional heartbeatCheckUp(String workerUuid) { .getDslContextWrapper() .transactionResult(configuration -> { SelectForUpdateOfStep> select = - this.heartbeatSelectQuery(configuration, workerUuid) + this.heartbeatSelectQuery(DSL.using(configuration), workerUuid) .forUpdate(); Optional workerInstance = this.jdbcRepository.fetchOne(select); @@ -64,103 +61,84 @@ public Optional heartbeatCheckUp(String workerUuid) { }); } - public void heartbeatStatusUpdate(String workerUuid) { - this.jdbcRepository - .getDslContextWrapper() - .transaction(configuration -> { - SelectForUpdateOfStep> select = - this.heartbeatSelectQuery(configuration, workerUuid) - .and(field("status").eq(WorkerInstance.Status.UP.toString())) - // We consider a heartbeat late if it's older than heartbeat missed times the frequency - .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency().getSeconds()))) - .forUpdate(); + public void heartbeatStatusUpdate(String workerUuid, DSLContext context) { + SelectForUpdateOfStep> select = + this.heartbeatSelectQuery(context, workerUuid) + .and(field("status").eq(WorkerInstance.Status.UP.toString())) + // We consider a heartbeat dead if it's older than heartbeat missed times the frequency + .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency().getSeconds()))) + .forUpdate(); - Optional workerInstance = this.jdbcRepository.fetchOne(select); + Optional workerInstance = this.jdbcRepository.fetchOne(select); - workerInstance.ifPresent(heartbeat -> { - heartbeat.setStatus(WorkerInstance.Status.DEAD); + workerInstance.ifPresent(heartbeat -> { + heartbeat.setStatus(WorkerInstance.Status.DEAD); - log.warn("Detected evicted worker: {}", heartbeat); + log.warn("Detected evicted worker: {}", heartbeat); - this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat)); - }); - }); + this.jdbcRepository.persist(heartbeat, context, this.jdbcRepository.persistFields(heartbeat)); + }); } - public void heartbeatsStatusUpdate() { - this.findAllAlive().forEach(heartbeat -> { - this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString()); + public void heartbeatsStatusUpdate(DSLContext context) { + this.findAllAlive(context).forEach(heartbeat -> { + this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString(), context); } ); } - public Boolean heartbeatCleanUp(String workerUuid) { - AtomicReference bool = new AtomicReference<>(false); + public void lockedWorkersUpdate(Function function) { this.jdbcRepository .getDslContextWrapper() - .transaction(configuration -> { - SelectForUpdateOfStep> select = - this.heartbeatSelectQuery(configuration, workerUuid) - // we delete worker that have dead status more than two times the times considered to be dead - .and(field("status").eq(WorkerInstance.Status.DEAD.toString())) - .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency().getSeconds()))) - .forUpdate(); + .transactionResult(configuration -> { + DSLContext context = DSL.using(configuration); - Optional workerInstance = this.jdbcRepository.fetchOne(select); + // Update all workers status + heartbeatsStatusUpdate(context); - if(workerInstance.isPresent()) { - this.delete(workerInstance.get()); - bool.set(true); - } + function.apply(context); + + return null; }); - return bool.get(); } - public Boolean heartbeatsCleanup() { - AtomicReference bool = new AtomicReference<>(false); - this.findAllDead().forEach(heartbeat -> { - bool.set(this.heartbeatCleanUp(heartbeat.getWorkerUuid().toString())); - } - ); - return bool.get(); + public List findAll(DSLContext context) { + return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context)); } - @Override public List findAll() { return this.jdbcRepository .getDslContextWrapper() .transactionResult(configuration -> { - SelectWhereStep> select = - this.heartbeatSelectAllQuery(configuration); - - return this.jdbcRepository.fetch(select); + DSLContext context = DSL.using(configuration); + return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context)); }); } - public List findAllAlive() { + public List findAllAlive(DSLContext context) { return this.jdbcRepository .getDslContextWrapper() - .transactionResult(configuration -> { - SelectConditionStep> select = - this.heartbeatSelectAllQuery(configuration) - .where(field("status").eq(WorkerInstance.Status.UP.toString())); - - return this.jdbcRepository.fetch(select); - }); + .transactionResult(configuration -> this.jdbcRepository.fetch( + this.heartbeatSelectAllQuery(context) + .where(field("status").eq(WorkerInstance.Status.UP.toString())) + )); } - public List findAllDead() { + public List findAllToDelete(DSLContext context) { return this.jdbcRepository .getDslContextWrapper() - .transactionResult(configuration -> { - SelectConditionStep> select = - this.heartbeatSelectAllQuery(configuration) - .where(field("status").eq(WorkerInstance.Status.DEAD.toString())); + .transactionResult(configuration -> this.jdbcRepository.fetch( + this.heartbeatSelectAllQuery(context) + .where(field("status").eq(WorkerInstance.Status.DEAD.toString())) + .and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency().getSeconds()))) + )); + } - return this.jdbcRepository.fetch(select); - }); + public void delete(DSLContext context, WorkerInstance workerInstance) { + this.jdbcRepository.delete(context, workerInstance); } + @Override public void delete(WorkerInstance workerInstance) { this.jdbcRepository.delete(workerInstance); } @@ -171,9 +149,8 @@ public WorkerInstance save(WorkerInstance workerInstance) { return workerInstance; } - private SelectConditionStep> heartbeatSelectQuery(org.jooq.Configuration configuration, String workerUuid) { - return DSL - .using(configuration) + private SelectConditionStep> heartbeatSelectQuery(DSLContext context, String workerUuid) { + return context .select(field("value")) .from(this.jdbcRepository.getTable()) .where( @@ -181,10 +158,8 @@ private SelectConditionStep> heartbeatSelectQuery(org.jooq.Confi ); } - private SelectWhereStep> heartbeatSelectAllQuery(org.jooq.Configuration configuration) { - return DSL - .using(configuration) - .select(field("value")) + private SelectJoinStep> heartbeatSelectAllQuery(DSLContext dsl) { + return dsl.select(field("value")) .from(this.jdbcRepository.getTable()); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java index 5574eb32bfd..07c1cbeb61f 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java @@ -49,18 +49,14 @@ public Optional findByTaskRunId(String taskRunId) { }); } - @Override - public List getWorkerJobWithWorkerDead(List workersAlive) { - return this.jdbcRepository - .getDslContextWrapper() - .transactionResult(configuration -> DSL - .using(configuration) + public List getWorkerJobWithWorkerDead(DSLContext context, List workersToDelete) { + return context .select(field("value")) .from(this.jdbcRepository.getTable()) - .where(field("worker_uuid").notIn(workersAlive)) + .where(field("worker_uuid").in(workersToDelete)) .forUpdate() .fetch() - .map(r -> this.jdbcRepository.deserialize(r.get("value").toString())) + .map(r -> this.jdbcRepository.deserialize(r.get("value").toString()) ); } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 4998bb9f2a8..0f4b46508a7 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -35,6 +35,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; +import org.jooq.DSLContext; import org.slf4j.event.Level; import java.io.IOException; @@ -221,53 +222,53 @@ public void run() { } protected void workersUpdate() { - workerInstanceRepository.heartbeatsStatusUpdate(); - if (workerInstanceRepository.heartbeatsCleanup()) { - this.deadWorkerTaskResubmit(); - } - } + workerInstanceRepository.lockedWorkersUpdate(context -> { + List workersToDelete = workerInstanceRepository + .findAllToDelete(context); + List workersToDeleteUuids = workersToDelete.stream().map(worker -> worker.getWorkerUuid().toString()).collect(Collectors.toList()); + + // Before deleting a worker, we resubmit all his tasks + workerJobRunningRepository.getWorkerJobWithWorkerDead(context, workersToDeleteUuids) + .forEach(workerJobRunning -> { + if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) { + workerTaskQueue.emit(WorkerTask.builder() + .taskRun(workerTaskRunning.getTaskRun()) + .task(workerTaskRunning.getTask()) + .runContext(workerTaskRunning.getRunContext()) + .build() + ); - private void deadWorkerTaskResubmit() { - List aliveWorkerUuid = workerInstanceRepository - .findAllAlive() - .stream() - .map(workerInstance -> workerInstance.getWorkerUuid().toString()) - .collect(Collectors.toList()); - - workerJobRunningRepository.getWorkerJobWithWorkerDead(aliveWorkerUuid) - .forEach(workerJobRunning -> { - if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) { - workerTaskQueue.emit(WorkerTask.builder() - .taskRun(workerTaskRunning.getTaskRun()) - .task(workerTaskRunning.getTask()) - .runContext(workerTaskRunning.getRunContext()) - .build() - ); + log.warn( + "[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend", + workerTaskRunning.getTaskRun().getNamespace(), + workerTaskRunning.getTaskRun().getFlowId(), + workerTaskRunning.getTaskRun().getExecutionId(), + workerTaskRunning.getTaskRun().getId() + ); + } else if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) { + workerTaskQueue.emit(WorkerTrigger.builder() + .trigger(workerTriggerRunning.getTrigger()) + .conditionContext(workerTriggerRunning.getConditionContext()) + .triggerContext(workerTriggerRunning.getTriggerContext()) + .build()); + + log.warn( + "[namespace: {}] [flow: {}] [trigger: {}] WorkerTrigger is being resend", + workerTriggerRunning.getTriggerContext().getNamespace(), + workerTriggerRunning.getTriggerContext().getFlowId(), + workerTriggerRunning.getTriggerContext().getTriggerId() + ); + } else { + throw new IllegalArgumentException("Object is of type " + workerJobRunning.getClass() + " which should never occurs"); + } + }); - log.warn( - "[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend", - workerTaskRunning.getTaskRun().getNamespace(), - workerTaskRunning.getTaskRun().getFlowId(), - workerTaskRunning.getTaskRun().getExecutionId(), - workerTaskRunning.getTaskRun().getId() - ); - } else if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) { - workerTaskQueue.emit(WorkerTrigger.builder() - .trigger(workerTriggerRunning.getTrigger()) - .conditionContext(workerTriggerRunning.getConditionContext()) - .triggerContext(workerTriggerRunning.getTriggerContext()) - .build()); - - log.warn( - "[namespace: {}] [flow: {}] [trigger: {}] WorkerTrigger is being resend", - workerTriggerRunning.getTriggerContext().getNamespace(), - workerTriggerRunning.getTriggerContext().getFlowId(), - workerTriggerRunning.getTriggerContext().getTriggerId() - ); - } else { - throw new IllegalArgumentException("Object is of type " + workerJobRunning.getClass() + " which should never occurs"); - } + workersToDelete.forEach(worker -> { + workerInstanceRepository.delete(context, worker); }); + + return null; + }); } private void executionQueue(Either either) { diff --git a/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepositoryTest.java b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepositoryTest.java index 311cbae92ab..0c930c82027 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepositoryTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcWorkerInstanceRepositoryTest.java @@ -5,6 +5,8 @@ import io.kestra.jdbc.JooqDSLContextWrapper; import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import jakarta.inject.Inject; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,13 +53,17 @@ protected void delete() { WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString()); workerInstanceRepository.save(workerInstance); - Optional find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); - assertThat(find.isPresent(), is(true)); - assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid())); + dslContextWrapper.transaction(configuration -> { + DSLContext context = DSL.using(configuration); + + Optional find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); + assertThat(find.isPresent(), is(true)); + assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid())); - workerInstanceRepository.delete(workerInstance); - find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); - assertThat(find.isPresent(), is(false)); + workerInstanceRepository.delete(context, workerInstance); + find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); + assertThat(find.isPresent(), is(false)); + }); } @Test @@ -70,14 +76,18 @@ protected void findAll() { workerInstanceRepository.save(workerInstanceAlive); workerInstanceRepository.save(workerInstanceDead); - List finds = workerInstanceRepository.findAll(); - assertThat(finds.size(), is(3)); + dslContextWrapper.transaction(configuration -> { + DSLContext context = DSL.using(configuration); - finds = workerInstanceRepository.findAllAlive(); - assertThat(finds.size(), is(2)); + List finds = workerInstanceRepository.findAll(context); + assertThat(finds.size(), is(3)); - finds = workerInstanceRepository.findAllDead(); - assertThat(finds.size(), is(1)); + finds = workerInstanceRepository.findAllToDelete(context); + assertThat(finds.size(), is(1)); + + finds = workerInstanceRepository.findAllAlive(context); + assertThat(finds.size(), is(2)); + }); } @Test @@ -108,22 +118,14 @@ protected void heartbeatsStatusUpdate() { workerInstance.setHeartbeatDate(Instant.now().minusSeconds(3600)); workerInstanceRepository.save(workerInstance); - workerInstanceRepository.heartbeatsStatusUpdate(); - Optional find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); - assertThat(find.isPresent(), is(true)); - assertThat(find.get().getStatus(), is(WorkerInstance.Status.DEAD)); - } - - @Test - protected void heartbeatsCleanup() { - WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString()); - workerInstance.setHeartbeatDate(Instant.now().minusSeconds(3600)); - workerInstance.setStatus(WorkerInstance.Status.DEAD); - workerInstanceRepository.save(workerInstance); + dslContextWrapper.transaction(configuration -> { + DSLContext context = DSL.using(configuration); - workerInstanceRepository.heartbeatsCleanup(); - Optional find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); - assertThat(find.isPresent(), is(false)); + workerInstanceRepository.heartbeatsStatusUpdate(context); + Optional find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString()); + assertThat(find.isPresent(), is(true)); + assertThat(find.get().getStatus(), is(WorkerInstance.Status.DEAD)); + }); } private WorkerInstance createWorkerInstance(String workerUuid, Boolean alive) { @@ -135,6 +137,7 @@ private WorkerInstance createWorkerInstance(String workerUuid, Boolean alive) { .partitions(null) .port(0) .status(alive ? WorkerInstance.Status.UP : WorkerInstance.Status.DEAD) + .heartbeatDate(alive ? Instant.now() : Instant.now().minusSeconds(3600)) .build(); }