Skip to content

Commit

Permalink
fix(jdbc): worker update and task resubmit in only one transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Sep 28, 2023
1 parent b3b9e81 commit 64ee18a
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@

import io.kestra.core.runners.WorkerJobRunning;

import java.util.List;
import java.util.Optional;

/**
 * Repository contract for looking up and removing {@link WorkerJobRunning} entries.
 */
public interface WorkerJobRunningRepositoryInterface {
/**
 * Looks up the running worker job recorded for a task run.
 *
 * @param taskRunId identifier of the task run to look up
 * @return the matching entry wrapped in an {@link Optional}
 *         (presumably empty when nothing is recorded; implementation not visible here, confirm)
 */
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);

/**
 * Removes the entry recorded for a task run.
 * NOTE(review): behaviour inferred from the method name; no implementation is visible here.
 *
 * @param taskRunId identifier of the task run whose entry is removed
 */
void deleteByTaskRunId(String taskRunId);

/**
 * Returns the running worker jobs that belong to workers considered dead.
 * NOTE(review): judging by the parameter name, "dead" means "not in {@code workersAlive}";
 * the JDBC code touched by this commit appears to move to an explicit list of workers to
 * delete with a DSLContext-based signature, so confirm this declaration is still wanted.
 *
 * @param workersAlive identifiers of the workers that are still alive
 * @return the running worker jobs attached to workers outside that list
 */
List<WorkerJobRunning> getWorkerJobWithWorkerDead(List<String> workersAlive);
}
14 changes: 7 additions & 7 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class H2WorkerJobQueue implements WorkerJobQueueInterface {
private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService;
private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;

public H2WorkerJobQueue(ApplicationContext applicationContext) {
this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class);
this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcPostgresWorkerJobQueueService.pause();
jdbcworkerjobQueueService.pause();
}

@Override
public void cleanup() {
jdbcPostgresWorkerJobQueueService.cleanup();
jdbcworkerjobQueueService.cleanup();
}

@Override
public void close() {
jdbcPostgresWorkerJobQueueService.close();
jdbcworkerjobQueueService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class MysqlWorkerJobQueue implements WorkerJobQueueInterface {
private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService;
private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;

public MysqlWorkerJobQueue(ApplicationContext applicationContext) {
this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class);
this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcPostgresWorkerJobQueueService.pause();
jdbcworkerjobQueueService.pause();
}

@Override
public void cleanup() {
jdbcPostgresWorkerJobQueueService.cleanup();
jdbcworkerjobQueueService.cleanup();
}

@Override
public void close() {
jdbcPostgresWorkerJobQueueService.close();
jdbcworkerjobQueueService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcPostgresWorkerJobQueueService;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class PostgresWorkerJobQueue implements WorkerJobQueueInterface {
private final JdbcPostgresWorkerJobQueueService jdbcPostgresWorkerJobQueueService;
private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;

public PostgresWorkerJobQueue(ApplicationContext applicationContext) {
this.jdbcPostgresWorkerJobQueueService = applicationContext.getBean(JdbcPostgresWorkerJobQueueService.class);
this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcPostgresWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcPostgresWorkerJobQueueService.pause();
jdbcworkerjobQueueService.pause();
}

@Override
public void cleanup() {
jdbcPostgresWorkerJobQueueService.cleanup();
jdbcworkerjobQueueService.cleanup();
}

@Override
public void close() {
jdbcPostgresWorkerJobQueueService.close();
jdbcworkerjobQueueService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

@Singleton
@Slf4j
public class JdbcPostgresWorkerJobQueueService {
public class JdbcWorkerJobQueueService {
private final JdbcQueue<WorkerJob> workerTaskQueue;
private final JdbcHeartbeat jdbcHeartbeat;
private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
Expand All @@ -27,7 +27,7 @@ public class JdbcPostgresWorkerJobQueueService {
private WorkerInstance workerInstance;

@SuppressWarnings("unchecked")
public JdbcPostgresWorkerJobQueueService(ApplicationContext applicationContext) {
public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
this.workerTaskQueue = (JdbcQueue<WorkerJob>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.SelectForUpdateOfStep;
import org.jooq.SelectWhereStep;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

@Singleton
@Getter
Expand All @@ -39,7 +36,7 @@ public Optional<WorkerInstance> findByWorkerUuid(String workerUuid) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = this.heartbeatSelectQuery(configuration, workerUuid);
SelectConditionStep<Record1<Object>> select = this.heartbeatSelectQuery(DSL.using(configuration), workerUuid);

return this.jdbcRepository.fetchOne(select);
});
Expand All @@ -50,7 +47,7 @@ public Optional<WorkerInstance> heartbeatCheckUp(String workerUuid) {
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
this.heartbeatSelectQuery(DSL.using(configuration), workerUuid)
.forUpdate();

Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);
Expand All @@ -64,103 +61,84 @@ public Optional<WorkerInstance> heartbeatCheckUp(String workerUuid) {
});
}

public void heartbeatStatusUpdate(String workerUuid) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
.and(field("status").eq(WorkerInstance.Status.UP.toString()))
// We consider a heartbeat late if it's older than heartbeat missed times the frequency
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency().getSeconds())))
.forUpdate();
public void heartbeatStatusUpdate(String workerUuid, DSLContext context) {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(context, workerUuid)
.and(field("status").eq(WorkerInstance.Status.UP.toString()))
// We consider a worker dead if its last heartbeat is older than the allowed number of missed heartbeats times the heartbeat frequency
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency().getSeconds())))
.forUpdate();

Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);
Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);

