Skip to content

Commit

Permalink
feat(core): multiple conditions service (#1950)
Browse files Browse the repository at this point in the history
closes #1917
  • Loading branch information
brian-mulier-p committed Aug 28, 2023
1 parent a5fdd06 commit ceebe42
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ default MultipleConditionWindow getOrCreate(Flow flow, MultipleCondition multipl
.build()
);
}

void save(List<MultipleConditionWindow> multipleConditionWindows);

void delete(MultipleConditionWindow multipleConditionWindow);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package io.kestra.core.services;

import io.kestra.core.models.conditions.types.MultipleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Inject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractFlowTriggerService {
@Inject
private ConditionService conditionService;

@Inject
private RunContextFactory runContextFactory;

@Inject
private FlowService flowService;

public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<Flow> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)));
}

public Stream<io.kestra.core.models.triggers.types.Flow> flowTriggers(Flow flow) {
return flow.getTriggers()
.stream()
.filter(Predicate.not(AbstractTrigger::isDisabled))
.filter(io.kestra.core.models.triggers.types.Flow.class::isInstance)
.map(io.kestra.core.models.triggers.types.Flow.class::cast);
}

public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
// validate flow triggers conditions excluding multiple conditions
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
runContextFactory.of(flowWithFlowTrigger.getFlow(), execution),
flowWithFlowTrigger.getFlow(),
execution
)
)).toList();

Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
).toList();

// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}

// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream().filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
).map(f -> f.getTrigger().evaluate(
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if(multipleConditionStorage.isPresent() && multipleConditionWindowsByFlow != null) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.keySet().stream()
.map(f -> Map.entry(
f.getMultipleCondition().getConditions(),
multipleConditionStorage.get().getOrCreate(f.getFlow(), f.getMultipleCondition())
))
.filter(e -> e.getKey().size() == Optional.ofNullable(e.getValue().getResults())
.map(Map::size)
.orElse(0))
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired().stream()
).forEach(multipleConditionStorage.get()::delete);
}

return executions;
}

@AllArgsConstructor
@Getter
@ToString
protected static class FlowWithFlowTriggerAndMultipleCondition {
private final Flow flow;
private final MultipleConditionWindow multipleConditionWindow;
private final io.kestra.core.models.triggers.types.Flow trigger;
private final MultipleCondition multipleCondition;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.core.services;

import jakarta.inject.Singleton;

@Singleton
public class DefaultFlowTriggerService extends AbstractFlowTriggerService {
}
153 changes: 1 addition & 152 deletions core/src/main/java/io/kestra/core/services/FlowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,17 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.models.conditions.types.MultipleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
Expand Down Expand Up @@ -83,131 +77,11 @@ private Stream<Flow> keepLastVersionCollector(Stream<Flow> stream) {
.filter(Objects::nonNull);
}

public List<FlowWithFlowTrigger> flowWithFlowTrigger(Stream<Flow> flowStream) {
return flowStream
.filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0)
.filter(flow -> !flow.isDisabled())
.flatMap(flow -> flow.getTriggers()
.stream()
.filter(abstractTrigger -> !abstractTrigger.isDisabled())
.map(trigger -> new FlowWithTrigger(flow, trigger))
)
.filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow)
.map(f -> new FlowWithFlowTrigger(
f.getFlow(),
(io.kestra.core.models.triggers.types.Flow) f.getTrigger()
)
)
.collect(Collectors.toList());
}

protected boolean removeUnwanted(Flow f, Execution execution) {
// we don't allow recursive
return !f.uidWithoutRevision().equals(Flow.uidWithoutRevision(execution));
}

