Skip to content

Commit

Permalink
fix(core): avoid infinite loop in the executor for Flowable tasks
Browse files Browse the repository at this point in the history
Fixes #1745

In the executor, when a flowable task throw an exception when calling `resolveNexts()` this will fail the current evaluation of the executor that will be retried indefinitly.
Catching exception and returing an empty next will avoid the infinite loop and the task will fail properly.
  • Loading branch information
loicmathieu committed Jul 17, 2023
1 parent c9c2e67 commit d102e22
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 15 deletions.
32 changes: 18 additions & 14 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,27 @@ private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun

if (parent instanceof FlowableTask<?> flowableParent) {

List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
try {
List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
);

if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);

if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);
}
} catch (Exception e) {
log.warn("Unable to resolve the next tasks to run", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ void sequential() throws TimeoutException {
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}

@Test // this test is a non-reg for an infinite loop in the executor
void flowableWithParentFail() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "flowable-with-parent-fail");

assertThat(execution.getTaskRunList(), hasSize(5));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void withFailedTemplate(RunnerUtils runnerUtils, TemplateRepositor

assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
LogEntry matchingLog = TestsUtils.awaitLog(logs, logEntry -> logEntry.getMessage().equals("Can't find flow template 'io.kestra.tests.invalid'") && logEntry.getLevel() == Level.ERROR);
LogEntry matchingLog = TestsUtils.awaitLog(logs, logEntry -> logEntry.getMessage().endsWith("Can't find flow template 'io.kestra.tests.invalid'") && logEntry.getLevel() == Level.ERROR);
assertThat(matchingLog, notNullValue());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
id: flowable-with-parent-fail
namespace: io.kestra.tests

tasks:
- id: vision
type: io.kestra.core.tasks.flows.EachParallel
tasks:
- id: metaseg_date
type: io.kestra.core.tasks.flows.EachParallel
tasks:
- id: if
type: io.kestra.core.tasks.flows.If
condition: "{{parents[0].taskrun.value == 'CUMULATIVE' and {% for entry in json(taskrun.value) %}{{ entry.key }}{% endfor %}== 'NEW_MONTH'}}"
then:
- id: when-true
type: io.kestra.core.tasks.log.Log
message: 'Condition was true'
else:
- id: when-false
type: io.kestra.core.tasks.log.Log
message: 'Condition was false'
value: "[{\"NEW_MONTH\":\"2018-01-01\"}]"
value: "[\"MONTHLY\", \"CUMULATIVE\"]"

0 comments on commit d102e22

Please sign in to comment.