Skip to content

Commit

Permalink
feat(core): make the queue fault tolerant (#1981)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu authored Sep 5, 2023
1 parent b08f701 commit a85d0e1
Show file tree
Hide file tree
Showing 39 changed files with 213 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package io.kestra.core.exceptions;

import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import java.io.IOException;

public class DeserializationException extends RuntimeException {
private static final long serialVersionUID = 1L;

public DeserializationException(InvalidTypeIdException cause) {
super(cause);
private String record;

public String getRecord() {
return record;
}

public DeserializationException(Throwable cause) {
public DeserializationException(IOException cause, String record) {
super(cause);
this.record = record;
}

public DeserializationException(String message) {
super(message);
}
}
11 changes: 7 additions & 4 deletions core/src/main/java/io/kestra/core/queues/QueueInterface.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.kestra.core.queues;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.utils.Either;

import java.io.Closeable;
import java.util.function.Consumer;

Expand All @@ -22,17 +25,17 @@ default void delete(T message) throws QueueException {

void delete(String consumerGroup, T message) throws QueueException;

default Runnable receive(Consumer<T> consumer) {
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive((String) null, consumer);
}

Runnable receive(String consumerGroup, Consumer<T> consumer);
Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer);

default Runnable receive(Class<?> queueType, Consumer<T> consumer) {
default Runnable receive(Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, queueType, consumer);
}

Runnable receive(String consumerGroup, Class<?> queueType, Consumer<T> consumer);
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer);

void pause();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.kestra.core.queues;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;

import java.io.Closeable;
import java.util.function.Consumer;

public interface WorkerJobQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<WorkerJob> consumer);
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer);

