Skip to content

Commit

Permalink
feat(core): allow subflows to inherit parent flow's labels (#2121)
Browse files Browse the repository at this point in the history
closes #2114
  • Loading branch information
yuri1969 authored Sep 22, 2023
1 parent 444a775 commit 456b18d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ private Executor handleFlowTask(final Executor executor) {
);

try {
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface(), executor.getExecution());

WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
.task(flowTask)
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
@PluginProperty
private final Boolean transmitFailed = false;

@Builder.Default
@Schema(
title = "Inherit labels from the calling execution",
description = "By default, we don't inherit any labels of the calling execution"
)
@PluginProperty
private final Boolean inheritLabels = false;

@Schema(
title = "Extract outputs from triggered executions.",
description = "Allow to specify key value (with value rendered), in order to extract any outputs from " +
Expand All @@ -117,7 +125,7 @@ public String flowUidWithoutRevision() {
}

@SuppressWarnings("unchecked")
public Execution createExecution(RunContext runContext, FlowExecutorInterface flowExecutorInterface) throws Exception {
public Execution createExecution(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Execution currentExecution) throws Exception {
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);

Map<String, String> inputs = new HashMap<>();
Expand All @@ -128,6 +136,9 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}

List<Label> labels = new ArrayList<>();
if (this.inheritLabels) {
labels.addAll(currentExecution.getLabels());
}
if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
Expand Down
15 changes: 13 additions & 2 deletions core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package io.kestra.core.tasks.flows;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -41,7 +44,7 @@ public void invalidOutputs() throws Exception {
}

@SuppressWarnings({"ResultOfMethodCallIgnored", "unchecked"})
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs) throws Exception {
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> triggered = new AtomicReference<>();

Expand All @@ -58,7 +61,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count
"task-flow",
null,
(f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1)
Duration.ofMinutes(1),
List.of(new Label("mainFlowExecutionLabel", "execFoo"))
);

countDownLatch.await(1, TimeUnit.MINUTES);
Expand All @@ -85,5 +89,12 @@ void run(String input, State.Type fromState, State.Type triggerState, int count

assertThat(triggered.get().getTaskRunList(), hasSize(count));
assertThat(triggered.get().getState().getCurrent(), is(triggerState));

assertThat(triggered.get().getLabels(), hasItems(
new Label("mainFlowExecutionLabel", "execFoo"),
new Label("mainFlowLabel", "flowFoo"),
new Label("launchTaskLabel", "launchFoo"),
new Label("switchFlowLabel", "switchFoo")
));
}
}
3 changes: 3 additions & 0 deletions core/src/test/resources/flows/valids/switch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ inputs:
type: STRING
defaults: amazing

labels:
switchFlowLabel: switchFoo

tasks:
- id: parent-seq
type: io.kestra.core.tasks.flows.Switch
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/resources/flows/valids/task-flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ inputs:
- name: string
type: STRING

labels:
mainFlowLabel: flowFoo

tasks:
- id: launch
type: io.kestra.core.tasks.flows.Flow
Expand All @@ -14,5 +17,8 @@ tasks:
string: "{{ inputs.string }}"
wait: true
transmitFailed: true
inheritLabels: true
labels:
launchTaskLabel: launchFoo
outputs:
extracted: "{{ outputs.default.value ?? outputs['error-t1'].value }}"

0 comments on commit 456b18d

Please sign in to comment.