Skip to content

Commit

Permalink
feat(ui): display subflow content in topology (#2007)
Browse files Browse the repository at this point in the history
closes #774
  • Loading branch information
brian-mulier-p authored Sep 14, 2023
1 parent e861adc commit 4529282
Show file tree
Hide file tree
Showing 42 changed files with 1,579 additions and 786 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.Graph2DotService;
import io.kestra.core.services.GraphService;
import io.kestra.core.utils.GraphUtils;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -32,7 +32,7 @@ public Integer call() throws Exception {
YamlFlowParser parser = applicationContext.getBean(YamlFlowParser.class);
Flow flow = parser.parse(file.toFile(), Flow.class);

GraphCluster graph = GraphService.of(flow, null);
GraphCluster graph = GraphUtils.of(flow, null);

stdOut(Graph2DotService.dot(graph.getGraph()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void run() {
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), containsString("\"date\"[shape=box,label=\"date\"];"));
assertThat(out.toString(), containsString("\"root.date\"[shape=box,label=\"date\"];"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -14,12 +13,14 @@
@Introspected
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
public abstract class AbstractGraph {
@Setter
protected String uid;
@JsonInclude
protected String type;
@Setter
protected boolean error;

public AbstractGraph() {
this.uid = IdUtils.create();
this.type = this.getClass().getName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@ToString
@Getter
Expand All @@ -18,36 +19,30 @@ public abstract class AbstractGraphTask extends AbstractGraph {
private final List<String> values;
private final RelationType relationType;

public AbstractGraphTask() {
super();

this.task = null;
this.taskRun = null;
this.values = null;
this.relationType = null;
}

public AbstractGraphTask(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
super();
public AbstractGraphTask(String uid, Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
super(uid);

this.task = task;
this.taskRun = taskRun;
this.values = values;
this.relationType = relationType;
}

public AbstractGraphTask(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
this(task.getId(), task, taskRun, values, relationType);
}

@Override
public String getLabel() {
return this.getUid() + (this.getTaskRun() != null ? " > " + this.getTaskRun().getValue() + " (" + this.getTaskRun().getId() + ")" : "");
String[] splitUid = this.getUid().split("\\.");
return splitUid[splitUid.length - 1] + (this.getTaskRun() != null ? " > " + this.getTaskRun().getValue() + " (" + this.getTaskRun().getId() + ")" : "");
}

@Override
public String getUid() {
List<String> list = new ArrayList<>();

if (this.task != null) {
list.add(this.task.getId());
}
list.add(this.uid);

if (values != null) {
list.addAll(values);
Expand All @@ -57,18 +52,11 @@ public String getUid() {
}

@Override
public boolean equals(final Object o) {
if (o == this) return true;

if (!(o instanceof AbstractGraphTask)) {
return false;
}
public boolean equals(Object object) {
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return false;
AbstractGraphTask that = (AbstractGraphTask) object;

return o.hashCode() == this.hashCode();
}

@Override
public int hashCode() {
return (this.uid + this.getClass().getName()).hashCode();
return Objects.equals(this.getUid(), that.getUid());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AbstractGraphTrigger(AbstractTrigger trigger) {
@Override
public String getUid() {
if (this.trigger != null) {
return this.trigger.getId() + "_trigger";
return this.trigger.getId();
}

return this.uid;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.kestra.core.models.hierarchies;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.services.GraphService;
import io.kestra.core.utils.GraphUtils;
import lombok.*;

import java.util.ArrayList;
Expand All @@ -18,18 +16,11 @@ public class FlowGraph {
List<Cluster> clusters;
List<String> flowables;

public static FlowGraph of(Flow flow) throws IllegalVariableEvaluationException {
return FlowGraph.of(flow, null);
}

public static FlowGraph of(Flow flow, Execution execution) throws IllegalVariableEvaluationException {
GraphCluster graph = GraphService.of(flow, execution);

public static FlowGraph of(GraphCluster graph) throws IllegalVariableEvaluationException {
return FlowGraph.builder()
.flowables(GraphService.flowables(flow))
.nodes(GraphService.nodes(graph))
.edges(GraphService.edges(graph))
.clusters(GraphService.clusters(graph, new ArrayList<>())
.nodes(GraphUtils.nodes(graph))
.edges(GraphUtils.edges(graph))
.clusters(GraphUtils.clusters(graph, new ArrayList<>())
.stream()
.map(g -> new Cluster(g.getKey(), g.getKey().getGraph()
.nodes()
Expand Down
16 changes: 6 additions & 10 deletions core/src/main/java/io/kestra/core/models/hierarchies/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.graph.ValueGraphBuilder;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -27,16 +28,6 @@ public Graph<T, V> addNode(T node) {
}

public Graph<T, V> addEdge(T previous, T next, V value) {
/*
if (!this.graph.nodes().contains(previous)) {
throw new IllegalArgumentException("Unable to find previous node '" + previous + "'@" + previous.hashCode());
}
if (!this.graph.nodes().contains(next)) {
throw new IllegalArgumentException("Unable to find next node '" + next + "'" + next.hashCode());
}
*/

this.graph.putEdgeValue(previous, next, value);

return this;
Expand All @@ -62,7 +53,12 @@ public Set<Edge<T, V>> edges() {
.collect(Collectors.toSet());
}

public void removeNode(T node) {
this.graph.removeNode(node);
}

@Getter
@Setter
@AllArgsConstructor
public static class Edge<T, V> {
T source;
Expand Down
105 changes: 84 additions & 21 deletions core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,30 @@
import io.kestra.core.models.tasks.Task;
import lombok.Getter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;


@Getter
public class GraphCluster extends AbstractGraph {
@JsonIgnore
protected final Graph<AbstractGraph, Relation> graph = new Graph<>();
private final Graph<AbstractGraph, Relation> graph = new Graph<>();

protected RelationType relationType;
private final RelationType relationType;

@JsonIgnore
private final GraphClusterRoot root;

@JsonIgnore
private final GraphClusterEnd end;

public GraphCluster() {
super();

this.relationType = null;
this.root = new GraphClusterRoot();
this.end = new GraphClusterEnd();
private final AbstractGraphTask taskNode;

graph.addNode(this.root);
graph.addNode(this.end);
public GraphCluster() {
this("root");
}


Expand All @@ -39,25 +38,89 @@ public GraphCluster(String uid) {
this.relationType = null;
this.root = new GraphClusterRoot();
this.end = new GraphClusterEnd();
this.taskNode = null;

graph.addNode(this.root);
graph.addNode(this.end);
this.addNode(this.root);
this.addNode(this.end);
}

public GraphCluster(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
super();
this(new GraphTask(task.getId(), task, taskRun, values, relationType), task.getId(), relationType);

this.uid = "cluster_" + task.getId();
this.relationType = relationType;
this.addNode(this.taskNode, false);
this.addEdge(this.getRoot(), this.taskNode, new Relation());
}

protected GraphCluster(AbstractGraphTask taskNode, String uid, RelationType relationType) {
super(uid);

this.relationType = relationType;
this.root = new GraphClusterRoot();
this.end = new GraphClusterEnd();
this.taskNode = taskNode;

this.addNode(this.root);
this.addNode(this.end);
}

public void addNode(AbstractGraph node) {
this.addNode(node, true);
}

public void addNode(AbstractGraph node, boolean withClusterUidPrefix) {
if (withClusterUidPrefix) {
node.setUid(prefixedUid(node.uid));
}
this.getGraph().addNode(node);
}

public void addEdge(AbstractGraph source, AbstractGraph target, Relation relation) {
this.getGraph().addEdge(source, target, relation);
}

private String prefixedUid(String uid) {
return Optional.ofNullable(this.uid).map(u -> u + "." + uid).orElse(uid);
}

public Map<GraphCluster, List<AbstractGraph>> allNodesByParent() {
Map<Boolean, List<AbstractGraph>> nodesByIsCluster = this.graph.nodes().stream().collect(Collectors.partitioningBy(n -> n instanceof GraphCluster));

Map<GraphCluster, List<AbstractGraph>> nodesByParent = new HashMap<>(Map.of(
this,
nodesByIsCluster.get(false)
));

nodesByIsCluster.get(true).forEach(n -> {
GraphCluster cluster = (GraphCluster) n;
nodesByParent.putAll(cluster.allNodesByParent());
});

return nodesByParent;
}

@Override
public String getUid() {
return "cluster_" + super.getUid();
}

@Override
public void setUid(String uid) {
graph.nodes().stream().filter(node ->
// filter other clusters' root & end to prevent setting uid multiple times
// this is because we need other clusters' root & end to have edges over them, but they are already managed by their own cluster
(!(node instanceof GraphClusterRoot) && !(node instanceof GraphClusterEnd))
|| node.equals(this.root) || node.equals(this.end))
.forEach(node -> node.setUid(uid + node.uid.substring(this.uid.length())));

super.setUid(uid);
}

graph.addNode(this.root);
graph.addNode(this.end);
@Override
public void setError(boolean error) {
this.error = error;

GraphTask flowableGraphTask = new GraphTask(task, taskRun, values, relationType);
this.getGraph().addNode(flowableGraphTask);
this.getGraph().addEdge(this.getRoot(), flowableGraphTask, new Relation());
this.taskNode.error = error;
this.root.error = error;
this.end.error = error;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
@Getter
public class GraphClusterEnd extends AbstractGraph {
public GraphClusterEnd() {
super(IdUtils.create() + "_end");
super(IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
@Getter
public class GraphClusterRoot extends AbstractGraph {
public GraphClusterRoot() {
super(IdUtils.create() + "_root");
super(IdUtils.create());
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.kestra.core.models.hierarchies;

import lombok.Getter;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import lombok.Getter;

import java.util.List;

@Getter
public class GraphTask extends AbstractGraphTask {
public GraphTask() {
super();
public GraphTask(String uid, Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
super(uid, task, taskRun, values, relationType);
}

public GraphTask(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
Expand Down
Loading

0 comments on commit 4529282

Please sign in to comment.