Skip to content

Commit

Permalink
fix(): moved filter logic into SQL clause
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye authored and tchiotludo committed Sep 18, 2023
1 parent 95acd5c commit 266c939
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import lombok.Getter;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.SelectForUpdateOfStep;
import org.jooq.SelectJoinStep;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.time.Instant;
Expand Down Expand Up @@ -47,15 +44,12 @@ public Optional<WorkerHeartbeat> heartbeatCheckUp(String workerUuid) {
.transactionResult(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
.forUpdate();
.forUpdate();

Optional<WorkerHeartbeat> workerHeartbeat = this.jdbcRepository.fetchOne(select);

if (workerHeartbeat.isPresent()) {
if (workerHeartbeat.get().getStatus().equals(WorkerHeartbeat.Status.DEAD)) {
return Optional.empty();
}
this.save(workerHeartbeat.get().toBuilder().heartbeatDate(Instant.now()).build());
this.save(workerHeartbeat.get().toBuilder().status(WorkerHeartbeat.Status.UP).heartbeatDate(Instant.now()).build());
return workerHeartbeat;
}
return Optional.empty();
Expand All @@ -68,22 +62,22 @@ public void heartbeatStatusUpdate(String workerUuid) {
.transaction(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
.forUpdate();
.and(field("status").eq(WorkerHeartbeat.Status.UP.toString()))
// We consider a heartbeat late if it's older than heartbeat missed times the frequency
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency())))
.forUpdate();

Optional<WorkerHeartbeat> workerHeartbeat = this.jdbcRepository.fetchOne(select);

workerHeartbeat.ifPresent(heartbeat -> {
// We consider a heartbeat late if it's older than heartbeat missed times the frequency
if (heartbeat.getHeartbeatDate().isBefore(Instant.now().minusSeconds(getNbMissed() * getFrequency()))) {
heartbeat.setStatus(WorkerHeartbeat.Status.DEAD);
this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat));
}
heartbeat.setStatus(WorkerHeartbeat.Status.DEAD);
this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat));
});
});
}

public void heartbeatsStatusUpdate() {
this.findAll().forEach(heartbeat -> {
this.findAllAlive().forEach(heartbeat -> {
this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString());
}
);
Expand All @@ -95,22 +89,19 @@ public void heartbeatCleanUp(String workerUuid) {
.transaction(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
.forUpdate();
// we delete worker that have dead status more than two times the times considered to be dead
.and(field("status").eq(WorkerHeartbeat.Status.DEAD.toString()))
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency())))
.forUpdate();

Optional<WorkerHeartbeat> workerHeartbeat = this.jdbcRepository.fetchOne(select);

workerHeartbeat.ifPresent(heartbeat -> {
if (heartbeat.getStatus().equals(WorkerHeartbeat.Status.DEAD)
// we delete worker that have dead status more than two times the times considered to be dead
&& heartbeat.getHeartbeatDate().isBefore(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency()))) {
this.delete(heartbeat);
}
});
workerHeartbeat.ifPresent(this::delete);
});
}

public void heartbeatsCleanup() {
this.findAll().forEach(heartbeat -> {
this.findAllDead().forEach(heartbeat -> {
this.heartbeatCleanUp(heartbeat.getWorkerUuid().toString());
}
);
Expand All @@ -122,10 +113,32 @@ public List<WorkerHeartbeat> findAll() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
SelectWhereStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration);

return this.jdbcRepository.fetch(select);
});
}

public List<WorkerHeartbeat> findAllAlive() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration)
.where(field("status").eq(WorkerHeartbeat.Status.UP.toString()));

return this.jdbcRepository.fetch(select);
});
}

public List<WorkerHeartbeat> findAllDead() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration)
.where(field("status").eq(WorkerHeartbeat.Status.DEAD.toString()));

return this.jdbcRepository.fetch(select);
});
Expand All @@ -151,4 +164,11 @@ private SelectConditionStep<Record1<Object>> heartbeatSelectQuery(org.jooq.Confi
);
}

private SelectWhereStep<Record1<Object>> heartbeatSelectAllQuery(org.jooq.Configuration configuration) {
return DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
}

}
21 changes: 14 additions & 7 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;


import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerHeartbeat;
import io.kestra.jdbc.repository.AbstractJdbcWorkerHeartbeatRepository;
import io.micronaut.context.ApplicationContext;
Expand All @@ -23,29 +24,35 @@ public class JdbcHeartbeat {
@Inject
AbstractJdbcWorkerHeartbeatRepository workerHeartbeatRepository;

private final WorkerHeartbeat workerHeartbeat;
private WorkerHeartbeat workerHeartbeat;

private final ApplicationContext applicationContext;


public JdbcHeartbeat(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@PostConstruct
public void initializedWorkerHeartbeat() throws UnknownHostException {
Worker worker = applicationContext.getBean(Worker.class);

public JdbcHeartbeat(ApplicationContext applicationContext) throws UnknownHostException {
// Worker worker = applicationContext.getBean(Worker.class);
this.workerHeartbeat = WorkerHeartbeat.builder()
.workerUuid(UUID.randomUUID())
.hostname(InetAddress.getLocalHost().getHostName())
.port(applicationContext.getEnvironment().getProperty("micronaut.server.port", Integer.class).orElse(8080))
.managementPort(applicationContext.getEnvironment().getProperty("endpoints.all.port", Integer.class).orElse(null))
.workerGroup(worker.getWorkerGroup())
.build();
}

@PostConstruct
public void initializedWorkerHeartbeat() {
log.trace("Initialized heartbeat of: {}", workerHeartbeat.getWorkerUuid());

workerHeartbeatRepository.save(
workerHeartbeat
);
}

@Scheduled(fixedDelay = "${kestra.heartbeat.frequency}" + "s")
@Scheduled(initialDelay = "${kestra.heartbeat.frequency}" + "s", fixedDelay = "${kestra.heartbeat.frequency}" + "s")
public void updateHeartbeat() {
log.trace("Heartbeat of: {}", workerHeartbeat.getWorkerUuid());
if (workerHeartbeatRepository.heartbeatCheckUp(workerHeartbeat.getWorkerUuid().toString()).isEmpty()) {
Expand Down

0 comments on commit 266c939

Please sign in to comment.