diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 769a074a743..4dc7c3d8724 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -124,6 +124,9 @@ kestra: workerheartbeat: cls: io.kestra.core.runners.WorkerHeartbeat table: "worker_heartbeat" + workerjobrunning: + cls: io.kestra.core.runners.WorkerJobRunning + table: "worker_job_running" queues: min-poll-interval: 25ms @@ -170,5 +173,5 @@ kestra: fixed-delay: 1h heartbeat: - frequency: 10 + frequency: 5 heartbeat-missed: 3 \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/queues/QueueService.java b/core/src/main/java/io/kestra/core/queues/QueueService.java index adcd0a08482..0bd59bf642e 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueService.java +++ b/core/src/main/java/io/kestra/core/queues/QueueService.java @@ -1,10 +1,7 @@ package io.kestra.core.queues; import io.kestra.core.models.Setting; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.executions.ExecutionKilled; -import io.kestra.core.models.executions.LogEntry; -import io.kestra.core.models.executions.MetricEntry; +import io.kestra.core.models.executions.*; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.templates.Template; import io.kestra.core.models.topologies.FlowTopology; diff --git a/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningInterface.java b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningInterface.java new file mode 100644 index 00000000000..b836018049c --- /dev/null +++ b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningInterface.java @@ -0,0 +1,16 @@ +package io.kestra.core.repositories; + +import io.kestra.core.runners.WorkerJobRunning; + +import java.util.List; +import java.util.Optional; + +public interface WorkerJobRunningInterface { + Optional findByTaskRunId(String taskRunId); + + void delete(String taskRunId); + + WorkerJobRunning save(WorkerJobRunning workerJobRunning); + + List getWorkerJobWithWorkerDead(List workersAlive); +} diff --git a/core/src/main/java/io/kestra/core/runners/WorkerHeartbeat.java b/core/src/main/java/io/kestra/core/runners/WorkerHeartbeat.java index 74ab9bb6369..283105f9a33 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerHeartbeat.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerHeartbeat.java @@ -1,9 +1,6 @@ package io.kestra.core.runners; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; import java.time.Instant; @@ -19,6 +16,16 @@ public class WorkerHeartbeat extends WorkerInstance { @Builder.Default private Instant heartbeatDate = Instant.now(); + public WorkerInstance toWorkerInstance() { + return WorkerInstance.builder() + .workerUuid(this.getWorkerUuid()) + .hostname(this.getHostname()) + .port(this.getPort()) + .managementPort(this.getManagementPort()) + .workerGroup(this.getWorkerGroup()) + .build(); + } + public enum Status { UP, DEAD } diff --git a/core/src/main/java/io/kestra/core/runners/WorkerInstance.java b/core/src/main/java/io/kestra/core/runners/WorkerInstance.java index d1b5ba77069..3bb169ec8d9 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerInstance.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerInstance.java @@ -1,9 +1,6 @@ package io.kestra.core.runners; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; import javax.validation.constraints.NotNull; @@ -15,6 +12,7 @@ @SuperBuilder(toBuilder = true) @ToString @NoArgsConstructor +@Getter public class WorkerInstance { @NotNull private UUID workerUuid; diff --git a/core/src/main/java/io/kestra/core/runners/WorkerJob.java b/core/src/main/java/io/kestra/core/runners/WorkerJob.java index 81c2b205e37..be891e3b36e 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerJob.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerJob.java @@ -12,4 +12,6 @@ public abstract class WorkerJob { abstract public String getType(); abstract public String uid(); + + abstract public String taskRunId(); } diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTask.java b/core/src/main/java/io/kestra/core/runners/WorkerTask.java index c5977f58076..c411bfd361a 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTask.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTask.java @@ -41,4 +41,9 @@ public Logger logger() { public String uid() { return this.taskRun.getTaskId(); } + + @Override + public String taskRunId() { + return this.taskRun.getId(); + } } diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTrigger.java b/core/src/main/java/io/kestra/core/runners/WorkerTrigger.java index d5507bcd0f5..e7c17733d08 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTrigger.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTrigger.java @@ -31,4 +31,9 @@ public class WorkerTrigger extends WorkerJob { public String uid() { return triggerContext.uid(); } + + @Override + public String taskRunId() { + return triggerContext.uid(); + } } diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTriggerRunning.java b/core/src/main/java/io/kestra/core/runners/WorkerTriggerRunning.java index bf8054f967f..bc915bde8e2 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTriggerRunning.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTriggerRunning.java @@ -4,7 +4,6 @@ import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.TriggerContext; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2WorkerJobRunningRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2WorkerJobRunningRepository.java new file mode 100644 index 00000000000..ef20263ffc1 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2WorkerJobRunningRepository.java @@ -0,0 +1,16 @@ +package io.kestra.repository.h2; + +import io.kestra.core.runners.WorkerJobRunning; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@H2RepositoryEnabled +public class H2WorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository { + @Inject + public H2WorkerJobRunningRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(WorkerJobRunning.class, applicationContext)); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java index 18fde250227..b20ff83673a 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java @@ -4,27 +4,76 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; -import io.kestra.core.runners.WorkerJob; +import io.kestra.core.runners.*; import io.kestra.core.utils.Either; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.kestra.jdbc.runner.JdbcHeartbeat; +import io.kestra.jdbc.runner.JdbcQueue; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; +import lombok.extern.slf4j.Slf4j; +import java.net.UnknownHostException; import java.util.function.Consumer; +@Slf4j public class H2WorkerJobQueue implements WorkerJobQueueInterface { - QueueInterface workerTaskQueue; + JdbcQueue workerTaskQueue; + JdbcHeartbeat jdbcHeartbeat; + AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository; @SuppressWarnings("unchecked") public H2WorkerJobQueue(ApplicationContext applicationContext) { - this.workerTaskQueue = (QueueInterface) applicationContext.getBean( + this.workerTaskQueue = (JdbcQueue) applicationContext.getBean( QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED) ); + this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class); + this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class); } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return workerTaskQueue.receive(consumerGroup, queueType, consumer); + return workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> { + + eithers.forEach(either -> { + if (either.isRight()) { + log.error("Unable to deserialize an execution: {}", either.getRight().getMessage()); + return; + } + WorkerJob workerJob = either.getLeft(); + WorkerInstance workerInstance = null; + try { + workerInstance = jdbcHeartbeat.get().toWorkerInstance(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + WorkerJobRunning workerJobRunning; + + if (workerJob instanceof WorkerTask workerTask) { + workerJobRunning = WorkerTaskRunning.of( + workerTask, + workerInstance, + 0 + ); + } else if (workerJob instanceof WorkerTrigger workerTrigger) { + workerJobRunning = WorkerTriggerRunning.of( + workerTrigger, + workerInstance, + 0 + ); + } + else { + throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs"); + } + + jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext); + + log.trace("Sending a workerJobRunning: {}", workerJobRunning); + }); + + eithers.forEach(consumer); + }); } @Override diff --git a/jdbc-h2/src/main/resources/migrations/h2/V1_2__initial.sql b/jdbc-h2/src/main/resources/migrations/h2/V1_2__initial.sql index 7a5df7b0461..781d490a7ea 100644 --- a/jdbc-h2/src/main/resources/migrations/h2/V1_2__initial.sql +++ b/jdbc-h2/src/main/resources/migrations/h2/V1_2__initial.sql @@ -9,4 +9,12 @@ CREATE TABLE IF NOT EXISTS worker_heartbeat ( "worker_group" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value",'.workerGroup')), "status" VARCHAR(10) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.status')), "heartbeat_date" TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.heartbeatDate'), 'yyyy-MM-dd''T''HH:mm:ss.SSSXXX')) - ); \ No newline at end of file +); + +/* ----------------------- worker_job_running ----------------------- */ +CREATE TABLE IF NOT EXISTS worker_job_running ( + "key" VARCHAR(250) NOT NULL PRIMARY KEY, + "value" TEXT NOT NULL, + "worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerInstance.workerUuid')), + "taskrun_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.taskRun.id')) +); \ No newline at end of file diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlWorkerJobRunningRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlWorkerJobRunningRepository.java new file mode 100644 index 00000000000..58a379557ca --- /dev/null +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlWorkerJobRunningRepository.java @@ -0,0 +1,16 @@ +package io.kestra.repository.mysql; + +import io.kestra.core.runners.WorkerJobRunning; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@MysqlRepositoryEnabled +public class MysqlWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository { + @Inject + public MysqlWorkerJobRunningRepository(ApplicationContext applicationContext) { + super(new MysqlRepository<>(WorkerJobRunning.class, applicationContext)); + } +} diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java index 8211264f010..5873cff3483 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java @@ -4,27 +4,76 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; -import io.kestra.core.runners.WorkerJob; +import io.kestra.core.runners.*; import io.kestra.core.utils.Either; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.kestra.jdbc.runner.JdbcHeartbeat; +import io.kestra.jdbc.runner.JdbcQueue; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; +import lombok.extern.slf4j.Slf4j; +import java.net.UnknownHostException; import java.util.function.Consumer; +@Slf4j public class MysqlWorkerJobQueue implements WorkerJobQueueInterface { - QueueInterface workerTaskQueue; + JdbcQueue workerTaskQueue; + JdbcHeartbeat jdbcHeartbeat; + AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository; @SuppressWarnings("unchecked") public MysqlWorkerJobQueue(ApplicationContext applicationContext) { - this.workerTaskQueue = (QueueInterface) applicationContext.getBean( + this.workerTaskQueue = (JdbcQueue) applicationContext.getBean( QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED) ); + this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class); + this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class); } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return workerTaskQueue.receive(consumerGroup, queueType, consumer); + return workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> { + + eithers.forEach(either -> { + if (either.isRight()) { + log.error("Unable to deserialize an execution: {}", either.getRight().getMessage()); + return; + } + WorkerJob workerJob = either.getLeft(); + WorkerInstance workerInstance = null; + try { + workerInstance = jdbcHeartbeat.get().toWorkerInstance(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + WorkerJobRunning workerJobRunning; + + if (workerJob instanceof WorkerTask workerTask) { + workerJobRunning = WorkerTaskRunning.of( + workerTask, + workerInstance, + 0 + ); + } else if (workerJob instanceof WorkerTrigger workerTrigger) { + workerJobRunning = WorkerTriggerRunning.of( + workerTrigger, + workerInstance, + 0 + ); + } + else { + throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs"); + } + + jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext); + + log.trace("Sending a workerJobRunning: {}", workerJobRunning); + }); + + eithers.forEach(consumer); + }); } @Override diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1_2__initial.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1_2__initial.sql index da99cd3b41b..e1f381c5349 100644 --- a/jdbc-mysql/src/main/resources/migrations/mysql/V1_2__initial.sql +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1_2__initial.sql @@ -2,12 +2,19 @@ CREATE TABLE IF NOT EXISTS worker_heartbeat ( `key` VARCHAR(250) NOT NULL PRIMARY KEY, `value` JSON NOT NULL, - `worker_uuid` VARCHAR(36)GENERATED ALWAYS AS (value ->> '$.workerUuid') STORED NOT NULL , - `hostname` VARCHAR(150)GENERATED ALWAYS AS (value ->> '$.hostname') STORED NOT NULL , + `worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerUuid') STORED NOT NULL, + `hostname` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.hostname') STORED NOT NULL, `port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.port') STORED, `management_port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.managementPort') STORED, `worker_group` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.workerGroup') STORED, - `status` VARCHAR(10)GENERATED ALWAYS AS (value ->> '$.status') STORED NOT NULL , + `status` VARCHAR(10)GENERATED ALWAYS AS (value ->> '$.status') STORED NOT NULL, `heartbeat_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.heartbeatDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +/* ----------------------- worker_job_running ----------------------- */ +CREATE TABLE IF NOT EXISTS worker_job_running ( + `key` VARCHAR(250) NOT NULL PRIMARY KEY, + `value` JSON NOT NULL, + `worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerInstance.workerUuid') STORED NOT NULL, + `taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRun.id') STORED NOT NULL + ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; \ No newline at end of file diff --git a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresWorkerJobRunningRepository.java b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresWorkerJobRunningRepository.java new file mode 100644 index 00000000000..ace88ca0bed --- /dev/null +++ b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresWorkerJobRunningRepository.java @@ -0,0 +1,16 @@ +package io.kestra.repository.postgres; + +import io.kestra.core.runners.WorkerJobRunning; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@PostgresRepositoryEnabled +public class PostgresWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository { + @Inject + public PostgresWorkerJobRunningRepository(ApplicationContext applicationContext) { + super(new PostgresRepository<>(WorkerJobRunning.class, applicationContext)); + } +} diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java index 831d9b8d65d..634811a2d6e 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java @@ -4,29 +4,80 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; -import io.kestra.core.runners.WorkerJob; +import io.kestra.core.runners.*; import io.kestra.core.utils.Either; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.kestra.jdbc.runner.JdbcHeartbeat; +import io.kestra.jdbc.runner.JdbcQueue; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; +import lombok.extern.slf4j.Slf4j; +import java.net.UnknownHostException; import java.util.function.Consumer; +@Slf4j public class PostgresWorkerJobQueue implements WorkerJobQueueInterface { - QueueInterface workerTaskQueue; + JdbcQueue workerTaskQueue; + JdbcHeartbeat jdbcHeartbeat; + AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository; @SuppressWarnings("unchecked") public PostgresWorkerJobQueue(ApplicationContext applicationContext) { - this.workerTaskQueue = (QueueInterface) applicationContext.getBean( + this.workerTaskQueue = (JdbcQueue) applicationContext.getBean( QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED) ); + this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class); + this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class); + } @Override public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { - return workerTaskQueue.receive(consumerGroup, queueType, consumer); + return workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> { + + eithers.forEach(either -> { + if (either.isRight()) { + log.error("Unable to deserialize an execution: {}", either.getRight().getMessage()); + return; + } + WorkerJob workerJob = either.getLeft(); + WorkerInstance workerInstance = null; + try { + workerInstance = jdbcHeartbeat.get().toWorkerInstance(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + WorkerJobRunning workerJobRunning; + + if (workerJob instanceof WorkerTask workerTask) { + workerJobRunning = WorkerTaskRunning.of( + workerTask, + workerInstance, + 0 + ); + } else if (workerJob instanceof WorkerTrigger workerTrigger) { + workerJobRunning = WorkerTriggerRunning.of( + workerTrigger, + workerInstance, + 0 + ); + } + else { + throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs"); + } + + jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext); + + log.trace("Sending a workerJobRunning: {}", workerJobRunning); + }); + + eithers.forEach(consumer); + }); } + @Override public void pause() { diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1_2__initial.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1_2__initial.sql index 5dcb79e58e2..e9ef4beaed9 100644 --- a/jdbc-postgres/src/main/resources/migrations/postgres/V1_2__initial.sql +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1_2__initial.sql @@ -10,3 +10,11 @@ CREATE TABLE IF NOT EXISTS worker_heartbeat ( status VARCHAR(10) NOT NULL GENERATED ALWAYS AS (value ->> 'status') STORED, heartbeat_date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'heartbeatDate')) STORED ); + +/* ----------------------- worker_job_running ----------------------- */ +CREATE TABLE IF NOT EXISTS worker_job_running ( + key VARCHAR(250) NOT NULL PRIMARY KEY, + value JSONB NOT NULL, + worker_uuid VARCHAR(36) NOT NULL GENERATED ALWAYS AS (value -> 'workerInstance' ->> 'workerUuid') STORED, + taskrun_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value -> 'taskRun' ->> 'id') STORED +); \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java new file mode 100644 index 00000000000..27aef6ea7a1 --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java @@ -0,0 +1,97 @@ +package io.kestra.jdbc.repository; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kestra.core.repositories.WorkerJobRunningInterface; +import io.kestra.core.runners.WorkerJobRunning; +import io.kestra.core.runners.WorkerTaskRunning; +import io.kestra.core.runners.WorkerTriggerRunning; +import io.kestra.core.serializers.JacksonMapper; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; +import org.jooq.DSLContext; +import org.jooq.Record1; +import org.jooq.SelectConditionStep; +import org.jooq.impl.DSL; + +import java.util.List; +import java.util.Optional; + +@Singleton +@Slf4j +public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdbcRepository implements WorkerJobRunningInterface { + protected io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; + private static final ObjectMapper MAPPER = JacksonMapper.ofJson(); + + public AbstractJdbcWorkerJobRunningRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { + this.jdbcRepository = jdbcRepository; + } + + @Override + public WorkerJobRunning save(WorkerJobRunning workerJobRunning) { + this.jdbcRepository.persist(workerJobRunning, this.jdbcRepository.persistFields(workerJobRunning)); + return workerJobRunning; + } + + public WorkerJobRunning save(WorkerJobRunning workerJobRunning, DSLContext context) { + this.jdbcRepository.persist(workerJobRunning, context, this.jdbcRepository.persistFields(workerJobRunning)); + return workerJobRunning; + } + + @Override + public void delete(String taskRunId) { + Optional workerJobRunning = this.findByTaskRunId(taskRunId); + if(workerJobRunning.isPresent()) { + this.jdbcRepository.delete(workerJobRunning.get()); + } + else { + log.warn("Can't delete WorkerJobRunning with taskRunId {}", taskRunId); + } + } + + @Override + public Optional findByTaskRunId(String taskRunId) { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + SelectConditionStep> select = DSL + .using(configuration) + .select((field("value"))) + .from(this.jdbcRepository.getTable()) + .where( + field("taskrun_id").eq(taskRunId) + ); + + return this.jdbcRepository.fetchOne(select); + }); + } + + @Override + public List getWorkerJobWithWorkerDead(List workersAlive) { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> DSL + .using(configuration) + .select(field("value")) + .from(this.jdbcRepository.getTable()) + .where(field("worker_uuid").notIn(workersAlive)) + .forUpdate() + .fetch() + .map(r -> { + WorkerJobRunning value; + // TODO: find a better way to do this + try { + value = MAPPER.readValue(r.get("value").toString(), WorkerTaskRunning.class); + } catch (JsonProcessingException e) { + try { + value = MAPPER.readValue(r.get("value").toString(), WorkerTriggerRunning.class); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + return value; + }) + ); + } +} + diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index f376a7c132f..7a0e2a069f2 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -25,6 +25,7 @@ import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository; import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository; import io.kestra.jdbc.repository.AbstractJdbcWorkerHeartbeatRepository; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; import io.micronaut.context.ApplicationContext; import io.micronaut.scheduling.annotation.Scheduled; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; @@ -123,7 +124,7 @@ public class JdbcExecutor implements ExecutorInterface { private AbstractJdbcFlowTopologyRepository flowTopologyRepository; @Inject - AbstractJdbcWorkerHeartbeatRepository workerHeartbeatRepository; + private AbstractJdbcWorkerHeartbeatRepository workerHeartbeatRepository; protected List allFlows; @@ -137,6 +138,9 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private SkipExecutionService skipExecutionService; + @Inject + private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository; + @SneakyThrows @Override public void run() { @@ -208,6 +212,28 @@ public void run() { protected void workersUpdate() { workerHeartbeatRepository.heartbeatsStatusUpdate(); workerHeartbeatRepository.heartbeatsCleanup(); + this.deadWorkerTaskResubmit(); + } + + private void deadWorkerTaskResubmit() { + workerJobRunningRepository.getWorkerJobWithWorkerDead(workerHeartbeatRepository.findAllAlive().stream().map(workerHeartbeat -> workerHeartbeat.getWorkerUuid().toString()).collect(Collectors.toList())) + .forEach(workerJobRunning -> { + if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) { + workerTaskQueue.emit(WorkerTask.builder() + .taskRun(workerTaskRunning.getTaskRun()) + .task(workerTaskRunning.getTask()) + .runContext(workerTaskRunning.getRunContext()) + .build()); + } else if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) { + workerTaskQueue.emit(WorkerTrigger.builder() + .trigger(workerTriggerRunning.getTrigger()) + .conditionContext(workerTriggerRunning.getConditionContext()) + .triggerContext(workerTriggerRunning.getTriggerContext()) + .build()); + } else { + throw new IllegalArgumentException("Object is of type " + workerJobRunning.getClass() + " which should never occurs"); + } + }); } private void executionQueue(Either either) { @@ -373,6 +399,9 @@ private void workerTaskResultQueue(Either { diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java index 790e8c46d15..62b53af167c 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcHeartbeat.java @@ -11,7 +11,6 @@ import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; -import javax.annotation.PostConstruct; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.UUID; @@ -33,10 +32,7 @@ public JdbcHeartbeat(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } - @PostConstruct - public void initializedWorkerHeartbeat() throws UnknownHostException { - Worker worker = applicationContext.getBean(Worker.class); - + public void registerWorkerHeartbeat(Worker worker) throws UnknownHostException { this.workerHeartbeat = WorkerHeartbeat.builder() .workerUuid(UUID.randomUUID()) .hostname(InetAddress.getLocalHost().getHostName()) @@ -45,7 +41,7 @@ public void initializedWorkerHeartbeat() throws UnknownHostException { .workerGroup(worker.getWorkerGroup()) .build(); - log.trace("Initialized heartbeat of: {}", workerHeartbeat.getWorkerUuid()); + log.trace("Registered WorkerHeartbeat of: {}", workerHeartbeat.getWorkerUuid()); workerHeartbeatRepository.save( workerHeartbeat @@ -53,10 +49,23 @@ public void initializedWorkerHeartbeat() throws UnknownHostException { } @Scheduled(initialDelay = "${kestra.heartbeat.frequency}" + "s", fixedDelay = "${kestra.heartbeat.frequency}" + "s") - public void updateHeartbeat() { - log.trace("Heartbeat of: {}", workerHeartbeat.getWorkerUuid()); - if (workerHeartbeatRepository.heartbeatCheckUp(workerHeartbeat.getWorkerUuid().toString()).isEmpty()) { - Runtime.getRuntime().exit(1); + public void updateHeartbeat() throws UnknownHostException { + if (applicationContext.containsBean(Worker.class)) { + if (workerHeartbeat == null) { + registerWorkerHeartbeat(applicationContext.getBean(Worker.class)); + } + log.trace("Heartbeat of: {}", workerHeartbeat.getWorkerUuid()); + if (workerHeartbeatRepository.heartbeatCheckUp(workerHeartbeat.getWorkerUuid().toString()).isEmpty()) { + Runtime.getRuntime().exit(1); + } } } + + public WorkerHeartbeat get() throws UnknownHostException { + if (workerHeartbeat == null) { + registerWorkerHeartbeat(applicationContext.getBean(Worker.class)); + } + return workerHeartbeat; + } + } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 3cfb41aea82..b65ae537cd0 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -11,8 +11,8 @@ import io.kestra.core.utils.Either; import io.kestra.core.utils.ExecutorsUtils; import io.kestra.core.utils.IdUtils; -import io.kestra.jdbc.JooqDSLContextWrapper; import io.kestra.jdbc.JdbcConfiguration; +import io.kestra.jdbc.JooqDSLContextWrapper; import io.kestra.jdbc.repository.AbstractJdbcRepository; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.ConfigurationProperties; @@ -20,10 +20,11 @@ import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jooq.*; import org.jooq.Record; +import org.jooq.*; import org.jooq.impl.DSL; +import javax.sql.DataSource; import java.io.IOException; import java.time.Duration; import java.time.ZonedDateTime; @@ -35,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.sql.DataSource; @Slf4j public abstract class JdbcQueue implements QueueInterface { @@ -45,7 +46,7 @@ public abstract class JdbcQueue implements QueueInterface { private static ExecutorService poolExecutor; - private final QueueService queueService; + protected final QueueService queueService; protected final Class cls; @@ -183,6 +184,31 @@ public Runnable receive(String consumerGroup, Consumer queueType, Consumer> consumer) { + return this.receiveImpl( + consumerGroup, + queueType, + (dslContext, eithers) -> { + eithers.forEach(consumer); + }, + false + ); + } + + public Runnable receiveTransaction(String consumerGroup, Class queueType, BiConsumer>> consumer) { + return this.receiveImpl( + consumerGroup, + queueType, + consumer, + true + ); + } + + public Runnable receiveImpl( + String consumerGroup, + Class queueType, + BiConsumer>> consumer, + Boolean inTransaction + ) { String queueName = queueName(queueType); return this.poll(() -> { @@ -192,6 +218,9 @@ public Runnable receive(String consumerGroup, Class queueType, Consumer result = this.receiveFetch(ctx, consumerGroup, queueName); if (!result.isEmpty()) { + if (inTransaction) { + consumer.accept(ctx, this.map(result)); + } this.updateGroupOffsets( ctx, @@ -204,13 +233,15 @@ public Runnable receive(String consumerGroup, Class queueType, Consumer queueType) { + protected String queueName(Class queueType) { return CaseFormat.UPPER_CAMEL.to( CaseFormat.LOWER_UNDERSCORE, queueType.getSimpleName() @@ -218,7 +249,7 @@ private String queueName(Class queueType) { } @SuppressWarnings("BusyWait") - private Runnable poll(Supplier runnable) { + protected Runnable poll(Supplier runnable) { AtomicBoolean running = new AtomicBoolean(true); AtomicLong sleep = new AtomicLong(configuration.getMaxPollInterval().toMillis()); AtomicReference lastPoll = new AtomicReference<>(ZonedDateTime.now()); @@ -254,15 +285,19 @@ private Runnable poll(Supplier runnable) { }; } - private void send(Result fetch, Consumer> consumer) { - fetch + protected List> map(Result fetch) { + return fetch .map(record -> { try { - return Either.left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls)); + return Either.left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls)); } catch (JsonProcessingException e) { - return Either.right(new DeserializationException(e, record.get("value", String.class))); + return Either.right(new DeserializationException(e, record.get("value", String.class))); } - }) + }); + } + + protected void send(Result fetch, Consumer> consumer) { + this.map(fetch) .forEach(consumer); }