Skip to content

Commit

Permalink
feat(core): zombie worker task detection & automatic resubmission (#2081
Browse files Browse the repository at this point in the history
)

close #2055
  • Loading branch information
Skraye authored Sep 28, 2023
1 parent 44a8da8 commit c9e6145
Show file tree
Hide file tree
Showing 51 changed files with 1,252 additions and 80 deletions.
11 changes: 10 additions & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ jackson:

endpoints:
all:
port: 8081
enabled: true
sensitive: false
health:
Expand Down Expand Up @@ -127,6 +126,12 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"

queues:
min-poll-interval: 25ms
Expand Down Expand Up @@ -171,3 +176,7 @@ kestra:
uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m
fixed-delay: 1h

heartbeat:
frequency: 10s
heartbeat-missed: 3
5 changes: 1 addition & 4 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.kestra.core.queues;

import io.kestra.core.models.Setting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ public interface WorkerJobQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer);

void pause();

void cleanup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.core.repositories;

import io.kestra.core.runners.WorkerInstance;

import java.util.List;
import java.util.Optional;

public interface WorkerInstanceRepositoryInterface {
Optional<WorkerInstance> findByWorkerUuid(String workerUuid);

List<WorkerInstance> findAll();

void delete(WorkerInstance workerInstance);

WorkerInstance save(WorkerInstance workerInstance);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.kestra.core.repositories;


import io.kestra.core.runners.WorkerJobRunning;

import java.util.Optional;

public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);

void deleteByTaskRunId(String taskRunId);

}
31 changes: 20 additions & 11 deletions core/src/main/java/io/kestra/core/runners/StandAloneRunner.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package io.kestra.core.runners;

import io.micronaut.context.ApplicationContext;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.ExecutorsUtils;

import java.io.Closeable;
import java.io.IOException;

import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@Slf4j
public class StandAloneRunner implements RunnerInterface, Closeable {
public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Setter private java.util.concurrent.ExecutorService poolExecutor;
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true;
Expand All @@ -39,6 +40,8 @@ public class StandAloneRunner implements RunnerInterface, Closeable {
@Inject
private ApplicationContext applicationContext;

private final List<AutoCloseable> servers = new ArrayList<>();

private boolean running = false;

@Override
Expand All @@ -52,13 +55,18 @@ public void run() {
Worker worker = new Worker(applicationContext, workerThread, null);
applicationContext.registerSingleton(worker);
poolExecutor.execute(worker);
servers.add(worker);

if (schedulerEnabled) {
poolExecutor.execute(applicationContext.getBean(AbstractScheduler.class));
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
poolExecutor.execute(scheduler);
servers.add(scheduler);
}

if (applicationContext.containsBean(IndexerInterface.class)) {
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class));
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
poolExecutor.execute(indexer);
servers.add(indexer);
}
}

Expand All @@ -67,7 +75,8 @@ public boolean isRunning() {
}

@Override
public void close() throws IOException {
public void close() throws Exception {
this.servers.forEach(throwConsumer(AutoCloseable::close));
this.poolExecutor.shutdown();
this.executionQueue.close();
this.workerTaskQueue.close();
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -565,6 +566,8 @@ public void close() throws Exception {
"worker-shutdown"
).start();

AtomicBoolean cleanShutdown = new AtomicBoolean(false);

Await.until(
() -> {
if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) {
Expand All @@ -577,6 +580,7 @@ public void close() throws Exception {
log.error("Failed to close the workerTaskResultQueue", e);
}

cleanShutdown.set(true);;
return true;
}

Expand All @@ -590,6 +594,10 @@ public void close() throws Exception {
Duration.ofSeconds(1)
);

if (cleanShutdown.get()) {
workerJobQueue.cleanup();
}

workerJobQueue.close();
executionKilledQueue.close();
workerTaskResultQueue.close();
Expand Down
25 changes: 20 additions & 5 deletions core/src/main/java/io/kestra/core/runners/WorkerInstance.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package io.kestra.core.runners;

import lombok.Builder;
import lombok.Data;
import lombok.ToString;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.*;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.validation.constraints.NotNull;

@Data
@Builder
@SuperBuilder(toBuilder = true)
@ToString
@NoArgsConstructor
@Getter
public class WorkerInstance {
@NotNull
private UUID workerUuid;
Expand All @@ -25,4 +28,16 @@ public class WorkerInstance {

@Builder.Default
private List<Integer> partitions = new ArrayList<>();

@Builder.Default
@JsonInclude
private Status status = Status.UP;

@Builder.Default
private Instant heartbeatDate = Instant.now();

public enum Status {
UP, DEAD
}

}
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public abstract class WorkerJob {
abstract public String getType();

abstract public String uid();

abstract public String taskRunId();
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public Logger logger() {
public String uid() {
return this.taskRun.getTaskId();
}

@Override
public String taskRunId() {
return this.taskRun.getId();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public class WorkerTrigger extends WorkerJob {
public String uid() {
return triggerContext.uid();
}

@Override
public String taskRunId() {
return triggerContext.uid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
public class TestMethodScopedWorker extends Worker {
public TestMethodScopedWorker(ApplicationContext applicationContext, int thread, String workerGroupKey) {
super(applicationContext, thread, workerGroupKey);

applicationContext.registerSingleton(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.h2;

import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
@H2RepositoryEnabled
public class H2WorkerInstanceRepository extends AbstractJdbcWorkerInstanceRepository {
@Inject
public H2WorkerInstanceRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(WorkerInstance.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.h2;

import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
@H2RepositoryEnabled
public class H2WorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public H2WorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(WorkerJobRunning.class, applicationContext));
}
}
23 changes: 12 additions & 11 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
package io.kestra.runner.h2;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

@Slf4j
public class H2WorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue;
private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;

@SuppressWarnings("unchecked")
public H2WorkerJobQueue(ApplicationContext applicationContext) {
this.workerTaskQueue = (QueueInterface<WorkerJob>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
);
this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return workerTaskQueue.receive(consumerGroup, queueType, consumer);
return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
}

@Override
public void pause() {
jdbcworkerjobQueueService.pause();
}

@Override
public void cleanup() {
jdbcworkerjobQueueService.cleanup();
}

@Override
public void close() {

jdbcworkerjobQueueService.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* ----------------------- workerInstance ----------------------- */
CREATE TABLE IF NOT EXISTS worker_instance (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerUuid')),
"hostname" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.hostname')),
"port" INT GENERATED ALWAYS AS (JQ_INTEGER("value",'.port')),
"management_port" INT GENERATED ALWAYS AS (JQ_INTEGER("value",'.managementPort')),
"worker_group" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value",'.workerGroup')),
"status" VARCHAR(10) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.status')),
"heartbeat_date" TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.heartbeatDate'), 'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
);

/* ----------------------- worker_job_running ----------------------- */
CREATE TABLE IF NOT EXISTS worker_job_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerInstance.workerUuid')),
"taskrun_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.taskRun.id'))
);

CREATE INDEX IF NOT EXISTS worker_job_running_worker_uuid ON worker_job_running ("worker_uuid");
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.repository.h2;

import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;

public class H2WorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {
}
Loading

0 comments on commit c9e6145

Please sign in to comment.