diff --git a/core/src/main/java/io/kestra/core/models/flows/Flow.java b/core/src/main/java/io/kestra/core/models/flows/Flow.java index ec142ce2c9d..8dc5e956768 100644 --- a/core/src/main/java/io/kestra/core/models/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/models/flows/Flow.java @@ -179,11 +179,6 @@ private Stream allTasksWithChilds(Task task) { Stream taskStream = ((FlowableTask) task).allChildTasks() .stream() .flatMap(this::allTasksWithChilds); - if (((FlowableTask) task).getErrors() != null) { - taskStream = Stream.concat(taskStream, ((FlowableTask) task).getErrors() - .stream() - .flatMap(this::allTasksWithChilds)); - } return Stream.concat( Stream.of(task), @@ -340,4 +335,4 @@ public Flow toDeleted() { true ); } -} +} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/models/tasks/Task.java b/core/src/main/java/io/kestra/core/models/tasks/Task.java index fc64c825f76..3e8f81472f8 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/Task.java +++ b/core/src/main/java/io/kestra/core/models/tasks/Task.java @@ -12,6 +12,7 @@ import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.experimental.SuperBuilder; import java.time.Duration; @@ -30,6 +31,7 @@ @Introspected @JsonInclude(JsonInclude.Include.NON_DEFAULT) abstract public class Task { + @Setter @NotNull @NotBlank @Pattern(regexp="[a-zA-Z0-9_-]+") diff --git a/core/src/main/java/io/kestra/core/services/AdvancedGraphService.java b/core/src/main/java/io/kestra/core/services/AdvancedGraphService.java index ff9a377d541..005e0d10266 100644 --- a/core/src/main/java/io/kestra/core/services/AdvancedGraphService.java +++ b/core/src/main/java/io/kestra/core/services/AdvancedGraphService.java @@ -9,7 +9,6 @@ import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -21,20 +20,22 @@ public class AdvancedGraphService { private FlowRepositoryInterface flowRepository; public FlowGraph withExpandedSubflows(Flow flow, List subflows) throws IllegalVariableEvaluationException { - return FlowGraph.of(flow, null, subflowsAsSequential(flow, subflows)); + return FlowGraph.of(flow, null, subflowsAsSequential("", flow, subflows)); } - private Sequential toSequential(Flow subflow) { + private Sequential toSequential(String subflowTaskId, Flow subflow) { + subflow.allTasksWithChilds().forEach(task -> task.setId(subflowTaskId + "." + task.getId())); + return Sequential.builder() - .id(subflow.getId()) + .id(subflowTaskId) .tasks(subflow.getTasks()) .errors(subflow.getErrors()) .build(); } - private List subflowsAsSequential(Flow flow, List subflows) { - if (subflows.isEmpty()) { - return Collections.emptyList(); + private List subflowsAsSequential(String fullyQualifiedSubflowPath, Flow flow, List subflows) { + if (subflows == null || subflows.isEmpty()) { + return null; } return flow.allTasksWithChilds().stream() @@ -45,9 +46,10 @@ private List subflowsAsSequential(Flow flow, List subflows) Flow subflow = flowRepository.findById(flowTask.getNamespace(), flowTask.getFlowId(), Optional.ofNullable(flowTask.getRevision())) .orElseThrow(() -> new IllegalArgumentException("Subflow '" + flowTask.getNamespace() + "." + flowTask.getFlowId() + "' not found")); + String subflowId = fullyQualifiedSubflowPath + flowTask.getId(); return Stream.concat( - Stream.of(toSequential(subflow)), - subflowsAsSequential(subflow, subflows).stream() + Stream.of(toSequential(flowTask.getId(), subflow)), + subflowsAsSequential(subflowId + ".", subflow, subflows).stream() ); }).toList(); } diff --git a/core/src/main/java/io/kestra/core/services/GraphService.java b/core/src/main/java/io/kestra/core/services/GraphService.java index e422d19f8a7..a346188d192 100644 --- a/core/src/main/java/io/kestra/core/services/GraphService.java +++ b/core/src/main/java/io/kestra/core/services/GraphService.java @@ -283,11 +283,11 @@ private static void fillGraph( FlowableTask flowableTask = ((FlowableTask) currentTask); currentGraph = flowableTask.tasksTree(execution, currentTaskRun, parentValues, subflows); } - else if (currentTask instanceof io.kestra.core.tasks.flows.Flow taskFlow && subflows != null && subflows.stream().map(Sequential::getId).toList().contains(taskFlow.getFlowId())) { + else if (currentTask instanceof io.kestra.core.tasks.flows.Flow subflowTask && subflows != null && subflows.stream().map(Sequential::getId).anyMatch(id -> id.equals(subflowTask.getId()))) { // Get sequential task from subflow that have for id the taskflow.getFlowId() and generate the graph - Sequential subflow = subflows.stream().filter(s -> s.getId().equals(taskFlow.getFlowId())).findFirst().orElse(null); + Sequential subflow = subflows.stream().filter(s -> s.getId().equals(subflowTask.getId())).findFirst().orElse(null); if (subflow == null) { - throw new IllegalArgumentException("Subflow '" + taskFlow.getId() + "' not found"); + throw new IllegalArgumentException("Subflow '" + subflowTask.getId() + "' not found"); } currentGraph = subflow.tasksTree(execution, currentTaskRun, parentValues, subflows); } else { diff --git a/core/src/test/java/io/kestra/core/models/hierarchies/FlowGraphTest.java b/core/src/test/java/io/kestra/core/models/hierarchies/FlowGraphTest.java index 64de5866375..9ef40d3b7bd 100644 --- a/core/src/test/java/io/kestra/core/models/hierarchies/FlowGraphTest.java +++ b/core/src/test/java/io/kestra/core/models/hierarchies/FlowGraphTest.java @@ -1,20 +1,23 @@ package io.kestra.core.models.hierarchies; -import org.junit.jupiter.api.Test; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.InternalException; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.runners.AbstractMemoryRunnerTest; import io.kestra.core.serializers.YamlFlowParser; +import io.kestra.core.services.AdvancedGraphService; +import io.kestra.core.tasks.flows.Sequential; import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; import java.io.File; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; @@ -23,6 +26,9 @@ class FlowGraphTest extends AbstractMemoryRunnerTest { @Inject private YamlFlowParser yamlFlowParser = new YamlFlowParser(); + @Inject + private AdvancedGraphService advancedGraphService; + @Test void simple() throws IllegalVariableEvaluationException { Flow flow = this.parse("flows/valids/return.yaml"); @@ -95,7 +101,7 @@ void parallelNested() throws IllegalVariableEvaluationException { assertThat(flowGraph.getClusters().size(), is(3)); assertThat(edge(flowGraph, "1_par", "1-4_end").getRelation().getRelationType(), is(RelationType.PARALLEL)); - assertThat(edge(flowGraph, "1_par", ".*_root").getRelation().getRelationType(), is(RelationType.PARALLEL)); + assertThat(edge(flowGraph, "1_par", ".*_start").getRelation().getRelationType(), is(RelationType.PARALLEL)); assertThat(edge(flowGraph, "1-3-2_par", "1-3-2-1").getRelation().getRelationType(), is(RelationType.SEQUENTIAL)); } @@ -108,8 +114,8 @@ void choice() throws IllegalVariableEvaluationException { assertThat(flowGraph.getEdges().size(), is(17)); assertThat(flowGraph.getClusters().size(), is(2)); - assertThat(edge(flowGraph, "parent-seq", ".*_root").getRelation().getRelationType(), is(RelationType.CHOICE)); - assertThat(edge(flowGraph, "parent-seq", ".*_root").getRelation().getValue(), is("THIRD")); + assertThat(edge(flowGraph, "parent-seq", ".*_start").getRelation().getRelationType(), is(RelationType.CHOICE)); + assertThat(edge(flowGraph, "parent-seq", ".*_start").getRelation().getValue(), is("THIRD")); assertThat(edge(flowGraph, "parent-seq", "t1").getRelation().getRelationType(), is(RelationType.CHOICE)); assertThat(edge(flowGraph, "parent-seq", "t1").getRelation().getValue(), is("FIRST")); assertThat(edge(flowGraph, "parent-seq", "default").getRelation().getRelationType(), is(RelationType.CHOICE)); @@ -126,7 +132,7 @@ void each() throws IllegalVariableEvaluationException { assertThat(flowGraph.getEdges().size(), is(12)); assertThat(flowGraph.getClusters().size(), is(2)); - assertThat(edge(flowGraph, "1-1_return", ".*_root").getRelation().getRelationType(), is(RelationType.DYNAMIC)); + assertThat(edge(flowGraph, "1-1_return", ".*_start").getRelation().getRelationType(), is(RelationType.DYNAMIC)); assertThat(edge(flowGraph, "1-2_each", "1-2-1_return").getRelation().getRelationType(), is(RelationType.DYNAMIC)); } @@ -139,7 +145,7 @@ void eachParallel() throws IllegalVariableEvaluationException { assertThat(flowGraph.getEdges().size(), is(10)); assertThat(flowGraph.getClusters().size(), is(2)); - assertThat(edge(flowGraph, "1_each", ".*_root").getRelation().getRelationType(), is(RelationType.DYNAMIC)); + assertThat(edge(flowGraph, "1_each", ".*_start").getRelation().getRelationType(), is(RelationType.DYNAMIC)); assertThat(flowGraph.getClusters().get(1).getNodes().size(), is(5)); } @@ -158,7 +164,7 @@ void parallelWithExecution() throws TimeoutException, IllegalVariableEvaluationE Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel"); Flow flow = this.parse("flows/valids/parallel.yaml"); - FlowGraph flowGraph = FlowGraph.of(flow, execution); + FlowGraph flowGraph = FlowGraph.of(flow, execution, null); assertThat(flowGraph.getNodes().size(), is(12)); assertThat(flowGraph.getEdges().size(), is(16)); @@ -179,7 +185,7 @@ void eachWithExecution() throws TimeoutException, IllegalVariableEvaluationExcep Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential"); Flow flow = this.parse("flows/valids/each-sequential.yaml"); - FlowGraph flowGraph = FlowGraph.of(flow, execution); + FlowGraph flowGraph = FlowGraph.of(flow, execution, null); assertThat(flowGraph.getNodes().size(), is(21)); assertThat(flowGraph.getEdges().size(), is(22)); @@ -189,7 +195,7 @@ void eachWithExecution() throws TimeoutException, IllegalVariableEvaluationExcep assertThat(edge(flowGraph, "1-1_value 2", "1-1_value 3").getRelation().getValue(), is("value 3")); assertThat(edge(flowGraph, "1-2_value 3", ".*_end"), is(notNullValue())); - assertThat(edge(flowGraph, "failed_value 1","1-2_value 1").getTarget(), is("1-2_value 1")); + assertThat(edge(flowGraph, "failed_value 1", "1-2_value 1").getTarget(), is("1-2_value 1")); } @Test @@ -212,6 +218,26 @@ void multipleTriggers() throws IllegalVariableEvaluationException { assertThat(flowGraph.getClusters().size(), is(1)); } + @Test + void subflow() throws IllegalVariableEvaluationException { + Flow flow = this.parse("flows/valids/task-flow.yaml"); + FlowGraph flowGraph = FlowGraph.of(flow); + + assertThat(flowGraph.getNodes().size(), is(3)); + assertThat(flowGraph.getEdges().size(), is(2)); + assertThat(flowGraph.getClusters().size(), is(0)); + + flowGraph = advancedGraphService.withExpandedSubflows(flow, Collections.singletonList("launch")); + + assertThat(flowGraph.getNodes().size(), is(17)); + assertThat(flowGraph.getEdges().size(), is(20)); + assertThat(flowGraph.getClusters().size(), is(3)); + + assertThat(((GraphTask) node(flowGraph, "launch")).getTask(), instanceOf(Sequential.class)); + // prefixed with subflow task id + assertThat(((GraphTask) node(flowGraph, "launch.parent-seq")).getRelationType(), is(RelationType.CHOICE)); + } + private Flow parse(String path) { URL resource = TestsUtils.class.getClassLoader().getResource(path); assert resource != null; diff --git a/ui/src/components/executions/ExecutionRoot.vue b/ui/src/components/executions/ExecutionRoot.vue index ab09a660642..1dceb870ce7 100644 --- a/ui/src/components/executions/ExecutionRoot.vue +++ b/ui/src/components/executions/ExecutionRoot.vue @@ -95,7 +95,7 @@ this.$store .dispatch("execution/followExecution", this.$route.params) .then(sse => { - this.sse = sse + this.sse = sse; this.sse.onmessage = (event) => { if (event && event.lastEventId === "end") { self.closeSSE(); diff --git a/ui/src/components/executions/Topology.vue b/ui/src/components/executions/Topology.vue index 19b2ea8c1f7..d8c8665dcf9 100644 --- a/ui/src/components/executions/Topology.vue +++ b/ui/src/components/executions/Topology.vue @@ -7,25 +7,27 @@ :flow-id="execution.flowId" :namespace="execution.namespace" :flow-graph="flowGraph" - :source="flow.source" + :source="flow?.source" :execution="execution" is-read-only @follow="forwardEvent('follow', $event)" view-type="topology" + @expand-subflow="onExpandSubflow($event)" /> diff --git a/ui/src/components/flows/FlowRoot.vue b/ui/src/components/flows/FlowRoot.vue index d3504d18862..bdbfa342390 100644 --- a/ui/src/components/flows/FlowRoot.vue +++ b/ui/src/components/flows/FlowRoot.vue @@ -72,7 +72,6 @@ return this.$store.dispatch("flow/loadFlow", this.$route.params).then(() => { if (this.flow) { this.previousFlow = this.flowKey(); - // TODO NECESSARY ? this.$store.dispatch("flow/loadGraph", { flow: this.flow }); diff --git a/ui/src/components/flows/Topology.vue b/ui/src/components/flows/Topology.vue index de74ec1984a..941842c416f 100644 --- a/ui/src/components/flows/Topology.vue +++ b/ui/src/components/flows/Topology.vue @@ -41,7 +41,7 @@ methods: { onExpandSubflow(event) { this.$store.dispatch("flow/loadGraph", { - flow:this.flow, + flow: this.flow, params: { subflows: event } diff --git a/ui/src/components/inputs/EditorView.vue b/ui/src/components/inputs/EditorView.vue index f9d0b806806..e85088aae44 100644 --- a/ui/src/components/inputs/EditorView.vue +++ b/ui/src/components/inputs/EditorView.vue @@ -359,9 +359,6 @@ const generateGraph = async () => { await fetchGraph(flowYaml.value); - if (props.flowGraph) { - lowCodeEditorRef.value.generateGraph(); - } } const loadingState = (value) => { diff --git a/ui/src/components/inputs/LowCodeEditor.vue b/ui/src/components/inputs/LowCodeEditor.vue index a13a7ea3d21..0fa16288ba3 100644 --- a/ui/src/components/inputs/LowCodeEditor.vue +++ b/ui/src/components/inputs/LowCodeEditor.vue @@ -33,17 +33,7 @@ // Vue flow methods to interact with Graph const { id, - getNodes, - removeNodes, - getEdges, - removeEdges, - fitView, - getElements, - removeSelectedElements, - onNodeDragStart, - onNodeDragStop, - onNodeDrag, - setElements, + fitView } = useVueFlow({id: vueflowId.value}); // props @@ -119,11 +109,6 @@ observeWidth(); }) - - watch(() => props.flowGraph, () => { - generateGraph(); - }) - watch(() => isDrawerOpen.value, () => { if (!isDrawerOpen.value) { isShowDescriptionOpen.value = false; @@ -135,7 +120,6 @@ watch(() => props.viewType, () => { isHorizontal.value = props.viewType === "source-topology" ? false : (props.viewType?.indexOf("blueprint") !== -1 ? true : localStorage.getItem("topology-orientation") === "1") - generateGraph(); }) // Event listeners & Watchers @@ -143,7 +127,6 @@ const resizeObserver = new ResizeObserver(function () { clearTimeout(timer.value); timer.value = setTimeout(() => { - generateGraph(); nextTick(() => { fitView() }) @@ -248,7 +231,6 @@ localStorage.getItem("topology-orientation") !== "0" ? "0" : "1" ); isHorizontal.value = localStorage.getItem("topology-orientation") === "1"; - generateGraph(); fitView(); }; @@ -271,7 +253,7 @@ tab: "topology", id: data.link.executionId, }, - }).href,'_blank');; + }).href,'_blank'); }) } else { window.open(router.resolve({ @@ -281,18 +263,6 @@ } } - - const generateGraph = () => { - // VueFlowUtils.cleanGraph(vueflowId.value); - // - // nextTick(() => { - // emit("loading", true); - // fitView(); - // setElements(elements.value); - // emit("loading", false); - // }) - } - const showLogs = (event) => { selectedTask.value = event isShowLogsOpen.value = true; @@ -330,11 +300,6 @@ emit("expand-subflow", event) } } - - // Expose method to be triggered by parents - defineExpose({ - generateGraph - })