From 974729bf52bcf42bebc2fd31ed7fc74fb2b98f8f Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Thu, 24 Aug 2023 19:25:03 +0200 Subject: [PATCH 1/3] fix(core): multiple conditions are now working properly for "double flow in success within window" example + exported all multiple conditions related evaluation / save to a service closes #1917 --- .../MultipleConditionStorageInterface.java | 4 + .../services/AbstractFlowTriggerService.java | 158 ++++++++++++++++++ .../io/kestra/core/services/FlowService.java | 142 +--------------- .../MultipleConditionTriggerCaseTest.java | 8 +- .../MultipleConditionTriggerFailedTest.java | 12 -- .../AbstractJdbcMultipleConditionStorage.java | 2 + .../io/kestra/jdbc/runner/JdbcExecutor.java | 21 +-- .../jdbc/runner/JdbcFlowTriggerService.java | 17 ++ .../kestra/runner/memory/MemoryExecutor.java | 19 +-- .../memory/MemoryFlowTriggerService.java | 23 +++ .../MemoryMultipleConditionStorage.java | 2 + 11 files changed, 219 insertions(+), 189 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java delete mode 100644 core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerFailedTest.java create mode 100644 jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java create mode 100644 runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java diff --git a/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionStorageInterface.java b/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionStorageInterface.java index f5672eebb6c..3e774f97479 100644 --- a/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionStorageInterface.java +++ b/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionStorageInterface.java @@ -48,4 +48,8 @@ default MultipleConditionWindow getOrCreate(Flow flow, MultipleCondition multipl .build() ); } + + void save(List multipleConditionWindows); + + void delete(MultipleConditionWindow multipleConditionWindow); } diff --git a/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java b/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java new file mode 100644 index 00000000000..93f49ba88e4 --- /dev/null +++ b/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java @@ -0,0 +1,158 @@ +package io.kestra.core.services; + +import io.kestra.core.models.conditions.types.MultipleCondition; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; +import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; +import io.kestra.core.runners.RunContextFactory; +import jakarta.inject.Inject; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +import java.util.*; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class AbstractFlowTriggerService { + @Inject + protected Optional multipleConditionStorage; + + @Inject + private ConditionService conditionService; + + @Inject + private RunContextFactory runContextFactory; + + @Inject + private FlowService flowService; + + public AbstractFlowTriggerService(Optional multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { + this.multipleConditionStorage = multipleConditionStorage; + this.conditionService = conditionService; + this.runContextFactory = runContextFactory; + this.flowService = flowService; + } + + public Stream withFlowTriggersOnly(Stream allFlows) { + return allFlows + .filter(flow -> !flow.isDisabled()) + .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty()) + .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger))); + } + + public Stream flowTriggers(Flow flow) { + return flow.getTriggers() + .stream() + .filter(Predicate.not(AbstractTrigger::isDisabled)) + .filter(io.kestra.core.models.triggers.types.Flow.class::isInstance) + .map(io.kestra.core.models.triggers.types.Flow.class::cast); + } + + public List computeExecutionsFromFlowTriggers(Execution execution, List allFlows) { + List validTriggersBeforeMultipleConditionEval = allFlows.stream() + // prevent recursive flow triggers + .filter(flow -> flowService.removeUnwanted(flow, execution)) + // ensure flow & triggers are enabled + .filter(flow -> !flow.isDisabled()) + .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty()) + // validate flow triggers conditions excluding multiple conditions + .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger))) + .filter(flowWithFlowTrigger -> conditionService.valid( + flowWithFlowTrigger.getFlow(), + flowWithFlowTrigger.getTrigger().getConditions().stream() + .filter(Predicate.not(MultipleCondition.class::isInstance)) + .toList(), + conditionService.conditionContext( + runContextFactory.of(flowWithFlowTrigger.getFlow(), execution), + flowWithFlowTrigger.getFlow(), + execution + ) + )).toList(); + + Map multipleConditionWindowsByFlow = null; + if (multipleConditionStorage.isPresent()) { + List flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream() + .flatMap(flowWithFlowTrigger -> + flowWithFlowTrigger.getTrigger().getConditions().stream() + .filter(MultipleCondition.class::isInstance) + .map(MultipleCondition.class::cast) + .map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition( + flowWithFlowTrigger.getFlow(), + multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition), + flowWithFlowTrigger.getTrigger(), + multipleCondition + ) + ) + ).toList(); + + // evaluate multiple conditions + multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> { + Map results = f.getMultipleCondition() + .getConditions() + .entrySet() + .stream() + .map(e -> new AbstractMap.SimpleEntry<>( + e.getKey(), + conditionService.isValid(e.getValue(), f.getFlow(), execution) + )) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return Map.entry(f, f.getMultipleConditionWindow().with(results)); + }) + .filter(e -> !e.getValue().getResults().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // persist results + multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values())); + } + + // compute all executions to create from flow triggers now that multiple conditions storage is populated + List executions = validTriggersBeforeMultipleConditionEval.stream().filter(flowWithFlowTrigger -> + conditionService.isValid( + flowWithFlowTrigger.getTrigger(), + flowWithFlowTrigger.getFlow(), + execution, + multipleConditionStorage.orElse(null) + ) + ).map(f -> f.getTrigger().evaluate( + runContextFactory.of(f.getFlow(), execution), + f.getFlow(), + execution + )) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); + + if(multipleConditionStorage.isPresent() && multipleConditionWindowsByFlow != null) { + // purge fulfilled or expired multiple condition windows + Stream.concat( + multipleConditionWindowsByFlow.keySet().stream() + .map(f -> Map.entry( + f.getMultipleCondition().getConditions(), + multipleConditionStorage.get().getOrCreate(f.getFlow(), f.getMultipleCondition()) + )) + .filter(e -> e.getKey().size() == Optional.ofNullable(e.getValue().getResults()) + .map(Map::size) + .orElse(0)) + .map(Map.Entry::getValue), + multipleConditionStorage.get().expired().stream() + ).forEach(multipleConditionStorage.get()::delete); + } + + return executions; + } + + @AllArgsConstructor + @Getter + @ToString + protected static class FlowWithFlowTriggerAndMultipleCondition { + private final Flow flow; + private final MultipleConditionWindow multipleConditionWindow; + private final io.kestra.core.models.triggers.types.Flow trigger; + private final MultipleCondition multipleCondition; + } +} diff --git a/core/src/main/java/io/kestra/core/services/FlowService.java b/core/src/main/java/io/kestra/core/services/FlowService.java index f0c080f9af7..5881846e3a3 100644 --- a/core/src/main/java/io/kestra/core/services/FlowService.java +++ b/core/src/main/java/io/kestra/core/services/FlowService.java @@ -2,12 +2,9 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; -import io.kestra.core.models.conditions.types.MultipleCondition; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.triggers.AbstractTrigger; -import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; -import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.utils.ListUtils; @@ -83,131 +80,11 @@ private Stream keepLastVersionCollector(Stream stream) { .filter(Objects::nonNull); } - public List flowWithFlowTrigger(Stream flowStream) { - return flowStream - .filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0) - .filter(flow -> !flow.isDisabled()) - .flatMap(flow -> flow.getTriggers() - .stream() - .filter(abstractTrigger -> !abstractTrigger.isDisabled()) - .map(trigger -> new FlowWithTrigger(flow, trigger)) - ) - .filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow) - .map(f -> new FlowWithFlowTrigger( - f.getFlow(), - (io.kestra.core.models.triggers.types.Flow) f.getTrigger() - ) - ) - .collect(Collectors.toList()); - } - protected boolean removeUnwanted(Flow f, Execution execution) { // we don't allow recursive return !f.uidWithoutRevision().equals(Flow.uidWithoutRevision(execution)); } - public List flowTriggerExecution(Stream flowStream, Execution execution, @Nullable MultipleConditionStorageInterface multipleConditionStorage) { - return flowStream - .filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0) - .filter(flow -> !flow.isDisabled()) - .flatMap(flow -> flow.getTriggers() - .stream() - .filter(abstractTrigger -> !abstractTrigger.isDisabled()) - .map(trigger -> new FlowWithTrigger(flow, trigger)) - ) - .filter(f -> conditionService.isValid( - f.getTrigger(), - f.getFlow(), - execution, - multipleConditionStorage - )) - .filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow) - .map(f -> new FlowWithFlowTrigger( - f.getFlow(), - (io.kestra.core.models.triggers.types.Flow) f.getTrigger() - ) - ) - .filter(f -> this.removeUnwanted(f.getFlow(), execution)) - .map(f -> f.getTrigger().evaluate( - runContextFactory.of(f.getFlow(), execution), - f.getFlow(), - execution - )) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - } - - private Stream multipleFlowStream( - Stream flowStream, - MultipleConditionStorageInterface multipleConditionStorage - ) { - return flowWithFlowTrigger(flowStream) - .stream() - .flatMap(e -> e.getTrigger() - .getConditions() - .stream() - .filter(condition -> condition instanceof MultipleCondition) - .map(condition -> { - MultipleCondition multipleCondition = (MultipleCondition) condition; - - return new FlowWithFlowTriggerAndMultipleCondition( - e.getFlow(), - multipleConditionStorage.getOrCreate(e.getFlow(), multipleCondition), - e.getTrigger(), - multipleCondition - ); - }) - ); - } - - public List multipleFlowTrigger( - Stream flowStream, - Flow flow, - Execution execution, - MultipleConditionStorageInterface multipleConditionStorage - ) { - return multipleFlowStream(flowStream, multipleConditionStorage) - .map(f -> { - Map results = f.getMultipleCondition() - .getConditions() - .entrySet() - .stream() - .map(e -> new AbstractMap.SimpleEntry<>( - e.getKey(), - conditionService.isValid(e.getValue(), flow, execution) - )) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - return f.getMultipleConditionWindow().with(results); - }) - .filter(multipleConditionWindow -> multipleConditionWindow.getResults().size() > 0) - .collect(Collectors.toList()); - } - - public List multipleFlowToDelete( - Stream flowStream, - MultipleConditionStorageInterface multipleConditionStorage - ) { - return Stream - .concat( - multipleFlowStream(flowStream, multipleConditionStorage) - .filter(f -> f.getMultipleCondition().getConditions().size() == - (f.getMultipleConditionWindow().getResults() == null ? 0 : - f.getMultipleConditionWindow() - .getResults() - .entrySet() - .stream() - .filter(Map.Entry::getValue) - .count() - ) - ) - .map(FlowWithFlowTriggerAndMultipleCondition::getMultipleConditionWindow), - multipleConditionStorage.expired().stream() - ) - .collect(Collectors.toList()); - } - public static List findRemovedTrigger(Flow flow, Flow previous) { return ListUtils.emptyOnNull(previous.getTriggers()) .stream() @@ -219,7 +96,7 @@ public static List findRemovedTrigger(Flow flow, Flow previous) } public static String cleanupSource(String source) { - return source.replaceFirst("(?m)^revision: \\d+\n?",""); + return source.replaceFirst("(?m)^revision: \\d+\n?", ""); } public static String injectDisabled(String source, Boolean disabled) { @@ -307,13 +184,6 @@ private static Object fixSnakeYaml(Object object) { return object; } - @AllArgsConstructor - @Getter - private static class FlowWithTrigger { - private final Flow flow; - private final AbstractTrigger trigger; - } - @AllArgsConstructor @Getter @ToString @@ -321,14 +191,4 @@ public static class FlowWithFlowTrigger { private final Flow flow; private final io.kestra.core.models.triggers.types.Flow trigger; } - - @AllArgsConstructor - @Getter - @ToString - private static class FlowWithFlowTriggerAndMultipleCondition { - private final Flow flow; - private final MultipleConditionWindow multipleConditionWindow; - private final io.kestra.core.models.triggers.types.Flow trigger; - private final MultipleCondition multipleCondition; - } } diff --git a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java b/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java index d11b82c9b69..bd92823eafe 100644 --- a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerCaseTest.java @@ -101,18 +101,18 @@ public void failed() throws InterruptedException, TimeoutException { }); // first one - Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60)); assertThat(execution.getTaskRunList().size(), is(1)); - assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); // wait a little to be sure that the trigger is not launching execution countDownLatch.await(1, TimeUnit.SECONDS); assertThat(ended.size(), is(1)); // second one - execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60)); + execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60)); assertThat(execution.getTaskRunList().size(), is(1)); - assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); // trigger was not done countDownLatch.await(10, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerFailedTest.java b/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerFailedTest.java deleted file mode 100644 index 57232b1b68b..00000000000 --- a/core/src/test/java/io/kestra/core/runners/MultipleConditionTriggerFailedTest.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.kestra.core.runners; - -import io.micronaut.test.extensions.junit5.annotation.MicronautTest; -import jakarta.inject.Inject; -import org.junit.jupiter.api.Test; - -@MicronautTest -public class MultipleConditionTriggerFailedTest extends AbstractMemoryRunnerTest { - @Inject - private MultipleConditionTriggerCaseTest runnerCaseTest; - -} diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcMultipleConditionStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcMultipleConditionStorage.java index 702d807b13c..24d0eaa2ad9 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcMultipleConditionStorage.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcMultipleConditionStorage.java @@ -61,6 +61,7 @@ public List expired() { }); } + @Override public synchronized void save(List multipleConditionWindows) { this.jdbcRepository .getDslContextWrapper() @@ -75,6 +76,7 @@ public synchronized void save(List multipleConditionWin }); } + @Override public void delete(MultipleConditionWindow multipleConditionWindow) { this.jdbcRepository.delete(multipleConditionWindow); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 3f3c25c3a5d..839e9528f43 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -91,13 +91,13 @@ public class JdbcExecutor implements ExecutorInterface { private ConditionService conditionService; @Inject - private MetricRegistry metricRegistry; + private JdbcFlowTriggerService flowTriggerService; @Inject - protected FlowListenersInterface flowListeners; + private MetricRegistry metricRegistry; @Inject - private AbstractJdbcMultipleConditionStorage multipleConditionStorage; + protected FlowListenersInterface flowListeners; @Inject private AbstractJdbcWorkerTaskExecutionStorage workerTaskExecutionStorage; @@ -295,21 +295,8 @@ private void executionQueue(Execution message) { conditionService.isTerminatedWithListeners(flow, execution) && this.deduplicateFlowTrigger(execution, executorState) ) { - // multiple conditions storage - multipleConditionStorage.save( - flowService - .multipleFlowTrigger(allFlows.stream(), flow, execution, multipleConditionStorage) - ); - - // Flow Trigger - flowService - .flowTriggerExecution(allFlows.stream(), execution, multipleConditionStorage) + flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows) .forEach(this.executionQueue::emit); - - // Trigger is done, remove matching multiple condition - flowService - .multipleFlowToDelete(allFlows.stream(), multipleConditionStorage) - .forEach(multipleConditionStorage::delete); } // worker task execution diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java new file mode 100644 index 00000000000..b32b0efff64 --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java @@ -0,0 +1,17 @@ +package io.kestra.jdbc.runner; + +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.services.AbstractFlowTriggerService; +import io.kestra.core.services.ConditionService; +import io.kestra.core.services.FlowService; +import jakarta.inject.Singleton; + +import java.util.Optional; + +@JdbcRunnerEnabled +@Singleton +public class JdbcFlowTriggerService extends AbstractFlowTriggerService { + public JdbcFlowTriggerService(Optional multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { + super(multipleConditionStorage, conditionService, runContextFactory, flowService); + } +} diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index bcfd8f3e8ee..a6b2691c05b 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -35,7 +35,6 @@ @MemoryQueueEnabled @Slf4j public class MemoryExecutor implements ExecutorInterface { - private static final MemoryMultipleConditionStorage multipleConditionStorage = new MemoryMultipleConditionStorage(); private static final ConcurrentHashMap EXECUTIONS = new ConcurrentHashMap<>(); private static final ConcurrentHashMap WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>(); private List allFlows; @@ -93,6 +92,9 @@ public class MemoryExecutor implements ExecutorInterface { @Inject private SkipExecutionService skipExecutionService; + @Inject + private MemoryFlowTriggerService flowTriggerService; + @Override public void run() { flowListeners.run(); @@ -231,21 +233,8 @@ private void handleExecution(ExecutionState state) { // multiple condition if (conditionService.isTerminatedWithListeners(flow, execution)) { - // multiple conditions storage - multipleConditionStorage.save( - flowService - .multipleFlowTrigger(allFlows.stream(), flow, execution, multipleConditionStorage) - ); - - // Flow Trigger - flowService - .flowTriggerExecution(allFlows.stream(), execution, multipleConditionStorage) + flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows) .forEach(this.executionQueue::emit); - - // Trigger is done, remove matching multiple condition - flowService - .multipleFlowToDelete(allFlows.stream(), multipleConditionStorage) - .forEach(multipleConditionStorage::delete); } // worker task execution diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java new file mode 100644 index 00000000000..ab235adfa50 --- /dev/null +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java @@ -0,0 +1,23 @@ +package io.kestra.runner.memory; + +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.services.AbstractFlowTriggerService; +import io.kestra.core.services.ConditionService; +import io.kestra.core.services.FlowService; +import jakarta.inject.Singleton; + +import javax.annotation.PostConstruct; +import java.util.Optional; + +@MemoryQueueEnabled +@Singleton +public class MemoryFlowTriggerService extends AbstractFlowTriggerService { + public MemoryFlowTriggerService(Optional multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { + super(multipleConditionStorage, conditionService, runContextFactory, flowService); + } + + @PostConstruct + private void init() { + this.multipleConditionStorage = Optional.of(new MemoryMultipleConditionStorage()); + } +} diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryMultipleConditionStorage.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryMultipleConditionStorage.java index b59d1d17449..ce3d10d9661 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryMultipleConditionStorage.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryMultipleConditionStorage.java @@ -51,6 +51,7 @@ public List expired() { .collect(Collectors.toList()); } + @Override public synchronized void save(List multipleConditionWindows) { multipleConditionWindows .forEach(window -> { @@ -70,6 +71,7 @@ public synchronized void save(List multipleConditionWin }); } + @Override public void delete(MultipleConditionWindow multipleConditionWindow) { find(multipleConditionWindow.getNamespace(), multipleConditionWindow.getFlowId()) .ifPresent(byCondition -> { From 28fb2debdc00b00f722a46a872214596c01b79e6 Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Mon, 28 Aug 2023 11:20:10 +0200 Subject: [PATCH 2/3] fix(core): post-review fixes --- .../services/AbstractFlowTriggerService.java | 30 +++++++++---------- .../io/kestra/core/services/FlowService.java | 11 ------- .../io/kestra/jdbc/runner/JdbcExecutor.java | 9 +++--- .../jdbc/runner/JdbcFlowTriggerService.java | 10 +------ .../kestra/runner/memory/MemoryExecutor.java | 8 +++-- .../memory/MemoryFlowTriggerService.java | 16 +--------- 6 files changed, 26 insertions(+), 58 deletions(-) diff --git a/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java b/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java index 93f49ba88e4..80ae142b6d2 100644 --- a/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java +++ b/core/src/main/java/io/kestra/core/services/AbstractFlowTriggerService.java @@ -17,10 +17,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public abstract class AbstractFlowTriggerService { - @Inject - protected Optional multipleConditionStorage; - +public abstract class AbstractFlowTriggerService { @Inject private ConditionService conditionService; @@ -30,18 +27,11 @@ public abstract class AbstractFlowTriggerService multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { - this.multipleConditionStorage = multipleConditionStorage; - this.conditionService = conditionService; - this.runContextFactory = runContextFactory; - this.flowService = flowService; - } - - public Stream withFlowTriggersOnly(Stream allFlows) { + public Stream withFlowTriggersOnly(Stream allFlows) { return allFlows .filter(flow -> !flow.isDisabled()) .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty()) - .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger))); + .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))); } public Stream flowTriggers(Flow flow) { @@ -52,15 +42,15 @@ public Stream flowTriggers(Flow flow) .map(io.kestra.core.models.triggers.types.Flow.class::cast); } - public List computeExecutionsFromFlowTriggers(Execution execution, List allFlows) { - List validTriggersBeforeMultipleConditionEval = allFlows.stream() + public List computeExecutionsFromFlowTriggers(Execution execution, List allFlows, Optional multipleConditionStorage) { + List validTriggersBeforeMultipleConditionEval = allFlows.stream() // prevent recursive flow triggers .filter(flow -> flowService.removeUnwanted(flow, execution)) // ensure flow & triggers are enabled .filter(flow -> !flow.isDisabled()) .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty()) // validate flow triggers conditions excluding multiple conditions - .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger))) + .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))) .filter(flowWithFlowTrigger -> conditionService.valid( flowWithFlowTrigger.getFlow(), flowWithFlowTrigger.getTrigger().getConditions().stream() @@ -155,4 +145,12 @@ protected static class FlowWithFlowTriggerAndMultipleCondition { private final io.kestra.core.models.triggers.types.Flow trigger; private final MultipleCondition multipleCondition; } + + @AllArgsConstructor + @Getter + @ToString + public static class FlowWithFlowTrigger { + private final Flow flow; + private final io.kestra.core.models.triggers.types.Flow trigger; + } } diff --git a/core/src/main/java/io/kestra/core/services/FlowService.java b/core/src/main/java/io/kestra/core/services/FlowService.java index 5881846e3a3..18cfc3b9757 100644 --- a/core/src/main/java/io/kestra/core/services/FlowService.java +++ b/core/src/main/java/io/kestra/core/services/FlowService.java @@ -12,10 +12,7 @@ import io.micronaut.core.annotation.Nullable; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.Getter; import lombok.SneakyThrows; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import java.util.*; @@ -183,12 +180,4 @@ private static Object fixSnakeYaml(Object object) { return object; } - - @AllArgsConstructor - @Getter - @ToString - public static class FlowWithFlowTrigger { - private final Flow flow; - private final io.kestra.core.models.triggers.types.Flow trigger; - } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 839e9528f43..bd0769abbf2 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowWithException; import io.kestra.core.models.flows.State; import io.kestra.core.models.topologies.FlowTopology; +import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.FlowRepositoryInterface; @@ -75,9 +76,6 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private RunContextFactory runContextFactory; - @Inject - private FlowService flowService; - @Inject private TaskDefaultService taskDefaultService; @@ -90,6 +88,9 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private ConditionService conditionService; + @Inject + private MultipleConditionStorageInterface multipleConditionStorage; + @Inject private JdbcFlowTriggerService flowTriggerService; @@ -295,7 +296,7 @@ private void executionQueue(Execution message) { conditionService.isTerminatedWithListeners(flow, execution) && this.deduplicateFlowTrigger(execution, executorState) ) { - flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows) + flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage)) .forEach(this.executionQueue::emit); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java index b32b0efff64..7f1fe8c1374 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java @@ -1,17 +1,9 @@ package io.kestra.jdbc.runner; -import io.kestra.core.runners.RunContextFactory; import io.kestra.core.services.AbstractFlowTriggerService; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.FlowService; import jakarta.inject.Singleton; -import java.util.Optional; - @JdbcRunnerEnabled @Singleton -public class JdbcFlowTriggerService extends AbstractFlowTriggerService { - public JdbcFlowTriggerService(Optional multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { - super(multipleConditionStorage, conditionService, runContextFactory, flowService); - } +public class JdbcFlowTriggerService extends AbstractFlowTriggerService { } diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index a6b2691c05b..3b48ea744b1 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.FlowRepositoryInterface; @@ -95,6 +96,8 @@ public class MemoryExecutor implements ExecutorInterface { @Inject private MemoryFlowTriggerService flowTriggerService; + private MultipleConditionStorageInterface multipleConditionStorage = new MemoryMultipleConditionStorage(); + @Override public void run() { flowListeners.run(); @@ -126,7 +129,7 @@ private Flow transform(Flow flow, Execution execution) { (namespace, id) -> templateExecutorInterface.get().findById(namespace, id).orElse(null) ); } catch (InternalException e) { - log.debug("Failed to inject template", e); + log.debug("Failed to inject template", e); } } @@ -233,7 +236,7 @@ private void handleExecution(ExecutionState state) { // multiple condition if (conditionService.isTerminatedWithListeners(flow, execution)) { - flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows) + flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage)) .forEach(this.executionQueue::emit); } @@ -374,7 +377,6 @@ private boolean deduplicateNexts(Execution execution, List taskRuns) { } - private static class ExecutionState { private final Execution execution; private Map taskRuns = new ConcurrentHashMap<>(); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java index ab235adfa50..78476f8e8d6 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java @@ -1,23 +1,9 @@ package io.kestra.runner.memory; -import io.kestra.core.runners.RunContextFactory; import io.kestra.core.services.AbstractFlowTriggerService; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.FlowService; import jakarta.inject.Singleton; -import javax.annotation.PostConstruct; -import java.util.Optional; - @MemoryQueueEnabled @Singleton -public class MemoryFlowTriggerService extends AbstractFlowTriggerService { - public MemoryFlowTriggerService(Optional multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) { - super(multipleConditionStorage, conditionService, runContextFactory, flowService); - } - - @PostConstruct - private void init() { - this.multipleConditionStorage = Optional.of(new MemoryMultipleConditionStorage()); - } +public class MemoryFlowTriggerService extends AbstractFlowTriggerService { } From 31ce2d7f1d570efe9ef23cfd24499ac2acdd444d Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Mon, 28 Aug 2023 12:22:50 +0200 Subject: [PATCH 3/3] fix(core): post-review fixes --- .../kestra/core/services/DefaultFlowTriggerService.java | 7 +++++++ .../main/java/io/kestra/jdbc/runner/JdbcExecutor.java | 2 +- .../io/kestra/jdbc/runner/JdbcFlowTriggerService.java | 9 --------- .../java/io/kestra/runner/memory/MemoryExecutor.java | 4 ++-- .../kestra/runner/memory/MemoryFlowTriggerService.java | 9 --------- 5 files changed, 10 insertions(+), 21 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/services/DefaultFlowTriggerService.java delete mode 100644 jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java delete mode 100644 runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java diff --git a/core/src/main/java/io/kestra/core/services/DefaultFlowTriggerService.java b/core/src/main/java/io/kestra/core/services/DefaultFlowTriggerService.java new file mode 100644 index 00000000000..ff906102460 --- /dev/null +++ b/core/src/main/java/io/kestra/core/services/DefaultFlowTriggerService.java @@ -0,0 +1,7 @@ +package io.kestra.core.services; + +import jakarta.inject.Singleton; + +@Singleton +public class DefaultFlowTriggerService extends AbstractFlowTriggerService { +} \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index bd0769abbf2..c6ad983e9cc 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -92,7 +92,7 @@ public class JdbcExecutor implements ExecutorInterface { private MultipleConditionStorageInterface multipleConditionStorage; @Inject - private JdbcFlowTriggerService flowTriggerService; + private AbstractFlowTriggerService flowTriggerService; @Inject private MetricRegistry metricRegistry; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java deleted file mode 100644 index 7f1fe8c1374..00000000000 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcFlowTriggerService.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.kestra.jdbc.runner; - -import io.kestra.core.services.AbstractFlowTriggerService; -import jakarta.inject.Singleton; - -@JdbcRunnerEnabled -@Singleton -public class JdbcFlowTriggerService extends AbstractFlowTriggerService { -} diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index 3b48ea744b1..a27104637dc 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -94,9 +94,9 @@ public class MemoryExecutor implements ExecutorInterface { private SkipExecutionService skipExecutionService; @Inject - private MemoryFlowTriggerService flowTriggerService; + private AbstractFlowTriggerService flowTriggerService; - private MultipleConditionStorageInterface multipleConditionStorage = new MemoryMultipleConditionStorage(); + private final MultipleConditionStorageInterface multipleConditionStorage = new MemoryMultipleConditionStorage(); @Override public void run() { diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java deleted file mode 100644 index 78476f8e8d6..00000000000 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowTriggerService.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.kestra.runner.memory; - -import io.kestra.core.services.AbstractFlowTriggerService; -import jakarta.inject.Singleton; - -@MemoryQueueEnabled -@Singleton -public class MemoryFlowTriggerService extends AbstractFlowTriggerService { -}