Skip to content

Commit

Permalink
fix(core): no longer crashing app on Flow triggers without condition
Browse files Browse the repository at this point in the history
closes #2060
  • Loading branch information
brian-mulier-p committed Sep 8, 2023
1 parent 92b1c21 commit 7044a4c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, Li
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
Expand All @@ -67,7 +67,7 @@ public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, Li
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void trigger() throws InterruptedException, TimeoutException {
executionQueue.receive(either -> {
Execution execution = either.getLeft();
synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
Expand Down Expand Up @@ -93,7 +93,7 @@ public void failed() throws InterruptedException, TimeoutException {
executionQueue.receive(either -> {
synchronized (ended) {
Execution execution = either.getLeft();
if (execution.getState().getCurrent().isTerminated()) {
if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void restartFailedThenSuccess() throws Exception {

// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS,
execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.kestra.core.tasks.flows;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void run(RunnerUtils runnerUtils) throws Exception {
);

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
);
Expand All @@ -81,7 +81,7 @@ public void runDelay(RunnerUtils runnerUtils) throws Exception {
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);
Expand All @@ -104,7 +104,7 @@ public void runTimeout(RunnerUtils runnerUtils) throws Exception {
Duration.ofSeconds(5)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: trigger-flow-listener-no-condition
namespace: io.kestra.tests

inputs:
- name: from-parent
type: STRING

tasks:
- id: only-listener
type: io.kestra.core.tasks.debugs.Return
format: "simple return"

triggers:
- id: listen-flow
type: io.kestra.core.models.triggers.types.Flow

0 comments on commit 7044a4c

Please sign in to comment.