Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/multiple-conditions-service #1950

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ default MultipleConditionWindow getOrCreate(Flow flow, MultipleCondition multipl
.build()
);
}

void save(List<MultipleConditionWindow> multipleConditionWindows);

void delete(MultipleConditionWindow multipleConditionWindow);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package io.kestra.core.services;

import io.kestra.core.models.conditions.types.MultipleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Inject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractFlowTriggerService {
@Inject
private ConditionService conditionService;

@Inject
private RunContextFactory runContextFactory;

@Inject
private FlowService flowService;

public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<Flow> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)));
}

public Stream<io.kestra.core.models.triggers.types.Flow> flowTriggers(Flow flow) {
return flow.getTriggers()
.stream()
.filter(Predicate.not(AbstractTrigger::isDisabled))
.filter(io.kestra.core.models.triggers.types.Flow.class::isInstance)
.map(io.kestra.core.models.triggers.types.Flow.class::cast);
}

public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
// validate flow triggers conditions excluding multiple conditions
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
runContextFactory.of(flowWithFlowTrigger.getFlow(), execution),
flowWithFlowTrigger.getFlow(),
execution
)
)).toList();

Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
).toList();

// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}

// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream().filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
).map(f -> f.getTrigger().evaluate(
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if(multipleConditionStorage.isPresent() && multipleConditionWindowsByFlow != null) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.keySet().stream()
.map(f -> Map.entry(
f.getMultipleCondition().getConditions(),
multipleConditionStorage.get().getOrCreate(f.getFlow(), f.getMultipleCondition())
))
.filter(e -> e.getKey().size() == Optional.ofNullable(e.getValue().getResults())
.map(Map::size)
.orElse(0))
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired().stream()
).forEach(multipleConditionStorage.get()::delete);
}

return executions;
}

@AllArgsConstructor
@Getter
@ToString
protected static class FlowWithFlowTriggerAndMultipleCondition {
private final Flow flow;
private final MultipleConditionWindow multipleConditionWindow;
private final io.kestra.core.models.triggers.types.Flow trigger;
private final MultipleCondition multipleCondition;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}
}
153 changes: 1 addition & 152 deletions core/src/main/java/io/kestra/core/services/FlowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,17 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.models.conditions.types.MultipleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
Expand Down Expand Up @@ -83,131 +77,11 @@ private Stream<Flow> keepLastVersionCollector(Stream<Flow> stream) {
.filter(Objects::nonNull);
}

public List<FlowWithFlowTrigger> flowWithFlowTrigger(Stream<Flow> flowStream) {
return flowStream
.filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0)
.filter(flow -> !flow.isDisabled())
.flatMap(flow -> flow.getTriggers()
.stream()
.filter(abstractTrigger -> !abstractTrigger.isDisabled())
.map(trigger -> new FlowWithTrigger(flow, trigger))
)
.filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow)
.map(f -> new FlowWithFlowTrigger(
f.getFlow(),
(io.kestra.core.models.triggers.types.Flow) f.getTrigger()
)
)
.collect(Collectors.toList());
}

protected boolean removeUnwanted(Flow f, Execution execution) {
// we don't allow recursive
return !f.uidWithoutRevision().equals(Flow.uidWithoutRevision(execution));
}
brian-mulier-p marked this conversation as resolved.
Show resolved Hide resolved

