Skip to content

Commit

Permalink
fix(jdbc): handle trigger in jdbc heartbeat/resubmit (#2240)
Browse files Browse the repository at this point in the history
This fix follow the JDBC heartbeat & task resubmit feature recently released in the 0.12
  • Loading branch information
Skraye authored and tchiotludo committed Oct 11, 2023
1 parent ce6ad77 commit e045440
Show file tree
Hide file tree
Showing 30 changed files with 606 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ public interface QueueFactoryInterface {

WorkerJobQueueInterface workerJobQueue();

WorkerTriggerResultQueueInterface workerTriggerResultQueue();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.kestra.core.queues;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;

import java.io.Closeable;
import java.util.function.Consumer;

/*
* Required for the QueueFactory, to have common interface with JDBC & Kafka
*/
public interface WorkerTriggerResultQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer);

void pause();

void cleanup();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import java.util.Optional;

public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);
Optional<WorkerJobRunning> findByKey(String uid);

void deleteByTaskRunId(String taskRunId);
void deleteByKey(String uid);

}
11 changes: 7 additions & 4 deletions core/src/main/java/io/kestra/core/runners/StandAloneRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Setter private java.util.concurrent.ExecutorService poolExecutor;
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true;
@Setter protected boolean workerEnabled = true;

@Inject
private ExecutorsUtils executorsUtils;
Expand Down Expand Up @@ -52,10 +53,12 @@ public void run() {

poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));

Worker worker = new Worker(applicationContext, workerThread, null);
applicationContext.registerSingleton(worker);
poolExecutor.execute(worker);
servers.add(worker);
if(workerEnabled) {
Worker worker = new Worker(applicationContext, workerThread, null);
applicationContext.registerSingleton(worker);
poolExecutor.execute(worker);
servers.add(worker);
}

if (schedulerEnabled) {
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -552,13 +553,18 @@ public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void close() throws Exception {
closeWorker(Duration.ofMinutes(5));
}

@VisibleForTesting
public void closeWorker(Duration awaitDuration) throws Exception {
workerJobQueue.pause();
executionKilledQueue.pause();
new Thread(
() -> {
try {
this.executors.shutdown();
this.executors.awaitTermination(5, TimeUnit.MINUTES);
this.executors.awaitTermination(awaitDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Fail to shutdown the worker", e);
}
Expand All @@ -570,7 +576,7 @@ public void close() throws Exception {

Await.until(
() -> {
if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) {
if (this.executors.isTerminated() || this.workerThreadReferences.isEmpty()) {
log.info("No more worker threads busy, shutting down!");

// we ensure that last produce message are send
Expand Down Expand Up @@ -604,6 +610,11 @@ public void close() throws Exception {
metricEntryQueue.close();
}

@VisibleForTesting
public void shutdown() throws IOException {
this.executors.shutdownNow();
}

public List<WorkerTask> getWorkerThreadTasks() {
return this.workerThreadReferences.stream().map(thread -> thread.workerTask).toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class WorkerTaskRunning extends WorkerJobRunning {

@Override
public String uid() {
return this.taskRun.getTaskId();
return this.taskRun.getId();
}

public static WorkerTaskRunning of(WorkerTask workerTask, WorkerInstance workerInstance, int partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,22 @@
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -41,9 +37,6 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;

@Slf4j
@Singleton
Expand All @@ -52,7 +45,7 @@ public abstract class AbstractScheduler implements Scheduler {
private final QueueInterface<Execution> executionQueue;
private final QueueInterface<Trigger> triggerQueue;
private final QueueInterface<WorkerJob> workerTaskQueue;
private final QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
private final WorkerTriggerResultQueueInterface workerTriggerResultQueue;
protected final FlowListenersInterface flowListeners;
private final RunContextFactory runContextFactory;
private final MetricRegistry metricRegistry;
Expand Down Expand Up @@ -85,7 +78,7 @@ public AbstractScheduler(
this.executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
this.workerTaskQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED));
this.workerTriggerResultQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
this.workerTriggerResultQueue = applicationContext.getBean(WorkerTriggerResultQueueInterface.class);
this.flowListeners = flowListeners;
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
Expand Down Expand Up @@ -152,6 +145,7 @@ public void run() {

// listen to WorkerTriggerResult from polling triggers
this.workerTriggerResultQueue.receive(
null,
Scheduler.class,
either -> {
if (either.isRight()) {
Expand Down
49 changes: 49 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/test/SleepTrigger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.kestra.core.tasks.test;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.Optional;

/**
* This trigger is used in unit tests where we need a task that wait a little to be able to check the resubmit of triggers.
*/
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class SleepTrigger extends AbstractTrigger implements PollingTriggerInterface {

@PluginProperty
@NotNull
private Long duration;

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) {
// Try catch to avoid flakky test
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return Optional.empty();
}

@Override
public Duration getInterval() {
return null;
}
}
7 changes: 7 additions & 0 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -121,4 +122,10 @@ public QueueInterface<Trigger> trigger() {
public WorkerJobQueueInterface workerJobQueue() {
return new H2WorkerJobQueue(applicationContext);
}

@Override
@Singleton
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
return new H2WorkerTriggerResultQueue(applicationContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.kestra.runner.h2;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class H2WorkerTriggerResultQueue extends H2Queue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;

public H2WorkerTriggerResultQueue(ApplicationContext applicationContext) {
super(WorkerTriggerResult.class, applicationContext);
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcWorkerTriggerResultQueueService.pause();
}

@Override
public void cleanup() {
jdbcWorkerTriggerResultQueueService.cleanup();
}

@Override
public void close() {
jdbcWorkerTriggerResultQueueService.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE worker_job_running
DROP COLUMN "taskrun_id";
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.runner.JdbcHeartbeatTest;

class H2HeartbeatTest extends JdbcHeartbeatTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -121,4 +122,10 @@ public QueueInterface<Trigger> trigger() {
public WorkerJobQueueInterface workerJobQueue() {
return new MysqlWorkerJobQueue(applicationContext);
}

@Override
@Singleton
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
return new MysqlWorkerTriggerResultQueue(applicationContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.kestra.runner.mysql;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;

public MysqlWorkerTriggerResultQueue(ApplicationContext applicationContext) {
super(WorkerTriggerResult.class, applicationContext);
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcWorkerTriggerResultQueueService.pause();
}

@Override
public void cleanup() {
jdbcWorkerTriggerResultQueueService.cleanup();
}

@Override
public void close() {
jdbcWorkerTriggerResultQueueService.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE worker_job_running
DROP COLUMN taskrun_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.JdbcHeartbeatTest;

class MysqlHeartbeatTest extends JdbcHeartbeatTest {

}
Loading

0 comments on commit e045440

Please sign in to comment.