From ad54f84b1517e96aae23aaf04723e688a6667b99 Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Wed, 13 Sep 2023 19:52:19 +0200 Subject: [PATCH] fix(core): generate attempts for subflow tasks closes #2098 --- .../io/kestra/core/runners/ExecutorService.java | 7 ++++++- .../main/java/io/kestra/core/tasks/flows/Flow.java | 14 +++++++------- .../io/kestra/core/tasks/flows/FlowCaseTest.java | 2 ++ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index 63717e11438..de656bbf80a 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -6,6 +6,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.NextTaskRun; import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.TaskRunAttempt; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.tasks.FlowableTask; @@ -598,7 +599,11 @@ private Executor handleFlowTask(final Executor executor) { } } catch (Exception e) { workerTaskResults.add(WorkerTaskResult.builder() - .taskRun(workerTask.getTaskRun().withState(State.Type.FAILED)) + .taskRun(workerTask.getTaskRun().withState(State.Type.FAILED) + .withAttempts(Collections.singletonList( + TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build() + )) + ) .build() ); executor.withException(e, "handleFlowTask"); diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java index 182cf36d9e2..a5e5eed0f37 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java @@ -8,6 +8,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionTrigger; import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.TaskRunAttempt; import io.kestra.core.models.flows.FlowWithException; import io.kestra.core.models.flows.State; import io.kestra.core.models.tasks.RunnableTask; @@ -19,11 +20,7 @@ import javax.annotation.Nullable; import javax.validation.constraints.NotNull; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; @SuperBuilder @ToString @@ -188,9 +185,10 @@ public WorkerTaskResult createWorkerTaskResult( try { builder.outputs(runContext.render(workerTaskExecution.getTask().getOutputs())); } catch (Exception e) { - runContext.logger().warn("Failed to extract ouputs with error: '" + e.getMessage() + "'", e); + runContext.logger().warn("Failed to extract outputs with error: '" + e.getMessage() + "'", e); taskRun = taskRun .withState(State.Type.FAILED) + .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build())) .withOutputs(builder.build().toMap()); return WorkerTaskResult.builder() @@ -212,7 +210,9 @@ public WorkerTaskResult createWorkerTaskResult( } return WorkerTaskResult.builder() - .taskRun(taskRun) + .taskRun(taskRun.withAttempts( + Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build()) + )) .build(); } diff --git a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java index 5805453b3e9..79deee8031e 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java @@ -64,6 +64,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count countDownLatch.await(1, TimeUnit.MINUTES); assertThat(execution.getTaskRunList(), hasSize(1)); + assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1)); + assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(fromState)); assertThat(execution.getState().getCurrent(), is(fromState)); if (outputs != null) {