public List<Execution> flowTriggerExecution(Stream<Flow> flowStream, Execution execution, @Nullable MultipleConditionStorageInterface multipleConditionStorage) {
return flowStream
.filter(flow -> flow.getTriggers() != null && flow.getTriggers().size() > 0)
.filter(flow -> !flow.isDisabled())
.flatMap(flow -> flow.getTriggers()
.stream()
.filter(abstractTrigger -> !abstractTrigger.isDisabled())
.map(trigger -> new FlowWithTrigger(flow, trigger))
)
.filter(f -> conditionService.isValid(
f.getTrigger(),
f.getFlow(),
execution,
multipleConditionStorage
))
.filter(f -> f.getTrigger() instanceof io.kestra.core.models.triggers.types.Flow)
.map(f -> new FlowWithFlowTrigger(
f.getFlow(),
(io.kestra.core.models.triggers.types.Flow) f.getTrigger()
)
)
.filter(f -> this.removeUnwanted(f.getFlow(), execution))
.map(f -> f.getTrigger().evaluate(
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

private Stream<FlowWithFlowTriggerAndMultipleCondition> multipleFlowStream(
Stream<Flow> flowStream,
MultipleConditionStorageInterface multipleConditionStorage
) {
return flowWithFlowTrigger(flowStream)
.stream()
.flatMap(e -> e.getTrigger()
.getConditions()
.stream()
.filter(condition -> condition instanceof MultipleCondition)
.map(condition -> {
MultipleCondition multipleCondition = (MultipleCondition) condition;

return new FlowWithFlowTriggerAndMultipleCondition(
e.getFlow(),
multipleConditionStorage.getOrCreate(e.getFlow(), multipleCondition),
e.getTrigger(),
multipleCondition
);
})
);
}

public List<MultipleConditionWindow> multipleFlowTrigger(
Stream<Flow> flowStream,
Flow flow,
Execution execution,
MultipleConditionStorageInterface multipleConditionStorage
) {
return multipleFlowStream(flowStream, multipleConditionStorage)
.map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), flow, execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return f.getMultipleConditionWindow().with(results);
})
.filter(multipleConditionWindow -> multipleConditionWindow.getResults().size() > 0)
.collect(Collectors.toList());
}

public List<MultipleConditionWindow> multipleFlowToDelete(
Stream<Flow> flowStream,
MultipleConditionStorageInterface multipleConditionStorage
) {
return Stream
.concat(
multipleFlowStream(flowStream, multipleConditionStorage)
.filter(f -> f.getMultipleCondition().getConditions().size() ==
(f.getMultipleConditionWindow().getResults() == null ? 0 :
f.getMultipleConditionWindow()
.getResults()
.entrySet()
.stream()
.filter(Map.Entry::getValue)
.count()
)
)
.map(FlowWithFlowTriggerAndMultipleCondition::getMultipleConditionWindow),
multipleConditionStorage.expired().stream()
)
.collect(Collectors.toList());
}

public static List<AbstractTrigger> findRemovedTrigger(Flow flow, Flow previous) {
return ListUtils.emptyOnNull(previous.getTriggers())
.stream()
Expand All @@ -219,7 +93,7 @@ public static List<AbstractTrigger> findRemovedTrigger(Flow flow, Flow previous)
}

public static String cleanupSource(String source) {
return source.replaceFirst("(?m)^revision: \\d+\n?","");
return source.replaceFirst("(?m)^revision: \\d+\n?", "");
}

public static String injectDisabled(String source, Boolean disabled) {
Expand Down Expand Up @@ -306,29 +180,4 @@ private static Object fixSnakeYaml(Object object) {

return object;
}

@AllArgsConstructor
@Getter
private static class FlowWithTrigger {
private final Flow flow;
private final AbstractTrigger trigger;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}

@AllArgsConstructor
@Getter
@ToString
private static class FlowWithFlowTriggerAndMultipleCondition {
private final Flow flow;
private final MultipleConditionWindow multipleConditionWindow;
private final io.kestra.core.models.triggers.types.Flow trigger;
private final MultipleCondition multipleCondition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ public void failed() throws InterruptedException, TimeoutException {
});

// first one
Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));

// wait a little to be sure that the trigger is not launching execution
countDownLatch.await(1, TimeUnit.SECONDS);
assertThat(ended.size(), is(1));

// second one
execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
execution = runnerUtils.runOne("io.kestra.tests", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

// trigger was not done
countDownLatch.await(10, TimeUnit.SECONDS);
Expand Down

This file was deleted.

Loading