void pause();
}
8 changes: 7 additions & 1 deletion core/src/main/java/io/kestra/core/runners/FlowListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ public void run() {
if (!this.isStarted) {
this.isStarted = true;

this.flowQueue.receive(flow -> {
this.flowQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
return;
}

Flow flow = either.getLeft();
Optional<Flow> previous = this.previous(flow);

if (flow.isDeleted()) {
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/io/kestra/core/runners/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
@Requires(beans = {ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class})
public class Indexer implements IndexerInterface {
Expand Down Expand Up @@ -57,7 +59,13 @@ public void run() {
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
queueInterface.receive(Indexer.class, item -> {
queueInterface.receive(Indexer.class, either -> {
if (either.isRight()) {
log.error("unable to deserialize an item: {}", either.getRight().getMessage());
return;
}

T item = either.getLeft();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", item.getClass().getName()).increment();

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ public Execution awaitExecution(Predicate<Execution> predicate, Runnable executi
AtomicReference<Execution> receive = new AtomicReference<>();

Runnable cancel = this.executionQueue.receive(current -> {
if (predicate.test(current)) {
receive.set(current);
if (predicate.test(current.getLeft())) {
receive.set(current.getLeft());
}
});

Expand Down
14 changes: 10 additions & 4 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ public Worker(ApplicationContext applicationContext, int thread, String workerGr
@Override
public void run() {
this.executionKilledQueue.receive(executionKilled -> {
if (executionKilled != null) {
if (executionKilled != null && executionKilled.isLeft()) {
// @FIXME: the hashset will never expire killed execution
killedExecution.add(executionKilled.getExecutionId());
killedExecution.add(executionKilled.getLeft().getExecutionId());

synchronized (this) {
workerThreadReferences
.stream()
.filter(workerThread -> executionKilled.getExecutionId().equals(workerThread.getWorkerTask().getTaskRun().getExecutionId()))
.filter(workerThread -> executionKilled.getLeft().getExecutionId().equals(workerThread.getWorkerTask().getTaskRun().getExecutionId()))
.forEach(WorkerThread::kill);
}
}
Expand All @@ -124,8 +124,14 @@ public void run() {
this.workerJobQueue.receive(
this.workerGroup,
Worker.class,
workerTask -> {
either -> {
executors.execute(() -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
return;
}

WorkerJob workerTask = either.getLeft();
if (workerTask instanceof WorkerTask task) {
handleTask(task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,13 @@ public void run() {
// listen to WorkerTriggerResult from polling triggers
this.workerTriggerResultQueue.receive(
Scheduler.class,
workerTriggerResult -> {
either -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker trigger result: {}", either.getRight().getMessage());
return;
}

WorkerTriggerResult workerTriggerResult = either.getLeft();
if (workerTriggerResult.getSuccess() && workerTriggerResult.getExecution().isPresent()) {
var triggerExecution = new SchedulerExecutionWithTrigger(
workerTriggerResult.getExecution().get(),
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public void run() {
QueueInterface<Execution> executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
QueueInterface<Trigger> triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));

executionQueue.receive(execution -> {
executionQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize and execution: {}", either.getRight().getMessage());
return;
}

Execution execution = either.getLeft();
if (execution.getTrigger() != null) {
Trigger trigger = Await.until(() -> watchingTrigger.get(execution.getId()), Duration.ofSeconds(5));
var flow = flowRepository.findById(execution.getNamespace(), execution.getFlowId()).orElse(null);
Expand All @@ -61,7 +67,13 @@ public void run() {
}
});

triggerQueue.receive(trigger -> {
triggerQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a trigger: {}", either.getRight().getMessage());
return;
}

Trigger trigger = either.getLeft();
if (trigger != null && trigger.getExecutionId() != null) {
this.watchingTrigger.put(trigger.getExecutionId(), trigger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Template.Output outputs(RunContext runContext, Execution execution, TaskR

protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
if (!applicationContext.containsBean(TemplateExecutorInterface.class)) {
throw new DeserializationException(new Exception("Templates are disabled, please check your configuration"));
throw new DeserializationException("Templates are disabled, please check your configuration");
}

TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void trigger() throws InterruptedException, TimeoutException {
AtomicReference<Execution> flowListener = new AtomicReference<>();
AtomicReference<Execution> flowListenerNoInput = new AtomicReference<>();

executionQueue.receive(execution -> {
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (flowListenerNoInput.get() == null && execution.getFlowId().equals("trigger-flow-listener-no-inputs")) {
flowListenerNoInput.set(execution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void trigger() throws InterruptedException, TimeoutException {
ConcurrentHashMap<String, Execution> ended = new ConcurrentHashMap<>();
Flow flow = flowRepository.findById("io.kestra.tests", "trigger-multiplecondition-listener").orElseThrow();

executionQueue.receive(execution -> {
executionQueue.receive(either -> {
Execution execution = either.getLeft();
synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (!ended.containsKey(execution.getId())) {
Expand Down Expand Up @@ -89,8 +90,9 @@ public void failed() throws InterruptedException, TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
ConcurrentHashMap<String, Execution> ended = new ConcurrentHashMap<>();

executionQueue.receive(execution -> {
executionQueue.receive(either -> {
synchronized (ended) {
Execution execution = either.getLeft();
if (execution.getState().getCurrent().isTerminated()) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
Expand Down
3 changes: 0 additions & 3 deletions core/src/test/java/io/kestra/core/runners/RetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ void retrySuccessAtFirstAttempt() throws TimeoutException {

@Test
void retryFailed() throws TimeoutException {
List<Execution> executions = new ArrayList<>();
executionQueue.receive(executions::add);

Execution execution = runnerUtils.runOne("io.kestra.tests", "retry-failed");

assertThat(execution.getTaskRunList(), hasSize(2));
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/io/kestra/core/runners/RunContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class RunContextTest extends AbstractMemoryRunnerTest {
void logs() throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
LogEntry matchingLog;
workerTaskLogQueue.receive(logs::add);
workerTaskLogQueue.receive(either -> logs.add(either.getLeft()));

Execution execution = runnerUtils.runOne("io.kestra.tests", "logs");

Expand All @@ -79,7 +79,7 @@ void logs() throws TimeoutException {
@Test
void inputsLarge() throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
workerTaskLogQueue.receive(logs::add);
workerTaskLogQueue.receive(either -> logs.add(either.getLeft()));

char[] chars = new char[1024 * 11];
Arrays.fill(chars, 'a');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void taskDefaults() throws TimeoutException {

public void invalidTaskDefaults() throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
logQueue.receive(logs::add);
logQueue.receive(either -> logs.add(either.getLeft()));

Execution execution = runnerUtils.runOne("io.kestra.tests", "invalid-task-defaults", Duration.ofSeconds(60));

Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/io/kestra/core/runners/WorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void success() throws TimeoutException {
worker.run();

AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
workerTaskResultQueue.receive(workerTaskResult::set);
workerTaskResultQueue.receive(either -> workerTaskResult.set(either.getLeft()));

workerTaskQueue.emit(workerTask(1000));

Expand All @@ -89,7 +89,7 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException {
worker.run();

AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
workerTaskResultQueue.receive(workerTaskResult::set);
workerTaskResultQueue.receive(either -> workerTaskResult.set(either.getLeft()));

Pause pause = Pause.builder()
.type(Pause.class.getName())
Expand Down Expand Up @@ -133,13 +133,13 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException {
@Test
void killed() throws InterruptedException, TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
workerTaskLogQueue.receive(logs::add);
workerTaskLogQueue.receive(either -> logs.add(either.getLeft()));

Worker worker = new Worker(applicationContext, 8, null);
worker.run();

List<WorkerTaskResult> workerTaskResult = new ArrayList<>();
workerTaskResultQueue.receive(workerTaskResult::add);
workerTaskResultQueue.receive(either -> workerTaskResult.add(either.getLeft()));

WorkerTask workerTask = workerTask(999000);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.conditions.types.DayWeekInMonthCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
Expand Down Expand Up @@ -79,7 +80,8 @@ void schedule() throws Exception {
triggerState);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, execution -> {
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.CREATED) {
executionQueue.emit(execution.withState(State.Type.SUCCESS));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.types.Schedule;
Expand All @@ -10,6 +11,7 @@
import org.junitpioneer.jupiter.RetryingTest;
import org.junitpioneer.jupiter.RetryingTest;

import java.lang.reflect.Executable;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
Expand Down Expand Up @@ -76,7 +78,8 @@ void schedule() throws Exception {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(execution -> {
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
assertThat(execution.getInputs().get("testInputs"), is("test-inputs"));
assertThat(execution.getInputs().get("def"), is("awesome"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void thread() throws Exception {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerThreadTest.class, execution -> {
Runnable assertionStop = executionQueue.receive(SchedulerThreadTest.class, either -> {
Execution execution = either.getLeft();
last.set(execution);

assertThat(execution.getFlowId(), is(flow.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void valid() {
@Test
void exception() {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
logQueue.receive(logs::add);
logQueue.receive(either -> logs.add(either.getLeft()));

Flow flow = TestsUtils.mockFlow();
Schedule schedule = Schedule.builder().id("unit").type(Schedule.class.getName()).cron("0 0 1 * *").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void eachNull() throws TimeoutException {

public static void eachNullTest(RunnerUtils runnerUtils, QueueInterface<LogEntry> logQueue) throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
logQueue.receive(logs::add);
logQueue.receive(either -> logs.add(either.getLeft()));

Execution execution = runnerUtils.runOne("io.kestra.tests", "each-null", Duration.ofSeconds(60));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> triggered = new AtomicReference<>();

executionQueue.receive(execution -> {
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("switch") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
triggered.set(execution);
Expand Down
Loading

0 comments on commit a85d0e1

Please sign in to comment.