From e04544061abce46a70fc57db1d55e76fbbdd4731 Mon Sep 17 00:00:00 2001 From: YannC <37600690+Skraye@users.noreply.github.com> Date: Wed, 11 Oct 2023 10:41:20 +0200 Subject: [PATCH] fix(jdbc): handle trigger in jdbc heartbeat/resubmit (#2240) This fix follow the JDBC heartbeat & task resubmit feature recently released in the 0.12 --- .../core/queues/QueueFactoryInterface.java | 2 + .../WorkerTriggerResultQueueInterface.java | 19 ++ .../WorkerJobRunningRepositoryInterface.java | 4 +- .../kestra/core/runners/StandAloneRunner.java | 11 +- .../java/io/kestra/core/runners/Worker.java | 15 +- .../kestra/core/runners/WorkerJobRunning.java | 1 - .../core/runners/WorkerTaskRunning.java | 2 +- .../core/schedulers/AbstractScheduler.java | 24 +-- .../kestra/core/tasks/test/SleepTrigger.java | 49 +++++ .../io/kestra/runner/h2/H2QueueFactory.java | 7 + .../runner/h2/H2WorkerTriggerResultQueue.java | 41 ++++ .../migrations/h2/V1_3__worker_heartbeat.sql | 2 + .../io/kestra/runner/h2/H2HeartbeatTest.java | 7 + .../runner/mysql/MysqlQueueFactory.java | 7 + .../mysql/MysqlWorkerTriggerResultQueue.java | 41 ++++ .../mysql/V1_3__worker_heartbeat.sql | 2 + .../runner/mysql/MysqlHeartbeatTest.java | 7 + .../runner/postgres/PostgresQueueFactory.java | 7 + .../PostgresWorkerTriggerResultQueue.java | 40 ++++ .../postgres/V1_3__worker_heartbeat.sql | 2 + .../postgres/PostgresHeartbeatTest.java | 7 + .../JdbcWorkerTriggerResultQueueService.java | 68 ++++++ ...bstractJdbcWorkerJobRunningRepository.java | 8 +- .../io/kestra/jdbc/runner/JdbcExecutor.java | 4 +- .../io/kestra/jdbc/runner/JdbcHeartbeat.java | 2 +- .../kestra/jdbc/runner/JdbcHeartbeatTest.java | 201 ++++++++++++++++++ .../test/resources/application-heartbeat.yml | 4 + .../runner/memory/MemoryQueueFactory.java | 7 + .../io/kestra/runner/memory/MemoryRunner.java | 6 +- .../MemoryWorkerTriggerResultQueue.java | 43 ++++ 30 files changed, 606 insertions(+), 34 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/queues/WorkerTriggerResultQueueInterface.java create mode 100644 core/src/test/java/io/kestra/core/tasks/test/SleepTrigger.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTriggerResultQueue.java create mode 100644 jdbc-h2/src/main/resources/migrations/h2/V1_3__worker_heartbeat.sql create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2HeartbeatTest.java create mode 100644 jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerTriggerResultQueue.java create mode 100644 jdbc-mysql/src/main/resources/migrations/mysql/V1_3__worker_heartbeat.sql create mode 100644 jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlHeartbeatTest.java create mode 100644 jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerTriggerResultQueue.java create mode 100644 jdbc-postgres/src/main/resources/migrations/postgres/V1_3__worker_heartbeat.sql create mode 100644 jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresHeartbeatTest.java create mode 100644 jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerTriggerResultQueueService.java create mode 100644 jdbc/src/test/java/io/kestra/jdbc/runner/JdbcHeartbeatTest.java create mode 100644 jdbc/src/test/resources/application-heartbeat.yml create mode 100644 runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerTriggerResultQueue.java diff --git a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java index e8a0a2f0b89..29b8a1a253b 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java @@ -52,4 +52,6 @@ public interface QueueFactoryInterface { WorkerJobQueueInterface workerJobQueue(); + WorkerTriggerResultQueueInterface workerTriggerResultQueue(); + } diff --git a/core/src/main/java/io/kestra/core/queues/WorkerTriggerResultQueueInterface.java b/core/src/main/java/io/kestra/core/queues/WorkerTriggerResultQueueInterface.java new file mode 100644 index 00000000000..b2c9adc8788 --- /dev/null +++ b/core/src/main/java/io/kestra/core/queues/WorkerTriggerResultQueueInterface.java @@ -0,0 +1,19 @@ +package io.kestra.core.queues; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; + +import java.io.Closeable; +import java.util.function.Consumer; + +/* + * Required for the QueueFactory, to have common interface with JDBC & Kafka + */ +public interface WorkerTriggerResultQueueInterface extends Closeable { + Runnable receive(String consumerGroup, Class queueType, Consumer> consumer); + + void pause(); + + void cleanup(); +} diff --git a/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java index 8b6b1c9a363..2f4935a7950 100644 --- a/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/WorkerJobRunningRepositoryInterface.java @@ -6,8 +6,8 @@ import java.util.Optional; public interface WorkerJobRunningRepositoryInterface { - Optional findByTaskRunId(String taskRunId); + Optional findByKey(String uid); - void deleteByTaskRunId(String taskRunId); + void deleteByKey(String uid); } diff --git a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java index 982d1435e80..3c7d9cd6775 100644 --- a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java +++ b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java @@ -21,6 +21,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable { @Setter private java.util.concurrent.ExecutorService poolExecutor; @Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()); @Setter protected boolean schedulerEnabled = true; + @Setter protected boolean workerEnabled = true; @Inject private ExecutorsUtils executorsUtils; @@ -52,10 +53,12 @@ public void run() { poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class)); - Worker worker = new Worker(applicationContext, workerThread, null); - applicationContext.registerSingleton(worker); - poolExecutor.execute(worker); - servers.add(worker); + if(workerEnabled) { + Worker worker = new Worker(applicationContext, workerThread, null); + applicationContext.registerSingleton(worker); + poolExecutor.execute(worker); + servers.add(worker); + } if (schedulerEnabled) { AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class); diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index 36fc1549d41..80e6e3800c5 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -552,13 +553,18 @@ public AtomicInteger getMetricRunningCount(WorkerTask workerTask) { @SuppressWarnings("ResultOfMethodCallIgnored") @Override public void close() throws Exception { + closeWorker(Duration.ofMinutes(5)); + } + + @VisibleForTesting + public void closeWorker(Duration awaitDuration) throws Exception { workerJobQueue.pause(); executionKilledQueue.pause(); new Thread( () -> { try { this.executors.shutdown(); - this.executors.awaitTermination(5, TimeUnit.MINUTES); + this.executors.awaitTermination(awaitDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Fail to shutdown the worker", e); } @@ -570,7 +576,7 @@ public void close() throws Exception { Await.until( () -> { - if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) { + if (this.executors.isTerminated() || this.workerThreadReferences.isEmpty()) { log.info("No more worker threads busy, shutting down!"); // we ensure that last produce message are send @@ -604,6 +610,11 @@ public void close() throws Exception { metricEntryQueue.close(); } + @VisibleForTesting + public void shutdown() throws IOException { + this.executors.shutdownNow(); + } + public List getWorkerThreadTasks() { return this.workerThreadReferences.stream().map(thread -> thread.workerTask).toList(); } diff --git a/core/src/main/java/io/kestra/core/runners/WorkerJobRunning.java b/core/src/main/java/io/kestra/core/runners/WorkerJobRunning.java index 3f38b03be5f..eb9834af15a 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerJobRunning.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerJobRunning.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTaskRunning.java b/core/src/main/java/io/kestra/core/runners/WorkerTaskRunning.java index 3c39cfac7f4..aca5d8cfd14 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTaskRunning.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTaskRunning.java @@ -30,7 +30,7 @@ public class WorkerTaskRunning extends WorkerJobRunning { @Override public String uid() { - return this.taskRun.getTaskId(); + return this.taskRun.getId(); } public static WorkerTaskRunning of(WorkerTask workerTask, WorkerInstance workerInstance, int partition) { diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java index a4b6463f227..174c2667a64 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java @@ -13,26 +13,22 @@ import io.kestra.core.models.triggers.types.Schedule; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; -import io.kestra.core.runners.RunContext; -import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.runners.WorkerJob; -import io.kestra.core.runners.WorkerTrigger; -import io.kestra.core.runners.WorkerTriggerResult; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.FlowListenersInterface; -import io.kestra.core.services.FlowService; -import io.kestra.core.services.TaskDefaultService; -import io.kestra.core.services.WorkerGroupService; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; +import io.kestra.core.runners.*; +import io.kestra.core.services.*; import io.kestra.core.utils.Await; import io.kestra.core.utils.ListUtils; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; import jakarta.annotation.PreDestroy; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; import java.time.Duration; import java.time.Instant; @@ -41,9 +37,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.stream.Stream; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; -import org.slf4j.Logger; @Slf4j @Singleton @@ -52,7 +45,7 @@ public abstract class AbstractScheduler implements Scheduler { private final QueueInterface executionQueue; private final QueueInterface triggerQueue; private final QueueInterface workerTaskQueue; - private final QueueInterface workerTriggerResultQueue; + private final WorkerTriggerResultQueueInterface workerTriggerResultQueue; protected final FlowListenersInterface flowListeners; private final RunContextFactory runContextFactory; private final MetricRegistry metricRegistry; @@ -85,7 +78,7 @@ public AbstractScheduler( this.executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED)); this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED)); this.workerTaskQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)); - this.workerTriggerResultQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)); + this.workerTriggerResultQueue = applicationContext.getBean(WorkerTriggerResultQueueInterface.class); this.flowListeners = flowListeners; this.runContextFactory = applicationContext.getBean(RunContextFactory.class); this.metricRegistry = applicationContext.getBean(MetricRegistry.class); @@ -152,6 +145,7 @@ public void run() { // listen to WorkerTriggerResult from polling triggers this.workerTriggerResultQueue.receive( + null, Scheduler.class, either -> { if (either.isRight()) { diff --git a/core/src/test/java/io/kestra/core/tasks/test/SleepTrigger.java b/core/src/test/java/io/kestra/core/tasks/test/SleepTrigger.java new file mode 100644 index 00000000000..162d8170def --- /dev/null +++ b/core/src/test/java/io/kestra/core/tasks/test/SleepTrigger.java @@ -0,0 +1,49 @@ +package io.kestra.core.tasks.test; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.models.triggers.PollingTriggerInterface; +import io.kestra.core.models.triggers.TriggerContext; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import javax.validation.constraints.NotNull; +import java.time.Duration; +import java.util.Optional; + +/** + * This trigger is used in unit tests where we need a task that wait a little to be able to check the resubmit of triggers. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public class SleepTrigger extends AbstractTrigger implements PollingTriggerInterface { + + @PluginProperty + @NotNull + private Long duration; + + @Override + public Optional evaluate(ConditionContext conditionContext, TriggerContext context) { + // Try catch to avoid flakky test + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return Optional.empty(); + } + + @Override + public Duration getInterval() { + return null; + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java index 454564c8731..099a856e31f 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java @@ -10,6 +10,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; import io.kestra.core.runners.*; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Factory; @@ -121,4 +122,10 @@ public QueueInterface trigger() { public WorkerJobQueueInterface workerJobQueue() { return new H2WorkerJobQueue(applicationContext); } + + @Override + @Singleton + public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { + return new H2WorkerTriggerResultQueue(applicationContext); + } } diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTriggerResultQueue.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTriggerResultQueue.java new file mode 100644 index 00000000000..780322b399c --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTriggerResultQueue.java @@ -0,0 +1,41 @@ +package io.kestra.runner.h2; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; +import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService; +import io.micronaut.context.ApplicationContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + +@Slf4j +public class H2WorkerTriggerResultQueue extends H2Queue implements WorkerTriggerResultQueueInterface { + private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService; + + public H2WorkerTriggerResultQueue(ApplicationContext applicationContext) { + super(WorkerTriggerResult.class, applicationContext); + this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class); + } + + @Override + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { + return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer); + } + + @Override + public void pause() { + jdbcWorkerTriggerResultQueueService.pause(); + } + + @Override + public void cleanup() { + jdbcWorkerTriggerResultQueueService.cleanup(); + } + + @Override + public void close() { + jdbcWorkerTriggerResultQueueService.close(); + } +} diff --git a/jdbc-h2/src/main/resources/migrations/h2/V1_3__worker_heartbeat.sql b/jdbc-h2/src/main/resources/migrations/h2/V1_3__worker_heartbeat.sql new file mode 100644 index 00000000000..73a4fb6173c --- /dev/null +++ b/jdbc-h2/src/main/resources/migrations/h2/V1_3__worker_heartbeat.sql @@ -0,0 +1,2 @@ +ALTER TABLE worker_job_running + DROP COLUMN "taskrun_id"; \ No newline at end of file diff --git a/jdbc-h2/src/test/java/io/kestra/runner/h2/H2HeartbeatTest.java b/jdbc-h2/src/test/java/io/kestra/runner/h2/H2HeartbeatTest.java new file mode 100644 index 00000000000..f49f89ecda7 --- /dev/null +++ b/jdbc-h2/src/test/java/io/kestra/runner/h2/H2HeartbeatTest.java @@ -0,0 +1,7 @@ +package io.kestra.runner.h2; + +import io.kestra.jdbc.runner.JdbcHeartbeatTest; + +class H2HeartbeatTest extends JdbcHeartbeatTest { + +} \ No newline at end of file diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java index 5df13597585..feb8b268492 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java @@ -10,6 +10,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; import io.kestra.core.runners.*; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Factory; @@ -121,4 +122,10 @@ public QueueInterface trigger() { public WorkerJobQueueInterface workerJobQueue() { return new MysqlWorkerJobQueue(applicationContext); } + + @Override + @Singleton + public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { + return new MysqlWorkerTriggerResultQueue(applicationContext); + } } diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerTriggerResultQueue.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerTriggerResultQueue.java new file mode 100644 index 00000000000..c24bac65f91 --- /dev/null +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerTriggerResultQueue.java @@ -0,0 +1,41 @@ +package io.kestra.runner.mysql; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; +import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService; +import io.micronaut.context.ApplicationContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + +@Slf4j +public class MysqlWorkerTriggerResultQueue extends MysqlQueue implements WorkerTriggerResultQueueInterface { + private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService; + + public MysqlWorkerTriggerResultQueue(ApplicationContext applicationContext) { + super(WorkerTriggerResult.class, applicationContext); + this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class); + } + + @Override + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { + return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer); + } + + @Override + public void pause() { + jdbcWorkerTriggerResultQueueService.pause(); + } + + @Override + public void cleanup() { + jdbcWorkerTriggerResultQueueService.cleanup(); + } + + @Override + public void close() { + jdbcWorkerTriggerResultQueueService.close(); + } +} diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1_3__worker_heartbeat.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1_3__worker_heartbeat.sql new file mode 100644 index 00000000000..5923e1c3859 --- /dev/null +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1_3__worker_heartbeat.sql @@ -0,0 +1,2 @@ +ALTER TABLE worker_job_running + DROP COLUMN taskrun_id; \ No newline at end of file diff --git a/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlHeartbeatTest.java b/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlHeartbeatTest.java new file mode 100644 index 00000000000..9fcc21076dd --- /dev/null +++ b/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlHeartbeatTest.java @@ -0,0 +1,7 @@ +package io.kestra.runner.mysql; + +import io.kestra.jdbc.runner.JdbcHeartbeatTest; + +class MysqlHeartbeatTest extends JdbcHeartbeatTest { + +} \ No newline at end of file diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java index b900e7ffc0b..0a9b6ea8800 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java @@ -10,6 +10,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; import io.kestra.core.runners.*; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Factory; @@ -121,4 +122,10 @@ public QueueInterface trigger() { public WorkerJobQueueInterface workerJobQueue() { return new PostgresWorkerJobQueue(applicationContext); } + + @Override + @Singleton + public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { + return new PostgresWorkerTriggerResultQueue(applicationContext); + } } diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerTriggerResultQueue.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerTriggerResultQueue.java new file mode 100644 index 00000000000..af4c7408c0f --- /dev/null +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerTriggerResultQueue.java @@ -0,0 +1,40 @@ +package io.kestra.runner.postgres; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; +import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService; +import io.micronaut.context.ApplicationContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + +@Slf4j +public class PostgresWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface { + private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService; + + public PostgresWorkerTriggerResultQueue(ApplicationContext applicationContext) { + this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class); + } + + @Override + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { + return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer); + } + + @Override + public void pause() { + jdbcWorkerTriggerResultQueueService.pause(); + } + + @Override + public void cleanup() { + jdbcWorkerTriggerResultQueueService.cleanup(); + } + + @Override + public void close() { + jdbcWorkerTriggerResultQueueService.close(); + } +} diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1_3__worker_heartbeat.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1_3__worker_heartbeat.sql new file mode 100644 index 00000000000..5923e1c3859 --- /dev/null +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1_3__worker_heartbeat.sql @@ -0,0 +1,2 @@ +ALTER TABLE worker_job_running + DROP COLUMN taskrun_id; \ No newline at end of file diff --git a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresHeartbeatTest.java b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresHeartbeatTest.java new file mode 100644 index 00000000000..77d0ce0b551 --- /dev/null +++ b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresHeartbeatTest.java @@ -0,0 +1,7 @@ +package io.kestra.runner.postgres; + +import io.kestra.jdbc.runner.JdbcHeartbeatTest; + +class PostgresHeartbeatTest extends JdbcHeartbeatTest { + +} \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerTriggerResultQueueService.java b/jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerTriggerResultQueueService.java new file mode 100644 index 00000000000..ef0dade06e2 --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/JdbcWorkerTriggerResultQueueService.java @@ -0,0 +1,68 @@ +package io.kestra.jdbc; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; +import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; +import io.kestra.jdbc.runner.JdbcQueue; +import io.micronaut.context.ApplicationContext; +import io.micronaut.inject.qualifiers.Qualifiers; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + +@Singleton +@Slf4j +public class JdbcWorkerTriggerResultQueueService { + private final JdbcQueue workerTriggerResultQueue; + @Inject + private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository; + private Runnable queueStop; + + @SuppressWarnings("unchecked") + public JdbcWorkerTriggerResultQueueService(ApplicationContext applicationContext) { + this.workerTriggerResultQueue = (JdbcQueue) applicationContext.getBean( + QueueInterface.class, + Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)); + } + + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { + this.queueStop = workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> { + eithers.forEach(either -> { + if (either.isRight()) { + log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage()); + return; + } + + WorkerTriggerResult workerTriggerResult = either.getLeft(); + jdbcWorkerJobRunningRepository.deleteByKey(workerTriggerResult.getTriggerContext().uid()); + + }); + eithers.forEach(consumer); + }); + return this.queueStop; + } + + public void pause() { + this.stopQueue(); + } + + private void stopQueue() { + synchronized (this) { + if (this.queueStop != null) { + this.queueStop.run(); + this.queueStop = null; + } + } + } + + public void cleanup() { } + + public void close() { + this.stopQueue(); + } +} diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java index 07c1cbeb61f..6f079846e08 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcWorkerJobRunningRepository.java @@ -27,13 +27,13 @@ public WorkerJobRunning save(WorkerJobRunning workerJobRunning, DSLContext conte } @Override - public void deleteByTaskRunId(String taskRunId) { - Optional workerJobRunning = this.findByTaskRunId(taskRunId); + public void deleteByKey(String uid) { + Optional workerJobRunning = this.findByKey(uid); workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning)); } @Override - public Optional findByTaskRunId(String taskRunId) { + public Optional findByKey(String uid) { return this.jdbcRepository .getDslContextWrapper() .transactionResult(configuration -> { @@ -42,7 +42,7 @@ public Optional findByTaskRunId(String taskRunId) { .select((field("value"))) .from(this.jdbcRepository.getTable()) .where( - field("taskrun_id").eq(taskRunId) + field("key").eq(uid) ); return this.jdbcRepository.fetchOne(select); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 7cf65c305a8..04ddfef23ac 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -35,7 +35,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; -import org.jooq.DSLContext; import org.slf4j.event.Level; import java.io.IOException; @@ -477,6 +476,9 @@ private void workerTaskResultQueue(Either workerJobQueue; + + @Inject + @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) + QueueInterface workerTaskResultQueue; + + @Inject + @Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED) + QueueInterface workerTriggerResultQueue; + + @BeforeAll + void init() throws IOException, URISyntaxException { + jdbcTestUtils.drop(); + jdbcTestUtils.migrate(); + + TestsUtils.loads(repositoryLoader); + } + + @Test + void taskResubmit() throws Exception { + CountDownLatch runningLatch = new CountDownLatch(1); + CountDownLatch resubmitLatch = new CountDownLatch(1); + + Worker worker = new Worker(applicationContext, 8, null); + applicationContext.registerSingleton(worker); + worker.run(); + runner.setSchedulerEnabled(false); + runner.setWorkerEnabled(false); + runner.run(); + + AtomicReference workerTaskResult = new AtomicReference<>(null); + workerTaskResultQueue.receive(either -> { + workerTaskResult.set(either.getLeft()); + + if (either.getLeft().getTaskRun().getState().getCurrent() == Type.SUCCESS) { + resubmitLatch.countDown(); + } + + if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) { + runningLatch.countDown(); + } + }); + + workerJobQueue.emit(workerTask(1500)); + runningLatch.await(2, TimeUnit.SECONDS); + worker.shutdown(); + + Worker newWorker = new Worker(applicationContext, 8, null); + applicationContext.registerSingleton(newWorker); + newWorker.run(); + resubmitLatch.await(15, TimeUnit.SECONDS); + + assertThat(workerTaskResult.get().getTaskRun().getState().getCurrent(), is(Type.SUCCESS)); + } + + @Test + void triggerResubmit() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(1); + + Worker worker = new Worker(applicationContext, 8, null); + applicationContext.registerSingleton(worker); + worker.run(); + runner.setSchedulerEnabled(false); + runner.setWorkerEnabled(false); + runner.run(); + + AtomicReference workerTriggerResult = new AtomicReference<>(null); + workerTriggerResultQueue.receive(either -> { + workerTriggerResult.set(either.getLeft()); + }); + + workerJobQueue.emit(workerTrigger(7000)); + countDownLatch.await(2, TimeUnit.SECONDS); + worker.shutdown(); + + Worker newWorker = new Worker(applicationContext, 8, null); + applicationContext.registerSingleton(newWorker); + newWorker.run(); + countDownLatch.await(12, TimeUnit.SECONDS); + + assertThat(workerTriggerResult.get().getSuccess(), is(true)); + } + + private WorkerTask workerTask(long sleepDuration) { + Sleep bash = Sleep.builder() + .type(Sleep.class.getName()) + .id("unit-test") + .duration(sleepDuration) + .build(); + + Execution execution = TestsUtils.mockExecution(flowBuilder(sleepDuration), ImmutableMap.of()); + + ResolvedTask resolvedTask = ResolvedTask.of(bash); + + return WorkerTask.builder() + .runContext(runContextFactory.of(ImmutableMap.of("key", "value"))) + .task(bash) + .taskRun(TaskRun.of(execution, resolvedTask)) + .build(); + } + + private WorkerTrigger workerTrigger(long sleepDuration) { + SleepTrigger trigger = SleepTrigger.builder() + .type(SleepTrigger.class.getName()) + .id("unit-test") + .duration(sleepDuration) + .build(); + + Map.Entry mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger); + + return WorkerTrigger.builder() + .trigger(trigger) + .triggerContext(mockedTrigger.getValue()) + .conditionContext(mockedTrigger.getKey()) + .build(); + } + + private Flow flowBuilder(long sleepDuration) { + Sleep bash = Sleep.builder() + .type(Sleep.class.getName()) + .id("unit-test") + .duration(sleepDuration) + .build(); + + SleepTrigger trigger = SleepTrigger.builder() + .type(SleepTrigger.class.getName()) + .id("unit-test") + .duration(sleepDuration) + .build(); + + return Flow.builder() + .id(IdUtils.create()) + .namespace("io.kestra.unit-test") + .tasks(Collections.singletonList(bash)) + .triggers(Collections.singletonList(trigger)) + .build(); + } +} diff --git a/jdbc/src/test/resources/application-heartbeat.yml b/jdbc/src/test/resources/application-heartbeat.yml new file mode 100644 index 00000000000..ba6f639e4f6 --- /dev/null +++ b/jdbc/src/test/resources/application-heartbeat.yml @@ -0,0 +1,4 @@ +kestra: + heartbeat: + frequency: 3s + heartbeat-missed: 1 \ No newline at end of file diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java index 7b6060ae588..b437ad7f018 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java @@ -2,6 +2,7 @@ import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.queues.WorkerJobQueueInterface; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Prototype; @@ -122,4 +123,10 @@ public QueueInterface trigger() { public WorkerJobQueueInterface workerJobQueue() { return new MemoryWorkerJobQueue(applicationContext); } + + @Override + @Singleton + public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { + return new MemoryWorkerTriggerResultQueue(applicationContext); + } } diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryRunner.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryRunner.java index 871552fa4bd..1342730ac04 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryRunner.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryRunner.java @@ -1,15 +1,15 @@ package io.kestra.runner.memory; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; import io.kestra.core.models.executions.Execution; import io.kestra.core.runners.StandAloneRunner; import io.kestra.core.runners.WorkerJob; import io.kestra.core.runners.WorkerTaskResult; import io.kestra.core.utils.Await; +import jakarta.inject.Singleton; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import java.time.Duration; -import jakarta.inject.Singleton; @Slf4j @Singleton diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerTriggerResultQueue.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerTriggerResultQueue.java new file mode 100644 index 00000000000..5b8fa79ddaa --- /dev/null +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerTriggerResultQueue.java @@ -0,0 +1,43 @@ +package io.kestra.runner.memory; + +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.WorkerTriggerResultQueueInterface; +import io.kestra.core.runners.WorkerTriggerResult; +import io.kestra.core.utils.Either; +import io.micronaut.context.ApplicationContext; +import io.micronaut.inject.qualifiers.Qualifiers; + +import java.util.function.Consumer; + +public class MemoryWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface { + QueueInterface workerTriggerResultQueue; + + @SuppressWarnings("unchecked") + public MemoryWorkerTriggerResultQueue(ApplicationContext applicationContext) { + this.workerTriggerResultQueue = (QueueInterface) applicationContext.getBean( + QueueInterface.class, + Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED) + ); + } + + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { + return workerTriggerResultQueue.receive(consumerGroup, queueType, consumer); + } + + @Override + public void pause() { + + } + + @Override + public void cleanup() { + + } + + @Override + public void close() { + + } +}