public List<Execution> flowTriggerExecution(Stream<Flow> flowStream, Execution execution, @Nullable MultipleConditionStorageInterface multipleConditionStorage) {
return flowStream
.filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0)
.filter(flow -> !flow.isDisabled())
.flatMap(flow -> flow.getTriggers()
.stream()
.filter(abstractTrigger -> !abstractTrigger.isDisabled())
.map(trigger -> new FlowWithTrigger(flow, trigger))
)
.filter(f -> conditionService.isValid(
f.getTrigger(),
f.getFlow(),
execution,
multipleConditionStorage
))
.filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow)
.map(f -> new FlowWithFlowTrigger(
f.getFlow(),
(io.kestra.core.models.triggers.types.Flow) f.getTrigger()
)
)
.filter(f -> this.removeUnwanted(f.getFlow(), execution))
.map(f -> f.getTrigger().evaluate(
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

private Stream<FlowWithFlowTriggerAndMultipleCondition> multipleFlowStream(
Stream<Flow> flowStream,
MultipleConditionStorageInterface multipleConditionStorage
) {
return flowWithFlowTrigger(flowStream)
.stream()
.flatMap(e -> e.getTrigger()
.getConditions()
.stream()
.filter(condition -> condition instanceof MultipleCondition)
.map(condition -> {
MultipleCondition multipleCondition = (MultipleCondition) condition;

return new FlowWithFlowTriggerAndMultipleCondition(
e.getFlow(),
multipleConditionStorage.getOrCreate(e.getFlow(), multipleCondition),
e.getTrigger(),
multipleCondition
);
})
);
}

public List<MultipleConditionWindow> multipleFlowTrigger(
Stream<Flow> flowStream,
Flow flow,
Execution execution,
MultipleConditionStorageInterface multipleConditionStorage
) {
return multipleFlowStream(flowStream, multipleConditionStorage)
.map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), flow, execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return f.getMultipleConditionWindow().with(results);
})
.filter(multipleConditionWindow -> multipleConditionWindow.getResults().size() > 0)
.collect(Collectors.toList());
}

public List<MultipleConditionWindow> multipleFlowToDelete(
Stream<Flow> flowStream,
MultipleConditionStorageInterface multipleConditionStorage
) {
return Stream
.concat(
multipleFlowStream(flowStream, multipleConditionStorage)
.filter(f -> f.getMultipleCondition().getConditions().size() ==
(f.getMultipleConditionWindow().getResults() == null ? 0 :
f.getMultipleConditionWindow()
.getResults()
.entrySet()
.stream()
.filter(Map.Entry::getValue)
.count()
)
)
.map(FlowWithFlowTriggerAndMultipleCondition::getMultipleConditionWindow),
multipleConditionStorage.expired().stream()
)
.collect(Collectors.toList());
}

public static List<AbstractTrigger> findRemovedTrigger(Flow flow, Flow previous) {
return ListUtils.emptyOnNull(previous.getTriggers())
.stream()
Expand All @@ -219,7 +93,7 @@ public static List<AbstractTrigger> findRemovedTrigger(Flow flow, Flow previous)
}

public static String cleanupSource(String source) {
return source.replaceFirst("(?m)^revision: \\d+\n?","");
return source.replaceFirst("(?m)^revision: \\d+\n?", "");
}

public static String injectDisabled(String source, Boolean disabled) {
Expand Down Expand Up @@ -306,29 +180,4 @@ private static Object fixSnakeYaml(Object object) {

return object;
}

@AllArgsConstructor
@Getter
private static class FlowWithTrigger {
private final Flow flow;
private final AbstractTrigger trigger;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}

@AllArgsConstructor
@Getter
@ToString
private static class FlowWithFlowTriggerAndMultipleCondition {
private final Flow flow;
private final MultipleConditionWindow multipleConditionWindow;
private final io.kestra.core.models.triggers.types.Flow trigger;
private final MultipleCondition multipleCondition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ public void failed() throws InterruptedException, TimeoutException {
});

// first one
Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));

// wait a little to be sure that the trigger is not launching execution
countDownLatch.await(1, TimeUnit.SECONDS);
assertThat(ended.size(), is(1));

// second one
execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

// trigger was not done
countDownLatch.await(10, TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit ceebe42

Please sign in to comment.