From 3dbbd1c82db3deee777860b7bd366d49ac1bbf82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Tue, 10 Oct 2023 12:08:52 +0200 Subject: [PATCH] fix(jdbc,runner-memory): worker task execution has already been created if we don't wait on a flow task --- .../java/io/kestra/jdbc/runner/JdbcExecutor.java | 13 ++++++++----- .../io/kestra/runner/memory/MemoryExecutor.java | 14 +++++++++----- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 04ddfef23ac..33776e5fbb3 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -386,13 +386,16 @@ private void executionQueue(Either either) if (conditionService.isTerminatedWithListeners(flow, execution)) { workerTaskExecutionStorage.get(execution.getId()) .ifPresent(workerTaskExecution -> { - Flow workerTaskFlow = this.flowRepository.findByExecution(execution); + // If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service. + if (workerTaskExecution.getTask().getWait()) { + Flow workerTaskFlow = this.flowRepository.findByExecution(execution); - WorkerTaskResult workerTaskResult = workerTaskExecution - .getTask() - .createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution); + WorkerTaskResult workerTaskResult = workerTaskExecution + .getTask() + .createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution); - this.workerTaskResultQueue.emit(workerTaskResult); + this.workerTaskResultQueue.emit(workerTaskResult); + } workerTaskExecutionStorage.delete(workerTaskExecution); }); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index 8db5e594a4f..885cf42d0fe 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -251,13 +251,17 @@ private void handleExecution(ExecutionState state) { // worker task execution if (conditionService.isTerminatedWithListeners(flow, execution) && WORKERTASKEXECUTIONS_WATCHER.containsKey(execution.getId())) { WorkerTaskExecution workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId()); - Flow workerTaskFlow = this.flowRepository.findByExecution(execution); - WorkerTaskResult workerTaskResult = workerTaskExecution - .getTask() - .createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution); + // If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service. + if (workerTaskExecution.getTask().getWait()) { + Flow workerTaskFlow = this.flowRepository.findByExecution(execution); - this.workerTaskResultQueue.emit(workerTaskResult); + WorkerTaskResult workerTaskResult = workerTaskExecution + .getTask() + .createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution); + + this.workerTaskResultQueue.emit(workerTaskResult); + } WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId()); }