workerInstance.ifPresent(heartbeat -> {
heartbeat.setStatus(WorkerInstance.Status.DEAD);
workerInstance.ifPresent(heartbeat -> {
heartbeat.setStatus(WorkerInstance.Status.DEAD);

log.warn("Detected evicted worker: {}", heartbeat);
log.warn("Detected evicted worker: {}", heartbeat);

this.jdbcRepository.persist(heartbeat, this.jdbcRepository.persistFields(heartbeat));
});
});
this.jdbcRepository.persist(heartbeat, context, this.jdbcRepository.persistFields(heartbeat));
});
}

public void heartbeatsStatusUpdate() {
this.findAllAlive().forEach(heartbeat -> {
this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString());
public void heartbeatsStatusUpdate(DSLContext context) {
this.findAllAlive(context).forEach(heartbeat -> {
this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString(), context);
}
);
}

public Boolean heartbeatCleanUp(String workerUuid) {
AtomicReference<Boolean> bool = new AtomicReference<>(false);
public void lockedWorkersUpdate(Function<DSLContext, Void> function) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(configuration, workerUuid)
// we delete worker that have dead status more than two times the times considered to be dead
.and(field("status").eq(WorkerInstance.Status.DEAD.toString()))
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency().getSeconds())))
.forUpdate();
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);
// Update all workers status
heartbeatsStatusUpdate(context);

if(workerInstance.isPresent()) {
this.delete(workerInstance.get());
bool.set(true);
}
function.apply(context);

return null;
});
return bool.get();
}

public Boolean heartbeatsCleanup() {
AtomicReference<Boolean> bool = new AtomicReference<>(false);
this.findAllDead().forEach(heartbeat -> {
bool.set(this.heartbeatCleanUp(heartbeat.getWorkerUuid().toString()));
}
);
return bool.get();
/**
 * Fetches all worker instances using a caller-supplied DSL context.
 * The select is built by {@code heartbeatSelectAllQuery(context)} and no transaction is opened here,
 * so the caller decides which transaction (if any) the read belongs to.
 *
 * @param context DSL context the select query is created from
 * @return the worker instances returned by {@code jdbcRepository.fetch} for that query
 */
public List<WorkerInstance> findAll(DSLContext context) {
return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context));
}

@Override
public List<WorkerInstance> findAll() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectWhereStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration);

return this.jdbcRepository.fetch(select);
DSLContext context = DSL.using(configuration);
return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context));
});
}

public List<WorkerInstance> findAllAlive() {
public List<WorkerInstance> findAllAlive(DSLContext context) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration)
.where(field("status").eq(WorkerInstance.Status.UP.toString()));

return this.jdbcRepository.fetch(select);
});
.transactionResult(configuration -> this.jdbcRepository.fetch(
this.heartbeatSelectAllQuery(context)
.where(field("status").eq(WorkerInstance.Status.UP.toString()))
));
}

public List<WorkerInstance> findAllDead() {
public List<WorkerInstance> findAllToDelete(DSLContext context) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select =
this.heartbeatSelectAllQuery(configuration)
.where(field("status").eq(WorkerInstance.Status.DEAD.toString()));
.transactionResult(configuration -> this.jdbcRepository.fetch(
this.heartbeatSelectAllQuery(context)
.where(field("status").eq(WorkerInstance.Status.DEAD.toString()))
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency().getSeconds())))
));
}

return this.jdbcRepository.fetch(select);
});
/**
 * Deletes a worker instance through a caller-supplied DSL context.
 * Pure delegation to {@code jdbcRepository.delete(context, workerInstance)}.
 *
 * @param context        DSL context forwarded to the repository
 * @param workerInstance worker instance to delete
 */
public void delete(DSLContext context, WorkerInstance workerInstance) {
this.jdbcRepository.delete(context, workerInstance);
}

/**
 * Deletes a worker instance without an explicit DSL context.
 * Pure delegation to {@code jdbcRepository.delete(workerInstance)}.
 *
 * @param workerInstance worker instance to delete
 */
@Override
public void delete(WorkerInstance workerInstance) {
this.jdbcRepository.delete(workerInstance);
}
Expand All @@ -171,20 +149,17 @@ public WorkerInstance save(WorkerInstance workerInstance) {
return workerInstance;
}

private SelectConditionStep<Record1<Object>> heartbeatSelectQuery(org.jooq.Configuration configuration, String workerUuid) {
return DSL
.using(configuration)
private SelectConditionStep<Record1<Object>> heartbeatSelectQuery(DSLContext context, String workerUuid) {
return context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("worker_uuid").eq(workerUuid)
);
}

private SelectWhereStep<Record1<Object>> heartbeatSelectAllQuery(org.jooq.Configuration configuration) {
return DSL
.using(configuration)
.select(field("value"))
private SelectJoinStep<Record1<Object>> heartbeatSelectAllQuery(DSLContext dsl) {
return dsl.select(field("value"))
.from(this.jdbcRepository.getTable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,14 @@ public Optional<WorkerJobRunning> findByTaskRunId(String taskRunId) {
});
}

@Override
public List<WorkerJobRunning> getWorkerJobWithWorkerDead(List<String> workersAlive) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> DSL
.using(configuration)
public List<WorkerJobRunning> getWorkerJobWithWorkerDead(DSLContext context, List<String> workersToDelete) {
return context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("worker_uuid").notIn(workersAlive))
.where(field("worker_uuid").in(workersToDelete))
.forUpdate()
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value").toString()))
.map(r -> this.jdbcRepository.deserialize(r.get("value").toString())
);
}
}
Expand Down
Loading

0 comments on commit 64ee18a

Please sign in to comment.