diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
index 0f4b46508a7..7cf65c305a8 100644
--- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
+++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
@@ -284,7 +284,8 @@ private void executionQueue(Either<Execution, DeserializationException> either)
         }
 
         Executor result = executionRepository.lock(message.getId(), pair -> {
-            Execution execution = pair.getLeft();
+            // as tasks can be processed in parallel, we must merge the execution from the database with the one received from the queue
+            Execution execution = mergeExecution(pair.getLeft(), message);
             ExecutorState executorState = pair.getRight();
 
             final Flow flow = transform(this.flowRepository.findByExecution(execution), execution);
@@ -296,7 +297,7 @@
 
             executor = executorService.process(executor);
 
-            if (executor.getNexts().size() > 0 && deduplicateNexts(execution, executorState, executor.getNexts())) {
+            if (!executor.getNexts().isEmpty() && deduplicateNexts(execution, executorState, executor.getNexts())) {
                 executor.withExecution(
                     executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
                     "onNexts"
@@ -304,7 +305,7 @@
             }
 
             // worker task
-            if (executor.getWorkerTasks().size() > 0) {
+            if (!executor.getWorkerTasks().isEmpty()) {
                 List<WorkerTask> workerTasksDedup = executor
                     .getWorkerTasks()
                     .stream()
@@ -326,26 +327,26 @@
             }
 
             // worker tasks results
-            if (executor.getWorkerTaskResults().size() > 0) {
+            if (!executor.getWorkerTaskResults().isEmpty()) {
                 executor.getWorkerTaskResults()
                     .forEach(workerTaskResultQueue::emit);
             }
 
             // schedulerDelay
-            if (executor.getExecutionDelays().size() > 0) {
+            if (!executor.getExecutionDelays().isEmpty()) {
                 executor.getExecutionDelays()
                     .forEach(executionDelay -> abstractExecutionDelayStorage.save(executionDelay));
             }
 
             // worker task execution watchers
-            if (executor.getWorkerTaskExecutions().size() > 0) {
+            if (!executor.getWorkerTaskExecutions().isEmpty()) {
                 workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());
 
                 List<WorkerTaskExecution> workerTasksExecutionDedup = executor
                     .getWorkerTaskExecutions()
                     .stream()
                     .filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun()))
-                    .collect(Collectors.toList());
+                    .toList();
 
                 workerTasksExecutionDedup
                     .forEach(workerTaskExecution -> {
@@ -409,6 +410,25 @@
         }
     }
 
+    private Execution mergeExecution(Execution locked, Execution message) {
+        Execution newExecution = locked;
+        if (message.getTaskRunList() != null) {
+            for (TaskRun taskRun : message.getTaskRunList()) {
+                try {
+                    TaskRun existing = newExecution.findTaskRunByTaskRunId(taskRun.getId());
+                    // if the task run from the message is newer than the one from the execution, we replace it
+                    if (existing != null && taskRun.getState().maxDate().isAfter(existing.getState().maxDate())) {
+                        newExecution = newExecution.withTaskRun(taskRun);
+                    }
+                }
+                catch (InternalException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return newExecution;
+    }
+
     private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
         if (either.isRight()) {
             log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage());
@@ -425,20 +445,6 @@ private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
             Execution execution = pair.getLeft();
             Executor current = new Executor(execution, null);
@@ -459,10 +465,23 @@ private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either)
-        assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), greaterThanOrEqualTo(2L)); // Should be 3
+        assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(3L));
     }
 
     @Test
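
Note on the merge rule: mergeExecution above resolves the race between the locked database copy of the execution and the copy received from the queue with a last-writer-wins rule keyed on task-run id, using the most recent state date to decide which copy is fresher. The standalone sketch below illustrates that rule with simplified stand-in types; ExecutionMergeSketch, TaskRunSnapshot, and mergeByNewestState are illustrative names invented here, not part of the Kestra codebase, and the real method operates on Kestra's Execution and TaskRun classes instead.

import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExecutionMergeSketch {
    // Simplified stand-in for a task run: only the fields needed to show the merge rule.
    record TaskRunSnapshot(String id, String state, Instant stateUpdatedAt) {}

    // Last-writer-wins merge keyed on task-run id: a snapshot coming from the queue
    // only replaces the database copy when it exists there and carries a newer state
    // timestamp, mirroring the maxDate() comparison in mergeExecution above.
    static List<TaskRunSnapshot> mergeByNewestState(List<TaskRunSnapshot> fromDatabase,
                                                    List<TaskRunSnapshot> fromQueue) {
        Map<String, TaskRunSnapshot> merged = new LinkedHashMap<>();
        for (TaskRunSnapshot taskRun : fromDatabase) {
            merged.put(taskRun.id(), taskRun);
        }
        for (TaskRunSnapshot taskRun : fromQueue) {
            TaskRunSnapshot existing = merged.get(taskRun.id());
            if (existing != null && taskRun.stateUpdatedAt().isAfter(existing.stateUpdatedAt())) {
                merged.put(taskRun.id(), taskRun);
            }
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2023-01-01T00:00:00Z");
        List<TaskRunSnapshot> database = List.of(
            new TaskRunSnapshot("a", "RUNNING", t0),
            new TaskRunSnapshot("b", "SUCCESS", t0.plusSeconds(5))
        );
        List<TaskRunSnapshot> queue = List.of(
            new TaskRunSnapshot("a", "SUCCESS", t0.plusSeconds(10)) // newer state for "a" wins
        );
        // prints: a=SUCCESS then b=SUCCESS
        mergeByNewestState(database, queue)
            .forEach(taskRun -> System.out.println(taskRun.id() + "=" + taskRun.state()));
    }
}

Using the database copy as the base and only overwriting strictly newer task runs means a stale queue message can never roll a task run back to an earlier state, which is the property the patch relies on when tasks are processed in parallel.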