Skip to content

Commit

Permalink
fix(core): generate attempts for subflow tasks
Browse files Browse the repository at this point in the history
closes #2098
  • Loading branch information
brian-mulier-p committed Sep 13, 2023
1 parent 0328e82 commit ad54f84
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
Expand Down Expand Up @@ -598,7 +599,11 @@ private Executor handleFlowTask(final Executor executor) {
}
} catch (Exception e) {
workerTaskResults.add(WorkerTaskResult.builder()
.taskRun(workerTask.getTaskRun().withState(State.Type.FAILED))
.taskRun(workerTask.getTaskRun().withState(State.Type.FAILED)
.withAttempts(Collections.singletonList(
TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()
))
)
.build()
);
executor.withException(e, "handleFlowTask");
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
Expand All @@ -19,11 +20,7 @@

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -188,9 +185,10 @@ public WorkerTaskResult createWorkerTaskResult(
try {
builder.outputs(runContext.render(workerTaskExecution.getTask().getOutputs()));
} catch (Exception e) {
runContext.logger().warn("Failed to extract ouputs with error: '" + e.getMessage() + "'", e);
runContext.logger().warn("Failed to extract outputs with error: '" + e.getMessage() + "'", e);
taskRun = taskRun
.withState(State.Type.FAILED)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))
.withOutputs(builder.build().toMap());

return WorkerTaskResult.builder()
Expand All @@ -212,7 +210,9 @@ public WorkerTaskResult createWorkerTaskResult(
}

return WorkerTaskResult.builder()
.taskRun(taskRun)
.taskRun(taskRun.withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build())
))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count
countDownLatch.await(1, TimeUnit.MINUTES);

assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(fromState));
assertThat(execution.getState().getCurrent(), is(fromState));

if (outputs != null) {
Expand Down

0 comments on commit ad54f84

Please sign in to comment.