From a85d0e1f3c952aee14e2a2a7fcfb465c06d65139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Tue, 5 Sep 2023 15:31:06 +0200 Subject: [PATCH] feat(core): make the queue fault tolerant (#1981) --- .../exceptions/DeserializationException.java | 15 ++++++++---- .../io/kestra/core/queues/QueueInterface.java | 11 +++++---- .../core/queues/WorkerJobQueueInterface.java | 4 +++- .../io/kestra/core/runners/FlowListeners.java | 8 ++++++- .../java/io/kestra/core/runners/Indexer.java | 10 +++++++- .../io/kestra/core/runners/RunnerUtils.java | 4 ++-- .../java/io/kestra/core/runners/Worker.java | 14 +++++++---- .../core/schedulers/AbstractScheduler.java | 8 ++++++- .../core/schedulers/DefaultScheduler.java | 16 +++++++++++-- .../io/kestra/core/tasks/flows/Template.java | 2 +- .../core/runners/FlowTriggerCaseTest.java | 3 ++- .../MultipleConditionTriggerCaseTest.java | 6 +++-- .../io/kestra/core/runners/RetryTest.java | 3 --- .../kestra/core/runners/RunContextTest.java | 4 ++-- .../core/runners/TaskDefaultsCaseTest.java | 2 +- .../io/kestra/core/runners/WorkerTest.java | 8 +++---- .../schedulers/SchedulerConditionTest.java | 4 +++- .../schedulers/SchedulerScheduleTest.java | 5 +++- .../core/schedulers/SchedulerThreadTest.java | 3 ++- .../core/services/ConditionServiceTest.java | 2 +- .../core/tasks/flows/EachSequentialTest.java | 2 +- .../kestra/core/tasks/flows/FlowCaseTest.java | 3 ++- .../kestra/core/tasks/flows/TemplateTest.java | 4 ++-- .../kestra/core/tasks/flows/TimeoutTest.java | 2 +- .../core/tasks/flows/VariablesTest.java | 2 +- .../io/kestra/runner/h2/H2WorkerJobQueue.java | 4 +++- .../runner/mysql/MysqlWorkerJobQueue.java | 4 +++- .../postgres/PostgresWorkerJobQueue.java | 4 +++- .../kestra/jdbc/AbstractJdbcRepository.java | 7 ++---- .../AbstractJdbcFlowRepository.java | 2 +- .../io/kestra/jdbc/runner/JdbcExecutor.java | 23 +++++++++++++++---- .../java/io/kestra/jdbc/runner/JdbcQueue.java | 16 +++++++------ .../io/kestra/jdbc/runner/JdbcScheduler.java | 8 ++++++- .../io/kestra/jdbc/runner/JdbcQueueTest.java | 18 ++++++++++----- .../kestra/runner/memory/MemoryExecutor.java | 19 +++++++++++++-- .../io/kestra/runner/memory/MemoryQueue.java | 21 ++++++++++------- .../runner/memory/MemoryWorkerJobQueue.java | 4 +++- .../controllers/ExecutionController.java | 16 +++++++++++-- .../webserver/controllers/LogController.java | 7 +++++- 39 files changed, 213 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/io/kestra/core/exceptions/DeserializationException.java b/core/src/main/java/io/kestra/core/exceptions/DeserializationException.java index 0d38c0dfe55..7ec7985eb9a 100644 --- a/core/src/main/java/io/kestra/core/exceptions/DeserializationException.java +++ b/core/src/main/java/io/kestra/core/exceptions/DeserializationException.java @@ -1,15 +1,22 @@ package io.kestra.core.exceptions; -import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import java.io.IOException; public class DeserializationException extends RuntimeException { private static final long serialVersionUID = 1L; - public DeserializationException(InvalidTypeIdException cause) { - super(cause); + private String record; + + public String getRecord() { + return record; } - public DeserializationException(Throwable cause) { + public DeserializationException(IOException cause, String record) { super(cause); + this.record = record; + } + + public DeserializationException(String message) { + super(message); } } diff --git a/core/src/main/java/io/kestra/core/queues/QueueInterface.java b/core/src/main/java/io/kestra/core/queues/QueueInterface.java index fe071752fb3..645efaa21e7 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueInterface.java @@ -1,5 +1,8 @@ package io.kestra.core.queues; +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.utils.Either; + import java.io.Closeable; import java.util.function.Consumer; @@ -22,17 +25,17 @@ default void delete(T message) throws QueueException { void delete(String consumerGroup, T message) throws QueueException; - default Runnable receive(Consumer consumer) { + default Runnable receive(Consumer> consumer) { return receive((String) null, consumer); } - Runnable receive(String consumerGroup, Consumer consumer); + Runnable receive(String consumerGroup, Consumer> consumer); - default Runnable receive(Class queueType, Consumer consumer) { + default Runnable receive(Class queueType, Consumer> consumer) { return receive(null, queueType, consumer); } - Runnable receive(String consumerGroup, Class queueType, Consumer consumer); + Runnable receive(String consumerGroup, Class queueType, Consumer> consumer); void pause(); } diff --git a/core/src/main/java/io/kestra/core/queues/WorkerJobQueueInterface.java b/core/src/main/java/io/kestra/core/queues/WorkerJobQueueInterface.java index 8a0c2dc8a63..a5e4937c94e 100644 --- a/core/src/main/java/io/kestra/core/queues/WorkerJobQueueInterface.java +++ b/core/src/main/java/io/kestra/core/queues/WorkerJobQueueInterface.java @@ -1,12 +1,14 @@ package io.kestra.core.queues; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.runners.WorkerJob; +import io.kestra.core.utils.Either; import java.io.Closeable; import java.util.function.Consumer; public interface WorkerJobQueueInterface extends Closeable { - Runnable receive(String consumerGroup, Class queueType, Consumer consumer); + Runnable receive(String consumerGroup, Class queueType, Consumer> consumer); void pause(); } diff --git a/core/src/main/java/io/kestra/core/runners/FlowListeners.java b/core/src/main/java/io/kestra/core/runners/FlowListeners.java index c665e9f3478..e3c2a708452 100644 --- a/core/src/main/java/io/kestra/core/runners/FlowListeners.java +++ b/core/src/main/java/io/kestra/core/runners/FlowListeners.java @@ -54,7 +54,13 @@ public void run() { if (!this.isStarted) { this.isStarted = true; - this.flowQueue.receive(flow -> { + this.flowQueue.receive(either -> { + if (either.isRight()) { + log.error("Unable to deserialize a flow: {}", either.getRight().getMessage()); + return; + } + + Flow flow = either.getLeft(); Optional previous = this.previous(flow); if (flow.isDeleted()) { diff --git a/core/src/main/java/io/kestra/core/runners/Indexer.java b/core/src/main/java/io/kestra/core/runners/Indexer.java index 829ffb50f3e..e6ad4903f2d 100644 --- a/core/src/main/java/io/kestra/core/runners/Indexer.java +++ b/core/src/main/java/io/kestra/core/runners/Indexer.java @@ -17,7 +17,9 @@ import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; +@Slf4j @Singleton @Requires(beans = {ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class}) public class Indexer implements IndexerInterface { @@ -57,7 +59,13 @@ public void run() { } protected void send(QueueInterface queueInterface, SaveRepositoryInterface saveRepositoryInterface) { - queueInterface.receive(Indexer.class, item -> { + queueInterface.receive(Indexer.class, either -> { + if (either.isRight()) { + log.error("unable to deserialize an item: {}", either.getRight().getMessage()); + return; + } + + T item = either.getLeft(); this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", item.getClass().getName()).increment(); this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", item.getClass().getName()).increment(); diff --git a/core/src/main/java/io/kestra/core/runners/RunnerUtils.java b/core/src/main/java/io/kestra/core/runners/RunnerUtils.java index 22c1a2a7229..81aa4d7aa34 100644 --- a/core/src/main/java/io/kestra/core/runners/RunnerUtils.java +++ b/core/src/main/java/io/kestra/core/runners/RunnerUtils.java @@ -342,8 +342,8 @@ public Execution awaitExecution(Predicate predicate, Runnable executi AtomicReference receive = new AtomicReference<>(); Runnable cancel = this.executionQueue.receive(current -> { - if (predicate.test(current)) { - receive.set(current); + if (predicate.test(current.getLeft())) { + receive.set(current.getLeft()); } }); diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index d748e79f226..1860e81c843 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -108,14 +108,14 @@ public Worker(ApplicationContext applicationContext, int thread, String workerGr @Override public void run() { this.executionKilledQueue.receive(executionKilled -> { - if (executionKilled != null) { + if (executionKilled != null && executionKilled.isLeft()) { // @FIXME: the hashset will never expire killed execution - killedExecution.add(executionKilled.getExecutionId()); + killedExecution.add(executionKilled.getLeft().getExecutionId()); synchronized (this) { workerThreadReferences .stream() - .filter(workerThread -> executionKilled.getExecutionId().equals(workerThread.getWorkerTask().getTaskRun().getExecutionId())) + .filter(workerThread -> executionKilled.getLeft().getExecutionId().equals(workerThread.getWorkerTask().getTaskRun().getExecutionId())) .forEach(WorkerThread::kill); } } @@ -124,8 +124,14 @@ public void run() { this.workerJobQueue.receive( this.workerGroup, Worker.class, - workerTask -> { + either -> { executors.execute(() -> { + if (either.isRight()) { + log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage()); + return; + } + + WorkerJob workerTask = either.getLeft(); if (workerTask instanceof WorkerTask task) { handleTask(task); } diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java index 73f077780fc..e56f858c772 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java @@ -153,7 +153,13 @@ public void run() { // listen to WorkerTriggerResult from polling triggers this.workerTriggerResultQueue.receive( Scheduler.class, - workerTriggerResult -> { + either -> { + if (either.isRight()) { + log.error("Unable to deserialize a worker trigger result: {}", either.getRight().getMessage()); + return; + } + + WorkerTriggerResult workerTriggerResult = either.getLeft(); if (workerTriggerResult.getSuccess() && workerTriggerResult.getExecution().isPresent()) { var triggerExecution = new SchedulerExecutionWithTrigger( workerTriggerResult.getExecution().get(), diff --git a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java index 7233a013249..2c24d06848d 100644 --- a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java @@ -48,7 +48,13 @@ public void run() { QueueInterface executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED)); QueueInterface triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED)); - executionQueue.receive(execution -> { + executionQueue.receive(either -> { + if (either.isRight()) { + log.error("Unable to deserialize and execution: {}", either.getRight().getMessage()); + return; + } + + Execution execution = either.getLeft(); if (execution.getTrigger() != null) { Trigger trigger = Await.until(() -> watchingTrigger.get(execution.getId()), Duration.ofSeconds(5)); var flow = flowRepository.findById(execution.getNamespace(), execution.getFlowId()).orElse(null); @@ -61,7 +67,13 @@ public void run() { } }); - triggerQueue.receive(trigger -> { + triggerQueue.receive(either -> { + if (either.isRight()) { + log.error("Unable to deserialize a trigger: {}", either.getRight().getMessage()); + return; + } + + Trigger trigger = either.getLeft(); if (trigger != null && trigger.getExecutionId() != null) { this.watchingTrigger.put(trigger.getExecutionId(), trigger); } diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Template.java b/core/src/main/java/io/kestra/core/tasks/flows/Template.java index 6459a4bbf9c..981ebdce42f 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Template.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Template.java @@ -191,7 +191,7 @@ public Template.Output outputs(RunContext runContext, Execution execution, TaskR protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException { if (!applicationContext.containsBean(TemplateExecutorInterface.class)) { - throw new DeserializationException(new Exception("Templates are disabled, please check your configuration")); + throw new DeserializationException("Templates are disabled, please check your configuration"); } TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class); diff --git a/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java b/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java index 47dc3e3591e..1cf7a87556b 100644 --- a/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java @@ -35,7 +35,8 @@ public void trigger() throws InterruptedException, TimeoutException { AtomicReference flowListener = new AtomicReference<>(); AtomicReference flowListenerNoInput = new AtomicReference<>(); - executionQueue.receive(execution -> { + executionQueue.receive(either -> { + Execution execution = either.getLeft(); if (execution.getState().getCurrent() == State.Type.SUCCESS) { if (flowListenerNoInput.get() == null && execution.getFlowId().equals("trigger-flow-listener-no-inputs")) { flowListenerNoInput.set(execution); diff --git a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java b/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java index bd92823eafe..0e37819d6e7 100644 --- a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java @@ -41,7 +41,8 @@ public void trigger() throws InterruptedException, TimeoutException { ConcurrentHashMap ended = new ConcurrentHashMap<>(); Flow flow = flowRepository.findById("io.kestra.tests", "trigger-multiplecondition-listener").orElseThrow(); - executionQueue.receive(execution -> { + executionQueue.receive(either -> { + Execution execution = either.getLeft(); synchronized (ended) { if (execution.getState().getCurrent() == State.Type.SUCCESS) { if (!ended.containsKey(execution.getId())) { @@ -89,8 +90,9 @@ public void failed() throws InterruptedException, TimeoutException { CountDownLatch countDownLatch = new CountDownLatch(3); ConcurrentHashMap ended = new ConcurrentHashMap<>(); - executionQueue.receive(execution -> { + executionQueue.receive(either -> { synchronized (ended) { + Execution execution = either.getLeft(); if (execution.getState().getCurrent().isTerminated()) { if (!ended.containsKey(execution.getId())) { ended.put(execution.getId(), execution); diff --git a/core/src/test/java/io/kestra/core/runners/RetryTest.java b/core/src/test/java/io/kestra/core/runners/RetryTest.java index bc2c1d54d78..8831346ec98 100644 --- a/core/src/test/java/io/kestra/core/runners/RetryTest.java +++ b/core/src/test/java/io/kestra/core/runners/RetryTest.java @@ -40,9 +40,6 @@ void retrySuccessAtFirstAttempt() throws TimeoutException { @Test void retryFailed() throws TimeoutException { - List executions = new ArrayList<>(); - executionQueue.receive(executions::add); - Execution execution = runnerUtils.runOne("io.kestra.tests", "retry-failed"); assertThat(execution.getTaskRunList(), hasSize(2)); diff --git a/core/src/test/java/io/kestra/core/runners/RunContextTest.java b/core/src/test/java/io/kestra/core/runners/RunContextTest.java index 4920225a5fc..eaa8e9a9966 100644 --- a/core/src/test/java/io/kestra/core/runners/RunContextTest.java +++ b/core/src/test/java/io/kestra/core/runners/RunContextTest.java @@ -54,7 +54,7 @@ class RunContextTest extends AbstractMemoryRunnerTest { void logs() throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); LogEntry matchingLog; - workerTaskLogQueue.receive(logs::add); + workerTaskLogQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne("io.kestra.tests", "logs"); @@ -79,7 +79,7 @@ void logs() throws TimeoutException { @Test void inputsLarge() throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); - workerTaskLogQueue.receive(logs::add); + workerTaskLogQueue.receive(either -> logs.add(either.getLeft())); char[] chars = new char[1024 * 11]; Arrays.fill(chars, 'a'); diff --git a/core/src/test/java/io/kestra/core/runners/TaskDefaultsCaseTest.java b/core/src/test/java/io/kestra/core/runners/TaskDefaultsCaseTest.java index b7b98d08394..6161a0bd52c 100644 --- a/core/src/test/java/io/kestra/core/runners/TaskDefaultsCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/TaskDefaultsCaseTest.java @@ -77,7 +77,7 @@ public void taskDefaults() throws TimeoutException { public void invalidTaskDefaults() throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); - logQueue.receive(logs::add); + logQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne("io.kestra.tests", "invalid-task-defaults", Duration.ofSeconds(60)); diff --git a/core/src/test/java/io/kestra/core/runners/WorkerTest.java b/core/src/test/java/io/kestra/core/runners/WorkerTest.java index ba8b35be4c2..f5e7cec603d 100644 --- a/core/src/test/java/io/kestra/core/runners/WorkerTest.java +++ b/core/src/test/java/io/kestra/core/runners/WorkerTest.java @@ -64,7 +64,7 @@ void success() throws TimeoutException { worker.run(); AtomicReference workerTaskResult = new AtomicReference<>(null); - workerTaskResultQueue.receive(workerTaskResult::set); + workerTaskResultQueue.receive(either -> workerTaskResult.set(either.getLeft())); workerTaskQueue.emit(workerTask(1000)); @@ -89,7 +89,7 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException { worker.run(); AtomicReference workerTaskResult = new AtomicReference<>(null); - workerTaskResultQueue.receive(workerTaskResult::set); + workerTaskResultQueue.receive(either -> workerTaskResult.set(either.getLeft())); Pause pause = Pause.builder() .type(Pause.class.getName()) @@ -133,13 +133,13 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException { @Test void killed() throws InterruptedException, TimeoutException { List logs = new CopyOnWriteArrayList<>(); - workerTaskLogQueue.receive(logs::add); + workerTaskLogQueue.receive(either -> logs.add(either.getLeft())); Worker worker = new Worker(applicationContext, 8, null); worker.run(); List workerTaskResult = new ArrayList<>(); - workerTaskResultQueue.receive(workerTaskResult::add); + workerTaskResultQueue.receive(either -> workerTaskResult.add(either.getLeft())); WorkerTask workerTask = workerTask(999000); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java index afbc9f8232e..fef5505b3b8 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java @@ -1,6 +1,7 @@ package io.kestra.core.schedulers; import io.kestra.core.models.conditions.types.DayWeekInMonthCondition; +import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.triggers.Trigger; @@ -79,7 +80,8 @@ void schedule() throws Exception { triggerState); Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) { // wait for execution - Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, execution -> { + Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> { + Execution execution = either.getLeft(); if (execution.getState().getCurrent() == State.Type.CREATED) { executionQueue.emit(execution.withState(State.Type.SUCCESS)); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java index ed4339ae75b..ef53b3d0740 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java @@ -1,5 +1,6 @@ package io.kestra.core.schedulers; +import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.triggers.types.Schedule; @@ -10,6 +11,7 @@ import org.junitpioneer.jupiter.RetryingTest; import org.junitpioneer.jupiter.RetryingTest; +import java.lang.reflect.Executable; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.*; @@ -76,7 +78,8 @@ void schedule() throws Exception { try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy); Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) { // wait for execution - Runnable assertionStop = executionQueue.receive(execution -> { + Runnable assertionStop = executionQueue.receive(either -> { + Execution execution = either.getLeft(); assertThat(execution.getInputs().get("testInputs"), is("test-inputs")); assertThat(execution.getInputs().get("def"), is("awesome")); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java index 7f471ca39f4..8337fcbd6ba 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java @@ -75,7 +75,8 @@ void thread() throws Exception { AtomicReference last = new AtomicReference<>(); // wait for execution - Runnable assertionStop = executionQueue.receive(SchedulerThreadTest.class, execution -> { + Runnable assertionStop = executionQueue.receive(SchedulerThreadTest.class, either -> { + Execution execution = either.getLeft(); last.set(execution); assertThat(execution.getFlowId(), is(flow.getId())); diff --git a/core/src/test/java/io/kestra/core/services/ConditionServiceTest.java b/core/src/test/java/io/kestra/core/services/ConditionServiceTest.java index eb5de358f90..c3aaf85f424 100644 --- a/core/src/test/java/io/kestra/core/services/ConditionServiceTest.java +++ b/core/src/test/java/io/kestra/core/services/ConditionServiceTest.java @@ -67,7 +67,7 @@ void valid() { @Test void exception() { List logs = new CopyOnWriteArrayList<>(); - logQueue.receive(logs::add); + logQueue.receive(either -> logs.add(either.getLeft())); Flow flow = TestsUtils.mockFlow(); Schedule schedule = Schedule.builder().id("unit").type(Schedule.class.getName()).cron("0 0 1 * *").build(); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/EachSequentialTest.java b/core/src/test/java/io/kestra/core/tasks/flows/EachSequentialTest.java index f8c73a11a4b..1b235ef7853 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/EachSequentialTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/EachSequentialTest.java @@ -91,7 +91,7 @@ void eachNull() throws TimeoutException { public static void eachNullTest(RunnerUtils runnerUtils, QueueInterface logQueue) throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); - logQueue.receive(logs::add); + logQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne("io.kestra.tests", "each-null", Duration.ofSeconds(60)); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java index 85170232452..5805453b3e9 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java @@ -45,7 +45,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count CountDownLatch countDownLatch = new CountDownLatch(1); AtomicReference triggered = new AtomicReference<>(); - executionQueue.receive(execution -> { + executionQueue.receive(either -> { + Execution execution = either.getLeft(); if (execution.getFlowId().equals("switch") && execution.getState().getCurrent().isTerminated()) { countDownLatch.countDown(); triggered.set(execution); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java b/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java index df02ee28c0a..4b115be8679 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java @@ -53,7 +53,7 @@ public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInter "flows/templates/with-template.yaml"))); List logs = new CopyOnWriteArrayList<>(); - logQueue.receive(logs::add); + logQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne( @@ -84,7 +84,7 @@ public static void withFailedTemplate(RunnerUtils runnerUtils, TemplateRepositor "flows/templates/with-failed-template.yaml"))); List logs = new CopyOnWriteArrayList<>(); - logQueue.receive(logs::add); + logQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne("io.kestra.tests", "with-failed-template", Duration.ofSeconds(60)); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/TimeoutTest.java b/core/src/test/java/io/kestra/core/tasks/flows/TimeoutTest.java index cf9cbea1157..14ee82a3ef1 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/TimeoutTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/TimeoutTest.java @@ -39,7 +39,7 @@ class TimeoutTest extends AbstractMemoryRunnerTest { @Test void timeout() throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); - workerTaskLogQueue.receive(logs::add); + workerTaskLogQueue.receive(either -> logs.add(either.getLeft())); Flow flow = Flow.builder() .id(IdUtils.create()) diff --git a/core/src/test/java/io/kestra/core/tasks/flows/VariablesTest.java b/core/src/test/java/io/kestra/core/tasks/flows/VariablesTest.java index a3bbdc8f179..062347d6841 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/VariablesTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/VariablesTest.java @@ -42,7 +42,7 @@ void recursiveVars() throws TimeoutException { @Test void invalidVars() throws TimeoutException { List logs = new CopyOnWriteArrayList<>(); - workerTaskLogQueue.receive(logs::add); + workerTaskLogQueue.receive(either -> logs.add(either.getLeft())); Execution execution = runnerUtils.runOne("io.kestra.tests", "variables-invalid"); diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java index 7dbfb4b0258..18fde250227 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java @@ -1,9 +1,11 @@ package io.kestra.runner.h2; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; @@ -21,7 +23,7 @@ public H2WorkerJobQueue(ApplicationContext applicationContext) { } @Override - public Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { return workerTaskQueue.receive(consumerGroup, queueType, consumer); } diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java index b7bd21c1e4d..8211264f010 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlWorkerJobQueue.java @@ -1,9 +1,11 @@ package io.kestra.runner.mysql; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; @@ -21,7 +23,7 @@ public MysqlWorkerJobQueue(ApplicationContext applicationContext) { } @Override - public Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { return workerTaskQueue.receive(consumerGroup, queueType, consumer); } diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java index 8b9359464c1..831d9b8d65d 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresWorkerJobQueue.java @@ -1,9 +1,11 @@ package io.kestra.runner.postgres; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.runners.WorkerJob; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; @@ -21,7 +23,7 @@ public PostgresWorkerJobQueue(ApplicationContext applicationContext) { } @Override - public Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { return workerTaskQueue.receive(consumerGroup, queueType, consumer); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java index 564f1b22f61..baabb5c7089 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java @@ -1,6 +1,5 @@ - package io.kestra.jdbc; +package io.kestra.jdbc; -import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; import com.google.common.collect.ImmutableMap; import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.models.executions.metrics.MetricAggregation; @@ -164,10 +163,8 @@ public Instant getDate(R record, String groupByType) { public T deserialize(String record) { try { return JacksonMapper.ofJson().readValue(record, cls); - } catch (InvalidTypeIdException e) { - throw new DeserializationException(e); } catch (IOException e) { - throw new DeserializationException(e); + throw new DeserializationException(e, record); } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java index 4d24e369c0f..8f10e1f2ff6 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java @@ -69,7 +69,7 @@ public AbstractJdbcFlowRepository(io.kestra.jdbc.AbstractJdbcRepository jd .tasks(List.of()) .build(); } catch (JsonProcessingException ex) { - throw new DeserializationException(ex); + throw new DeserializationException(ex, source); } } }); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index c6ad983e9cc..e9fe48b8688 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -1,5 +1,6 @@ package io.kestra.jdbc.runner; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.exceptions.InternalException; import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; @@ -20,6 +21,7 @@ import io.kestra.core.tasks.flows.Template; import io.kestra.core.topologies.FlowTopologyService; import io.kestra.core.utils.Await; +import io.kestra.core.utils.Either; import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository; import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository; import io.micronaut.context.ApplicationContext; @@ -173,11 +175,12 @@ public void run() { flowQueue.receive( FlowTopology.class, - flow -> { - if (flow == null || flow instanceof FlowWithException) { + either -> { + if (either == null || either.isRight() || either.getLeft() == null || either.getLeft() instanceof FlowWithException) { return; } + Flow flow = either.getLeft(); flowTopologyRepository.save( flow, (flow.isDeleted() ? @@ -195,7 +198,13 @@ public void run() { ); } - private void executionQueue(Execution message) { + private void executionQueue(Either either) { + if (either.isRight()) { + log.error("Unable to deserialize an execution: {}", either.getRight().getMessage()); + return; + } + + Execution message = either.getLeft(); if (skipExecutionService.skipExecution(message.getId())) { log.warn("Skipping execution {}", message.getId()); return; @@ -328,7 +337,13 @@ private void executionQueue(Execution message) { } - private void workerTaskResultQueue(WorkerTaskResult message) { + private void workerTaskResultQueue(Either either) { + if (either.isRight()) { + log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage()); + return; + } + + WorkerTaskResult message = either.getLeft(); if (skipExecutionService.skipExecution(message.getTaskRun().getTaskId())) { log.warn("Skipping execution {}", message.getTaskRun().getExecutionId()); return; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 27508b5dba8..3cfb41aea82 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -3,10 +3,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.CaseFormat; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.QueueService; import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.utils.Either; import io.kestra.core.utils.ExecutorsUtils; import io.kestra.core.utils.IdUtils; import io.kestra.jdbc.JooqDSLContextWrapper; @@ -142,7 +144,7 @@ public void delete(String consumerGroup, T message) throws QueueException { abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List offsets); @Override - public Runnable receive(String consumerGroup, Consumer consumer) { + public Runnable receive(String consumerGroup, Consumer> consumer) { AtomicInteger maxOffset = new AtomicInteger(); // fetch max offset @@ -164,7 +166,7 @@ public Runnable receive(String consumerGroup, Consumer consumer) { Result result = this.receiveFetch(ctx, consumerGroup, maxOffset.get()); - if (result.size() > 0) { + if (!result.isEmpty()) { List offsets = result.map(record -> record.get("offset", Integer.class)); maxOffset.set(offsets.get(offsets.size() - 1)); @@ -180,7 +182,7 @@ public Runnable receive(String consumerGroup, Consumer consumer) { } @Override - public Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { String queueName = queueName(queueType); return this.poll(() -> { @@ -189,7 +191,7 @@ public Runnable receive(String consumerGroup, Class queueType, Consumer co Result result = this.receiveFetch(ctx, consumerGroup, queueName); - if (result.size() > 0) { + if (!result.isEmpty()) { this.updateGroupOffsets( ctx, @@ -252,13 +254,13 @@ private Runnable poll(Supplier runnable) { }; } - private void send(Result fetch, Consumer consumer) { + private void send(Result fetch, Consumer> consumer) { fetch .map(record -> { try { - return JacksonMapper.ofJson().readValue(record.get("value", String.class), cls); + return Either.left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls)); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + return Either.right(new DeserializationException(e, record.get("value", String.class))); } }) .forEach(consumer); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java index fc58854c5bd..00ea2c462a3 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java @@ -53,7 +53,13 @@ public void run() { executionQueue.receive( Scheduler.class, - execution -> { + either -> { + if (either.isRight()) { + log.error("Unable to dserialize an execution: {}", either.getRight().getMessage()); + return; + } + + Execution execution = either.getLeft(); if (execution.getTrigger() != null) { var flow = flowRepository.findById(execution.getNamespace(), execution.getFlowId()).orElse(null); if (execution.isDeleted() || conditionService.isTerminatedWithListeners(flow, execution)) { diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java index dc4c6633815..38193c81339 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java @@ -34,7 +34,8 @@ abstract public class JdbcQueueTest { void noGroup() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); - flowQueue.receive(flow -> { + flowQueue.receive(either -> { + Flow flow = either.getLeft(); if (flow.getNamespace().equals("io.kestra.f1")) { flowQueue.emit(builder("io.kestra.f2")); } @@ -53,7 +54,8 @@ void noGroup() throws InterruptedException { void withGroup() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); - flowQueue.receive("consumer_group", flow -> { + flowQueue.receive("consumer_group", either -> { + Flow flow = either.getLeft(); if (flow.getNamespace().equals("io.kestra.f1")) { flowQueue.emit("consumer_group", builder("io.kestra.f2")); } @@ -76,7 +78,8 @@ void withType() throws InterruptedException { AtomicReference namespace = new AtomicReference<>(); CountDownLatch countDownLatch = new CountDownLatch(1); - flowQueue.receive(Indexer.class, flow -> { + flowQueue.receive(Indexer.class, either -> { + Flow flow = either.getLeft(); namespace.set(flow.getNamespace()); countDownLatch.countDown(); }); @@ -89,7 +92,8 @@ void withType() throws InterruptedException { flowQueue.emit(builder("io.kestra.f2")); CountDownLatch countDownLatch2 = new CountDownLatch(1); - flowQueue.receive(Indexer.class, flow -> { + flowQueue.receive(Indexer.class, either -> { + Flow flow = either.getLeft(); namespace.set(flow.getNamespace()); countDownLatch2.countDown(); }); @@ -106,7 +110,8 @@ void withGroupAndType() throws InterruptedException { AtomicReference namespace = new AtomicReference<>(); CountDownLatch countDownLatch = new CountDownLatch(1); - flowQueue.receive("consumer_group", Indexer.class, flow -> { + flowQueue.receive("consumer_group", Indexer.class, either -> { + Flow flow = either.getLeft(); namespace.set(flow.getNamespace()); countDownLatch.countDown(); }); @@ -119,7 +124,8 @@ void withGroupAndType() throws InterruptedException { flowQueue.emit("consumer_group", builder("io.kestra.f2")); CountDownLatch countDownLatch2 = new CountDownLatch(1); - flowQueue.receive("consumer_group", Indexer.class, flow -> { + flowQueue.receive("consumer_group", Indexer.class, either -> { + Flow flow = either.getLeft(); namespace.set(flow.getNamespace()); countDownLatch2.countDown(); }); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index a27104637dc..8db5e594a4f 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -1,5 +1,6 @@ package io.kestra.runner.memory; +import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.exceptions.InternalException; import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; @@ -14,6 +15,7 @@ import io.kestra.core.runners.*; import io.kestra.core.services.*; import io.kestra.core.tasks.flows.Template; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Named; @@ -109,7 +111,13 @@ public void run() { this.workerTaskResultQueue.receive(MemoryExecutor.class, this::workerTaskResultQueue); } - private void executionQueue(Execution message) { + private void executionQueue(Either either) { + if (either.isRight()) { + log.error("Unable to deserialize the execution: {}", either.getRight().getMessage()); + return; + } + + Execution message = either.getLeft(); if (skipExecutionService.skipExecution(message.getId())) { log.warn("Skipping execution {}", message.getId()); return; @@ -297,7 +305,14 @@ private void toExecution(Executor executor) { } } - private void workerTaskResultQueue(WorkerTaskResult message) { + private void workerTaskResultQueue(Either either) { + if (either.isRight()) { + log.error("Unable to deserialize the worker task result: {}", either.getRight().getMessage()); + return; + } + + WorkerTaskResult message = either.getLeft(); + if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) { log.warn("Skipping execution {}", message.getTaskRun().getExecutionId()); return; diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueue.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueue.java index cd5195dca7d..8bf6a355c9c 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueue.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueue.java @@ -1,8 +1,11 @@ package io.kestra.runner.memory; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.Hashing; +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import lombok.extern.slf4j.Slf4j; import io.kestra.core.queues.QueueService; @@ -26,7 +29,7 @@ public class MemoryQueue implements QueueInterface { private final QueueService queueService; private final Class cls; - private final Map>> queues = new ConcurrentHashMap<>(); + private final Map>>> queues = new ConcurrentHashMap<>(); public MemoryQueue(Class cls, ApplicationContext applicationContext) { if (poolExecutor == null) { @@ -55,7 +58,7 @@ private void produce(String key, T message) { this.queues .forEach((consumerGroup, consumers) -> { poolExecutor.execute(() -> { - Consumer consumer; + Consumer> consumer; synchronized (this) { if (consumers.size() == 0) { @@ -67,13 +70,15 @@ private void produce(String key, T message) { } } - // we force serialization to be a the same case than an another queue with serialization + // we force serialization to be at the same case than an another queue implementation with serialization // this enabled debugging classLoader + String source = null; try { - T serialized = message == null ? null : mapper.readValue(mapper.writeValueAsString(message), this.cls); - consumer.accept(serialized); + source = mapper.writeValueAsString(message); + T serialized = message == null ? null : mapper.readValue(source, this.cls); + consumer.accept(Either.left(serialized)); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + consumer.accept(Either.right(new DeserializationException(e, source))); } }); }); @@ -95,12 +100,12 @@ public void delete(String consumerGroup, T message) throws QueueException { } @Override - public Runnable receive(String consumerGroup, Consumer consumer) { + public Runnable receive(String consumerGroup, Consumer> consumer) { return this.receive(consumerGroup, null, consumer); } @Override - public synchronized Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public synchronized Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { String queueName; if (queueType == null) { queueName = UUID.randomUUID().toString(); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerJobQueue.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerJobQueue.java index c82b24e6c9c..566d2bba921 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerJobQueue.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryWorkerJobQueue.java @@ -1,5 +1,7 @@ package io.kestra.runner.memory; +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.utils.Either; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; import io.kestra.core.queues.QueueFactoryInterface; @@ -21,7 +23,7 @@ public MemoryWorkerJobQueue(ApplicationContext applicationContext) { } @Override - public Runnable receive(String consumerGroup, Class queueType, Consumer consumer) { + public Runnable receive(String consumerGroup, Class queueType, Consumer> consumer) { return workerTaskQueue.receive(consumerGroup, queueType, consumer); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index 107ba597c67..6a8705530d3 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -463,7 +463,13 @@ public Execution trigger( return Single .create(emitter -> { - Runnable receive = this.executionQueue.receive(item -> { + Runnable receive = this.executionQueue.receive(either -> { + if (either.isRight()) { + log.error("Unable to deserialize the execution: {}", either.getRight().getMessage()); + return; + } + + Execution item = either.getLeft(); if (item.getId().equals(current.getId())) { Flow flow = flowRepository.findByExecution(current); @@ -921,7 +927,13 @@ public Flowable> follow( emitter.onNext(Event.of(execution).id("progress")); // consume new value - Runnable receive = this.executionQueue.receive(current -> { + Runnable receive = this.executionQueue.receive(either -> { + if (either.isRight()) { + log.error("Unable to deserialize the execution: {}", either.getRight().getMessage()); + return; + } + + Execution current = either.getLeft(); if (current.getId().equals(executionId)) { emitter.onNext(Event.of(current).id("progress")); diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java b/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java index 0c3acc5fa14..30af88407c9 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java @@ -131,7 +131,12 @@ public Flowable> follow( .forEach(logEntry -> emitter.onNext(Event.of(logEntry).id("progress"))); // consume in realtime - Runnable receive = this.logQueue.receive(current -> { + Runnable receive = this.logQueue.receive(either -> { + if (either.isRight()) { + return; + } + + LogEntry current = either.getLeft(); if (current.getExecutionId() != null && current.getExecutionId().equals(executionId)) { if (levels.contains(current.getLevel().name())) { emitter.onNext(Event.of(current).id("progress"));