From e05d9d6c6a13b407b286610043e6facb4332aab6 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Tue, 8 Mar 2022 17:28:31 +0100 Subject: [PATCH] feat(core): introduce DynamicTask for dynamic task generation for a worker task --- .../io/kestra/core/models/flows/State.java | 9 +++ .../core/models/hierarchies/GraphCluster.java | 12 +-- .../kestra/core/models/tasks/DynamicTask.java | 5 ++ .../kestra/core/runners/ExecutorService.java | 16 ++-- .../io/kestra/core/runners/RunContext.java | 20 ++++- .../java/io/kestra/core/runners/Worker.java | 9 ++- .../kestra/core/runners/WorkerTaskResult.java | 19 +++++ .../core/services/ExecutionService.java | 74 ++++++++++++++----- .../io/kestra/core/tasks/flows/Worker.java | 3 +- .../core/tasks/scripts/AbstractBash.java | 18 +++-- .../io.kestra.core.tasks.flows.Worker.svg | 1 + .../core/runners/ExecutionServiceTest.java | 18 ++++- 12 files changed, 152 insertions(+), 52 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java create mode 100644 core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg diff --git a/core/src/main/java/io/kestra/core/models/flows/State.java b/core/src/main/java/io/kestra/core/models/flows/State.java index 0fcd7261781..af406aab466 100644 --- a/core/src/main/java/io/kestra/core/models/flows/State.java +++ b/core/src/main/java/io/kestra/core/models/flows/State.java @@ -52,6 +52,15 @@ public State(Type state, State actual) { this.histories.add(new History(this.current, Instant.now())); } + public static State of(Type state, List histories) { + State result = new State(state); + + result.histories.removeIf(history -> true); + result.histories.addAll(histories); + + return result; + } + public State withState(Type state) { if (this.current == state) { log.warn("Can't change state, already " + current); diff --git a/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java b/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java index 021f4f0b78b..9bedd85f48a 100644 --- a/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java +++ b/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java @@ -10,13 +10,13 @@ @Getter public class GraphCluster extends AbstractGraphTask { @JsonIgnore - private Graph graph = new Graph<>(); + private final Graph graph = new Graph<>(); @JsonIgnore - private GraphClusterRoot root; + private final GraphClusterRoot root; @JsonIgnore - private GraphClusterEnd end; + private final GraphClusterEnd end; public GraphCluster() { super(); @@ -36,10 +36,4 @@ public GraphCluster(Task task, TaskRun taskRun, List values, RelationTyp graph.addNode(this.root); graph.addNode(this.end); } - - public GraphCluster(GraphCluster graphTask, TaskRun taskRun, List values) { - super(graphTask.getTask(), taskRun, values, graphTask.getRelationType()); - - this.graph = graphTask.graph; - } } diff --git a/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java b/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java new file mode 100644 index 00000000000..bb68ea50659 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java @@ -0,0 +1,5 @@ +package io.kestra.core.models.tasks; + +public interface DynamicTask { + +} diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index 6e3c942116a..972123f76f1 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -8,11 +8,11 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.services.ConditionService; -import io.kestra.core.tasks.flows.Worker; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -568,6 +568,13 @@ private Executor handleFlowTask(final Executor executor) { } public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskResult workerTaskResult) throws InternalException { + ArrayList taskRuns = new ArrayList<>(execution.getTaskRunList()); + + // declared dynamic tasks + if (workerTaskResult.getDynamicTaskRuns() != null) { + taskRuns.addAll(workerTaskResult.getDynamicTaskRuns()); + } + // if parent, can be a Worker task that generate dynamic tasks if (workerTaskResult.getTaskRun().getParentTaskRunId() != null) { try { @@ -576,16 +583,13 @@ public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskRes TaskRun parentTaskRun = execution.findTaskRunByTaskRunId(workerTaskResult.getTaskRun().getParentTaskRunId()); Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId()); - if (parentTask instanceof Worker) { - ArrayList taskRuns = new ArrayList<>(execution.getTaskRunList()); + if (parentTask instanceof DynamicTask) { taskRuns.add(workerTaskResult.getTaskRun()); - - return execution.withTaskRunList(taskRuns); } } } - return null; + return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null; } public void log(Logger log, Boolean in, WorkerTask value) { diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index c28ec3fdfa8..5454dbfbd0f 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -38,17 +38,21 @@ public class RunContext { private final static ObjectMapper MAPPER = JacksonMapper.ofJson(); - private VariableRenderer variableRenderer; + // Injected private ApplicationContext applicationContext; + private VariableRenderer variableRenderer; private StorageInterface storageInterface; + private String envPrefix; + private MetricRegistry meterRegistry; + private Path tempBasedPath; + private URI storageOutputPrefix; private URI storageExecutionPrefix; - private String envPrefix; private Map variables; private List> metrics = new ArrayList<>(); - private MetricRegistry meterRegistry; private RunContextLogger runContextLogger; - private Path tempBasedPath; + private final List dynamicWorkerTaskResult = new ArrayList<>(); + protected transient Path temporaryDirectory; /** @@ -572,6 +576,14 @@ private String metricPrefix() { return String.join(".", values); } + public void dynamicWorkerResult(List workerTaskResults) { + dynamicWorkerTaskResult.addAll(workerTaskResults); + } + + public List dynamicWorkerResults() { + return dynamicWorkerTaskResult; + } + public synchronized Path tempDir() { return this.tempDir(true); } diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index c71e7555551..38247ba6192 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -229,6 +229,10 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu ) .get(() -> this.runAttempt(current.get())); + // save dynamic WorkerResults since cleanUpTransient will remove them + List dynamicWorkerResults = finalWorkerTask.getRunContext().dynamicWorkerResults(); + + // remove tmp directory if (cleanUp) { finalWorkerTask.getRunContext().cleanup(); } @@ -259,7 +263,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu // So we just tryed to failed the status of the worker task, in this case, no log can't be happend, just // changing status must work in order to finish current task (except if we are near the upper bound size). try { - WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask); + WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults); this.workerTaskResultQueue.emit(workerTaskResult); return workerTaskResult; } catch (QueueException e) { @@ -267,7 +271,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu .withTaskRun(workerTask.getTaskRun() .withState(State.Type.FAILED) ); - WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask); + WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults); this.workerTaskResultQueue.emit(workerTaskResult); return workerTaskResult; } finally { @@ -382,7 +386,6 @@ private List addAttempt(WorkerTask workerTask, TaskRunAttempt ta .build(); } - @SuppressWarnings("UnstableApiUsage") public AtomicInteger getMetricRunningCount(WorkerTask workerTask) { String[] tags = this.metricRegistry.tags(workerTask); Arrays.sort(tags); diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java b/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java index b18f0f42366..2193c421189 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java @@ -5,6 +5,9 @@ import lombok.Builder; import lombok.Value; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; @Value @@ -14,7 +17,23 @@ public class WorkerTaskResult { @NotNull TaskRun taskRun; + List dynamicTaskRuns; + + public WorkerTaskResult(TaskRun taskRun) { + this.taskRun = taskRun; + this.dynamicTaskRuns = new ArrayList<>(); + } + public WorkerTaskResult(WorkerTask workerTask) { this.taskRun = workerTask.getTaskRun(); + this.dynamicTaskRuns = new ArrayList<>(); + } + + public WorkerTaskResult(WorkerTask workerTask, List dynamicWorkerResults) { + this.taskRun = workerTask.getTaskRun(); + this.dynamicTaskRuns = dynamicWorkerResults + .stream() + .map(WorkerTaskResult::getTaskRun) + .collect(Collectors.toList()); } } diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index d81ada2d94b..ad31c64bfe3 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -7,18 +7,22 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.hierarchies.GraphCluster; +import io.kestra.core.models.tasks.Task; import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.tasks.flows.Worker; import io.kestra.core.utils.IdUtils; import io.micronaut.context.ApplicationContext; import io.micronaut.core.annotation.Nullable; import java.util.*; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import jakarta.inject.Inject; import jakarta.inject.Singleton; import static io.kestra.core.utils.Rethrow.throwFunction; +import static io.kestra.core.utils.Rethrow.throwPredicate; @Singleton public class ExecutionService { @@ -37,19 +41,12 @@ public Execution restart(final Execution execution, @Nullable Integer revision) final Flow flow = flowRepositoryInterface.findByExecution(execution); - Set taskRunToRestart = this.taskRunWithAncestors( + Set taskRunToRestart = this.taskRunToRestart( execution, - execution - .getTaskRunList() - .stream() - .filter(taskRun -> taskRun.getState().getCurrent().isFailed()) - .collect(Collectors.toList()) + flow, + taskRun -> taskRun.getState().getCurrent().isFailed() ); - if (taskRunToRestart.size() == 0) { - throw new IllegalArgumentException("No failed task found to restart execution from !"); - } - Map mappingTaskRunId = this.mapTaskRunId(execution, revision == null); final String newExecutionId = revision != null ? IdUtils.create() : null; @@ -77,6 +74,50 @@ public Execution restart(final Execution execution, @Nullable Integer revision) return revision != null ? newExecution.withFlowRevision(revision) : newExecution; } + private Set taskRunToRestart(Execution execution, Flow flow, Predicate predicate) throws InternalException { + // Original tasks to be restarted + Set originalTaskRunToRestart = this + .taskRunWithAncestors( + execution, + execution + .getTaskRunList() + .stream() + .filter(predicate) + .collect(Collectors.toList()) + ); + + // we removed Worker + Set finalTaskRunToRestart = originalTaskRunToRestart + .stream() + .filter(throwPredicate(s -> { + TaskRun taskRun = execution.findTaskRunByTaskRunId(s); + Task task = flow.findTaskByTaskId(taskRun.getTaskId()); + return !(task instanceof Worker); + })) + .collect(Collectors.toSet()); + + // we removed task with parent and no more child + Set clonedLambda = finalTaskRunToRestart; + finalTaskRunToRestart = finalTaskRunToRestart + .stream() + .filter(throwPredicate(s -> { + TaskRun taskRun = execution.findTaskRunByTaskRunId(s); + return taskRun.getParentTaskRunId() == null || clonedLambda.contains(taskRun.getParentTaskRunId()); + })) + .collect(Collectors.toSet()); + + + if (finalTaskRunToRestart.size() == 0) { + if (originalTaskRunToRestart.size() > 0) { + throw new IllegalArgumentException("No valid task to restart execution from! Worker task can't be restarted."); + } else { + throw new IllegalArgumentException("No failed task found to restart execution from!"); + } + } + + return finalTaskRunToRestart; + } + public Execution replay(final Execution execution, String taskRunId, @Nullable Integer revision) throws Exception { if (!execution.getState().isTerninated()) { throw new IllegalStateException("Execution must be terminated to be restarted, " + @@ -87,19 +128,12 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I final Flow flow = flowRepositoryInterface.findByExecution(execution); GraphCluster graphCluster = GraphService.of(flow, execution); - Set taskRunToRestart = this.taskRunWithAncestors( + Set taskRunToRestart = this.taskRunToRestart( execution, - execution - .getTaskRunList() - .stream() - .filter(taskRun -> taskRun.getId().equals(taskRunId)) - .collect(Collectors.toList()) + flow, + taskRun -> taskRun.getId().equals(taskRunId) ); - if (taskRunToRestart.size() == 0) { - throw new IllegalArgumentException("No task found to restart execution from !"); - } - Map mappingTaskRunId = this.mapTaskRunId(execution, false); final String newExecutionId = IdUtils.create(); diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Worker.java b/core/src/main/java/io/kestra/core/tasks/flows/Worker.java index 54287c5e365..93ba64d2c0d 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Worker.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Worker.java @@ -7,6 +7,7 @@ import io.kestra.core.models.executions.NextTaskRun; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; @@ -58,7 +59,7 @@ ) } ) -public class Worker extends Sequential { +public class Worker extends Sequential implements DynamicTask { @Override public List resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException { List childTasks = this.childTasks(runContext, parentTaskRun); diff --git a/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java b/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java index 6c25035f405..5448d2d6d3d 100644 --- a/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java +++ b/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java @@ -203,13 +203,7 @@ protected ScriptOutput run(RunContext runContext, Supplier supplier) thr workingDirectory, finalCommandsWithInterpreter(commandAsString), this.finalEnv(), - (inputStream, isStdErr) -> { - AbstractLogThread thread = new LogThread(logger, inputStream, isStdErr, runContext); - thread.setName("bash-log-" + (isStdErr ? "-err" : "-out")); - thread.start(); - - return thread; - } + this.defaultLogSupplier(logger, runContext) ); // upload output files @@ -234,6 +228,16 @@ protected ScriptOutput run(RunContext runContext, Supplier supplier) thr .build(); } + protected LogSupplier defaultLogSupplier(Logger logger, RunContext runContext) { + return (inputStream, isStdErr) -> { + AbstractLogThread thread = new LogThread(logger, inputStream, isStdErr, runContext); + thread.setName("bash-log-" + (isStdErr ? "-err" : "-out")); + thread.start(); + + return thread; + }; + } + protected RunResult run(RunContext runContext, Logger logger, Path workingDirectory, List commandsWithInterpreter, Map env, LogSupplier logSupplier) throws Exception { ScriptRunnerInterface executor; if (this.runner == Runner.DOCKER) { diff --git a/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg b/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg new file mode 100644 index 00000000000..9b5296916b5 --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java index 620345b520e..8a9f96a4db4 100644 --- a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java @@ -7,14 +7,14 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.services.ExecutionService; import io.kestra.core.tasks.debugs.Return; +import jakarta.inject.Inject; import org.junit.jupiter.api.Test; import java.util.List; -import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; class ExecutionServiceTest extends AbstractMemoryRunnerTest { @Inject @@ -98,6 +98,20 @@ void restartFlowable2() throws Exception { assertThat(restart.getTaskRunList().get(0).getId(), is(restart.getTaskRunList().get(0).getId())); } + @Test + void restartDynamic() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null, (f, e) -> ImmutableMap.of("failed", "true")); + assertThat(execution.getTaskRunList(), hasSize(3)); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + + // f1 & f2 must be deleted + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> { + executionService.restart(execution, null); + }); + + assertThat(e.getMessage(), containsString("Worker task can't be restarted")); + } + @Test void replaySimple() throws Exception { Execution execution = runnerUtils.runOne("io.kestra.tests", "logs");