Skip to content

Commit

Permalink
fix(core): post-review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Aug 28, 2023
1 parent 974729b commit 28fb2de
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractFlowTriggerService<S extends MultipleConditionStorageInterface> {
@Inject
protected Optional<S> multipleConditionStorage;

public abstract class AbstractFlowTriggerService {
@Inject
private ConditionService conditionService;

Expand All @@ -30,18 +27,11 @@ public abstract class AbstractFlowTriggerService<S extends MultipleConditionStor
@Inject
private FlowService flowService;

public AbstractFlowTriggerService(Optional<S> multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
this.multipleConditionStorage = multipleConditionStorage;
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.flowService = flowService;
}

public Stream<FlowService.FlowWithFlowTrigger> withFlowTriggersOnly(Stream<Flow> allFlows) {
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<Flow> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger)));
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)));
}

public Stream<io.kestra.core.models.triggers.types.Flow> flowTriggers(Flow flow) {
Expand All @@ -52,15 +42,15 @@ public Stream<io.kestra.core.models.triggers.types.Flow> flowTriggers(Flow flow)
.map(io.kestra.core.models.triggers.types.Flow.class::cast);
}

public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<Flow> allFlows) {
List<FlowService.FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled())
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
// validate flow triggers conditions excluding multiple conditions
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowService.FlowWithFlowTrigger(flow, trigger)))
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
Expand Down Expand Up @@ -155,4 +145,12 @@ protected static class FlowWithFlowTriggerAndMultipleCondition {
private final io.kestra.core.models.triggers.types.Flow trigger;
private final MultipleCondition multipleCondition;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}
}
11 changes: 0 additions & 11 deletions core/src/main/java/io/kestra/core/services/FlowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
Expand Down Expand Up @@ -183,12 +180,4 @@ private static Object fixSnakeYaml(Object object) {

return object;
}

@AllArgsConstructor
@Getter
@ToString
public static class FlowWithFlowTrigger {
private final Flow flow;
private final io.kestra.core.models.triggers.types.Flow trigger;
}
}
9 changes: 5 additions & 4 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
Expand Down Expand Up @@ -75,9 +76,6 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private RunContextFactory runContextFactory;

@Inject
private FlowService flowService;

@Inject
private TaskDefaultService taskDefaultService;

Expand All @@ -90,6 +88,9 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private ConditionService conditionService;

@Inject
private MultipleConditionStorageInterface multipleConditionStorage;

@Inject
private JdbcFlowTriggerService flowTriggerService;

Expand Down Expand Up @@ -295,7 +296,7 @@ private void executionQueue(Execution message) {
conditionService.isTerminatedWithListeners(flow, execution) &&
this.deduplicateFlowTrigger(execution, executorState)
) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows)
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(this.executionQueue::emit);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
package io.kestra.jdbc.runner;

import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowService;
import jakarta.inject.Singleton;

import java.util.Optional;

@JdbcRunnerEnabled
@Singleton
public class JdbcFlowTriggerService extends AbstractFlowTriggerService<AbstractJdbcMultipleConditionStorage> {
public JdbcFlowTriggerService(Optional<AbstractJdbcMultipleConditionStorage> multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
super(multipleConditionStorage, conditionService, runContextFactory, flowService);
}
public class JdbcFlowTriggerService extends AbstractFlowTriggerService {
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
Expand Down Expand Up @@ -95,6 +96,8 @@ public class MemoryExecutor implements ExecutorInterface {
@Inject
private MemoryFlowTriggerService flowTriggerService;

private MultipleConditionStorageInterface multipleConditionStorage = new MemoryMultipleConditionStorage();

@Override
public void run() {
flowListeners.run();
Expand Down Expand Up @@ -126,7 +129,7 @@ private Flow transform(Flow flow, Execution execution) {
(namespace, id) -> templateExecutorInterface.get().findById(namespace, id).orElse(null)
);
} catch (InternalException e) {
log.debug("Failed to inject template", e);
log.debug("Failed to inject template", e);
}
}

Expand Down Expand Up @@ -233,7 +236,7 @@ private void handleExecution(ExecutionState state) {

// multiple condition
if (conditionService.isTerminatedWithListeners(flow, execution)) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows)
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(this.executionQueue::emit);
}

Expand Down Expand Up @@ -374,7 +377,6 @@ private boolean deduplicateNexts(Execution execution, List<TaskRun> taskRuns) {
}



private static class ExecutionState {
private final Execution execution;
private Map<String, TaskRun> taskRuns = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
package io.kestra.runner.memory;

import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowService;
import jakarta.inject.Singleton;

import javax.annotation.PostConstruct;
import java.util.Optional;

@MemoryQueueEnabled
@Singleton
public class MemoryFlowTriggerService extends AbstractFlowTriggerService<MemoryMultipleConditionStorage> {
public MemoryFlowTriggerService(Optional<MemoryMultipleConditionStorage> multipleConditionStorage, ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
super(multipleConditionStorage, conditionService, runContextFactory, flowService);
}

@PostConstruct
private void init() {
this.multipleConditionStorage = Optional.of(new MemoryMultipleConditionStorage());
}
public class MemoryFlowTriggerService extends AbstractFlowTriggerService {
}

0 comments on commit 28fb2de

Please sign in to comment.