Skip to content

Commit

Permalink
feat(ui): expand subflow with sub-execution follow handled with SSE
Browse files Browse the repository at this point in the history
closes #774
  • Loading branch information
brian-mulier-p committed Sep 1, 2023
1 parent 4d4086b commit f92ab78
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 100 deletions.
7 changes: 1 addition & 6 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,6 @@ private Stream<Task> allTasksWithChilds(Task task) {
Stream<Task> taskStream = ((FlowableTask<?>) task).allChildTasks()
.stream()
.flatMap(this::allTasksWithChilds);
if (((FlowableTask<?>) task).getErrors() != null) {
taskStream = Stream.concat(taskStream, ((FlowableTask<?>) task).getErrors()
.stream()
.flatMap(this::allTasksWithChilds));
}

return Stream.concat(
Stream.of(task),
Expand Down Expand Up @@ -340,4 +335,4 @@ public Flow toDeleted() {
true
);
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.SuperBuilder;

import java.time.Duration;
Expand All @@ -30,6 +31,7 @@
@Introspected
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
abstract public class Task {
@Setter
@NotNull
@NotBlank
@Pattern(regexp="[a-zA-Z0-9_-]+")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
Expand All @@ -21,20 +20,22 @@ public class AdvancedGraphService {
private FlowRepositoryInterface flowRepository;

public FlowGraph withExpandedSubflows(Flow flow, List<String> subflows) throws IllegalVariableEvaluationException {
return FlowGraph.of(flow, null, subflowsAsSequential(flow, subflows));
return FlowGraph.of(flow, null, subflowsAsSequential("", flow, subflows));
}

private Sequential toSequential(Flow subflow) {
private Sequential toSequential(String subflowTaskId, Flow subflow) {
subflow.allTasksWithChilds().forEach(task -> task.setId(subflowTaskId + "." + task.getId()));

return Sequential.builder()
.id(subflow.getId())
.id(subflowTaskId)
.tasks(subflow.getTasks())
.errors(subflow.getErrors())
.build();
}

private List<Sequential> subflowsAsSequential(Flow flow, List<String> subflows) {
if (subflows.isEmpty()) {
return Collections.emptyList();
private List<Sequential> subflowsAsSequential(String fullyQualifiedSubflowPath, Flow flow, List<String> subflows) {
if (subflows == null || subflows.isEmpty()) {
return null;
}

return flow.allTasksWithChilds().stream()
Expand All @@ -45,9 +46,10 @@ private List<Sequential> subflowsAsSequential(Flow flow, List<String> subflows)
Flow subflow = flowRepository.findById(flowTask.getNamespace(), flowTask.getFlowId(), Optional.ofNullable(flowTask.getRevision()))
.orElseThrow(() -> new IllegalArgumentException("Subflow '" + flowTask.getNamespace() + "." + flowTask.getFlowId() + "' not found"));

String subflowId = fullyQualifiedSubflowPath + flowTask.getId();
return Stream.concat(
Stream.of(toSequential(subflow)),
subflowsAsSequential(subflow, subflows).stream()
Stream.of(toSequential(flowTask.getId(), subflow)),
subflowsAsSequential(subflowId + ".", subflow, subflows).stream()
);
}).toList();
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/services/GraphService.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,11 @@ private static void fillGraph(
FlowableTask<?> flowableTask = ((FlowableTask<?>) currentTask);
currentGraph = flowableTask.tasksTree(execution, currentTaskRun, parentValues, subflows);
}
else if (currentTask instanceof io.kestra.core.tasks.flows.Flow taskFlow && subflows != null && subflows.stream().map(Sequential::getId).toList().contains(taskFlow.getFlowId())) {
else if (currentTask instanceof io.kestra.core.tasks.flows.Flow subflowTask && subflows != null && subflows.stream().map(Sequential::getId).anyMatch(id -> id.equals(subflowTask.getId()))) {
// Get sequential task from subflow that have for id the taskflow.getFlowId() and generate the graph
Sequential subflow = subflows.stream().filter(s -> s.getId().equals(taskFlow.getFlowId())).findFirst().orElse(null);
Sequential subflow = subflows.stream().filter(s -> s.getId().equals(subflowTask.getId())).findFirst().orElse(null);
if (subflow == null) {
throw new IllegalArgumentException("Subflow '" + taskFlow.getId() + "' not found");
throw new IllegalArgumentException("Subflow '" + subflowTask.getId() + "' not found");
}
currentGraph = subflow.tasksTree(execution, currentTaskRun, parentValues, subflows);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package io.kestra.core.models.hierarchies;

import org.junit.jupiter.api.Test;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.AdvancedGraphService;
import io.kestra.core.tasks.flows.Sequential;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
Expand All @@ -23,6 +26,9 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
@Inject
private YamlFlowParser yamlFlowParser = new YamlFlowParser();

@Inject
private AdvancedGraphService advancedGraphService;

@Test
void simple() throws IllegalVariableEvaluationException {
Flow flow = this.parse("flows/valids/return.yaml");
Expand Down Expand Up @@ -95,7 +101,7 @@ void parallelNested() throws IllegalVariableEvaluationException {
assertThat(flowGraph.getClusters().size(), is(3));

assertThat(edge(flowGraph, "1_par", "1-4_end").getRelation().getRelationType(), is(RelationType.PARALLEL));
assertThat(edge(flowGraph, "1_par", ".*_root").getRelation().getRelationType(), is(RelationType.PARALLEL));
assertThat(edge(flowGraph, "1_par", ".*_start").getRelation().getRelationType(), is(RelationType.PARALLEL));
assertThat(edge(flowGraph, "1-3-2_par", "1-3-2-1").getRelation().getRelationType(), is(RelationType.SEQUENTIAL));
}

Expand All @@ -108,8 +114,8 @@ void choice() throws IllegalVariableEvaluationException {
assertThat(flowGraph.getEdges().size(), is(17));
assertThat(flowGraph.getClusters().size(), is(2));

assertThat(edge(flowGraph, "parent-seq", ".*_root").getRelation().getRelationType(), is(RelationType.CHOICE));
assertThat(edge(flowGraph, "parent-seq", ".*_root").getRelation().getValue(), is("THIRD"));
assertThat(edge(flowGraph, "parent-seq", ".*_start").getRelation().getRelationType(), is(RelationType.CHOICE));
assertThat(edge(flowGraph, "parent-seq", ".*_start").getRelation().getValue(), is("THIRD"));
assertThat(edge(flowGraph, "parent-seq", "t1").getRelation().getRelationType(), is(RelationType.CHOICE));
assertThat(edge(flowGraph, "parent-seq", "t1").getRelation().getValue(), is("FIRST"));
assertThat(edge(flowGraph, "parent-seq", "default").getRelation().getRelationType(), is(RelationType.CHOICE));
Expand All @@ -126,7 +132,7 @@ void each() throws IllegalVariableEvaluationException {
assertThat(flowGraph.getEdges().size(), is(12));
assertThat(flowGraph.getClusters().size(), is(2));

assertThat(edge(flowGraph, "1-1_return", ".*_root").getRelation().getRelationType(), is(RelationType.DYNAMIC));
assertThat(edge(flowGraph, "1-1_return", ".*_start").getRelation().getRelationType(), is(RelationType.DYNAMIC));
assertThat(edge(flowGraph, "1-2_each", "1-2-1_return").getRelation().getRelationType(), is(RelationType.DYNAMIC));
}

Expand All @@ -139,7 +145,7 @@ void eachParallel() throws IllegalVariableEvaluationException {
assertThat(flowGraph.getEdges().size(), is(10));
assertThat(flowGraph.getClusters().size(), is(2));

assertThat(edge(flowGraph, "1_each", ".*_root").getRelation().getRelationType(), is(RelationType.DYNAMIC));
assertThat(edge(flowGraph, "1_each", ".*_start").getRelation().getRelationType(), is(RelationType.DYNAMIC));
assertThat(flowGraph.getClusters().get(1).getNodes().size(), is(5));
}

Expand All @@ -158,7 +164,7 @@ void parallelWithExecution() throws TimeoutException, IllegalVariableEvaluationE
Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel");

Flow flow = this.parse("flows/valids/parallel.yaml");
FlowGraph flowGraph = FlowGraph.of(flow, execution);
FlowGraph flowGraph = FlowGraph.of(flow, execution, null);

assertThat(flowGraph.getNodes().size(), is(12));
assertThat(flowGraph.getEdges().size(), is(16));
Expand All @@ -179,7 +185,7 @@ void eachWithExecution() throws TimeoutException, IllegalVariableEvaluationExcep
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential");

Flow flow = this.parse("flows/valids/each-sequential.yaml");
FlowGraph flowGraph = FlowGraph.of(flow, execution);
FlowGraph flowGraph = FlowGraph.of(flow, execution, null);

assertThat(flowGraph.getNodes().size(), is(21));
assertThat(flowGraph.getEdges().size(), is(22));
Expand All @@ -189,7 +195,7 @@ void eachWithExecution() throws TimeoutException, IllegalVariableEvaluationExcep
assertThat(edge(flowGraph, "1-1_value 2", "1-1_value 3").getRelation().getValue(), is("value 3"));
assertThat(edge(flowGraph, "1-2_value 3", ".*_end"), is(notNullValue()));

assertThat(edge(flowGraph, "failed_value 1","1-2_value 1").getTarget(), is("1-2_value 1"));
assertThat(edge(flowGraph, "failed_value 1", "1-2_value 1").getTarget(), is("1-2_value 1"));
}

@Test
Expand All @@ -212,6 +218,26 @@ void multipleTriggers() throws IllegalVariableEvaluationException {
assertThat(flowGraph.getClusters().size(), is(1));
}

@Test
void subflow() throws IllegalVariableEvaluationException {
Flow flow = this.parse("flows/valids/task-flow.yaml");
FlowGraph flowGraph = FlowGraph.of(flow);

assertThat(flowGraph.getNodes().size(), is(3));
assertThat(flowGraph.getEdges().size(), is(2));
assertThat(flowGraph.getClusters().size(), is(0));

flowGraph = advancedGraphService.withExpandedSubflows(flow, Collections.singletonList("launch"));

assertThat(flowGraph.getNodes().size(), is(17));
assertThat(flowGraph.getEdges().size(), is(20));
assertThat(flowGraph.getClusters().size(), is(3));

assertThat(((GraphTask) node(flowGraph, "launch")).getTask(), instanceOf(Sequential.class));
// prefixed with subflow task id
assertThat(((GraphTask) node(flowGraph, "launch.parent-seq")).getRelationType(), is(RelationType.CHOICE));
}

private Flow parse(String path) {
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/executions/ExecutionRoot.vue
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
this.$store
.dispatch("execution/followExecution", this.$route.params)
.then(sse => {
this.sse = sse
this.sse = sse;
this.sse.onmessage = (event) => {
if (event && event.lastEventId === "end") {
self.closeSSE();
Expand Down
Loading

0 comments on commit f92ab78

Please sign in to comment.