Skip to content

Commit

Permalink
feat(): Zombie task resubmit
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Sep 18, 2023
1 parent 201ee9b commit 848d933
Show file tree
Hide file tree
Showing 22 changed files with 470 additions and 53 deletions.
5 changes: 4 additions & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ kestra:
workerheartbeat:
cls: io.kestra.core.runners.WorkerHeartbeat
table: "worker_heartbeat"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"

queues:
min-poll-interval: 25ms
Expand Down Expand Up @@ -170,5 +173,5 @@ kestra:
fixed-delay: 1h

heartbeat:
frequency: 10
frequency: 5
heartbeat-missed: 3
5 changes: 1 addition & 4 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.kestra.core.queues;

import io.kestra.core.models.Setting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.core.repositories;

import io.kestra.core.runners.WorkerJobRunning;

import java.util.List;
import java.util.Optional;

/**
 * Storage contract for {@link WorkerJobRunning} records.
 */
public interface WorkerJobRunningInterface {
    /**
     * Stores the given record.
     *
     * @param workerJobRunning the record to store
     * @return a {@link WorkerJobRunning}; presumably the stored record (confirm against implementations)
     */
    WorkerJobRunning save(WorkerJobRunning workerJobRunning);

    /**
     * Looks up a record by task run id.
     *
     * @param taskRunId the task run id to search for
     * @return the matching record, or an empty {@link Optional}
     */
    Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);

    /**
     * Deletes the record identified by the given task run id.
     *
     * @param taskRunId the task run id of the record to delete
     */
    void delete(String taskRunId);

    /**
     * Lists worker jobs whose worker is dead.
     * NOTE(review): judging by the names only, this returns the records whose worker is not
     * part of {@code workersAlive}; confirm against the implementations.
     *
     * @param workersAlive identifiers of the workers that are still alive
     * @return the matching records
     */
    List<WorkerJobRunning> getWorkerJobWithWorkerDead(List<String> workersAlive);
}
15 changes: 11 additions & 4 deletions core/src/main/java/io/kestra/core/runners/WorkerHeartbeat.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.kestra.core.runners;

import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.time.Instant;
Expand All @@ -19,6 +16,16 @@ public class WorkerHeartbeat extends WorkerInstance {
@Builder.Default
private Instant heartbeatDate = Instant.now();

/**
 * Builds a plain {@link WorkerInstance} from this heartbeat.
 * Only the worker UUID, hostname, port, management port and worker group are copied;
 * no other property of the heartbeat is transferred.
 *
 * @return a new {@link WorkerInstance} carrying those five properties
 */
public WorkerInstance toWorkerInstance() {
    return WorkerInstance.builder()
        .workerUuid(getWorkerUuid())
        .workerGroup(getWorkerGroup())
        .hostname(getHostname())
        .port(getPort())
        .managementPort(getManagementPort())
        .build();
}

/**
 * Status values available for a worker heartbeat.
 */
public enum Status {
    // NOTE(review): presumably a worker that is still heartbeating; confirm with the code that sets it.
    UP,
    // NOTE(review): presumably a worker considered lost; confirm with the code that sets it.
    DEAD
}
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/java/io/kestra/core/runners/WorkerInstance.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.kestra.core.runners;

import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;
Expand All @@ -15,6 +12,7 @@
@SuperBuilder(toBuilder = true)
@ToString
@NoArgsConstructor
@Getter
public class WorkerInstance {
@NotNull
private UUID workerUuid;
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public abstract class WorkerJob {
/**
 * Returns the type of this job as a string.
 * NOTE(review): presumably a discriminator between job kinds; confirm with the subclasses.
 *
 * @return the job type
 */
public abstract String getType();

/**
 * Returns the uid of this job; each subclass decides which value that is.
 *
 * @return the job uid
 */
public abstract String uid();

/**
 * Returns the task run id associated with this job; each subclass decides which value that is.
 *
 * @return the task run id
 */
public abstract String taskRunId();
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public Logger logger() {
public String uid() {
    // The uid of a worker task is the task id carried by its task run.
    return taskRun.getTaskId();
}

/**
 * Returns the id of the task run carried by this worker task.
 *
 * @return the task run id
 */
@Override
public String taskRunId() {
    return taskRun.getId();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public class WorkerTrigger extends WorkerJob {
public String uid() {
    // The uid of a worker trigger is delegated to its trigger context.
    return this.triggerContext.uid();
}

/**
 * Returns the uid of the trigger context, i.e. the same value as {@code uid()}.
 *
 * @return the trigger context uid
 */
@Override
public String taskRunId() {
    return this.triggerContext.uid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.h2;

import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

/**
 * {@link AbstractJdbcWorkerJobRunningRepository} backed by an {@link H2Repository}.
 */
@H2RepositoryEnabled
@Singleton
public class H2WorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
    /**
     * Hands the parent class an {@link H2Repository} typed on {@link WorkerJobRunning}.
     *
     * @param applicationContext context passed to the {@link H2Repository} constructor
     */
    @Inject
    public H2WorkerJobRunningRepository(final ApplicationContext applicationContext) {
        super(new H2Repository<>(WorkerJobRunning.class, applicationContext));
    }
}
57 changes: 53 additions & 4 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerJobQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,76 @@
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.*;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcHeartbeat;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.extern.slf4j.Slf4j;

import java.net.UnknownHostException;
import java.util.function.Consumer;

@Slf4j
public class H2WorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue;
JdbcQueue<WorkerJob> workerTaskQueue;
JdbcHeartbeat jdbcHeartbeat;
AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;

/**
 * Resolves the collaborators of this queue from the application context: the named worker job
 * queue, the {@link JdbcHeartbeat} and the {@link AbstractJdbcWorkerJobRunningRepository}.
 *
 * @param applicationContext context used to look up the three beans
 */
@SuppressWarnings("unchecked")
public H2WorkerJobQueue(ApplicationContext applicationContext) {
    // The bean is looked up as a QueueInterface and narrowed to JdbcQueue because
    // receive(...) calls JdbcQueue#receiveTransaction on it.
    this.workerTaskQueue = (JdbcQueue<WorkerJob>) applicationContext.getBean(
        QueueInterface.class,
        Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
    );
    this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class);
    this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
}

/**
 * Subscribes {@code consumer} to the worker job queue through
 * {@link JdbcQueue#receiveTransaction}. For every received batch, each job that deserialized
 * successfully is first saved as a {@link WorkerJobRunning} with the supplied
 * {@code dslContext}; the whole batch (deserialization failures included) is then handed to
 * {@code consumer}.
 *
 * @param consumerGroup consumer group forwarded to the underlying queue
 * @param queueType queue type forwarded to the underlying queue
 * @param consumer callback invoked for every received element
 * @return the {@link Runnable} returned by the underlying queue
 * @throws IllegalStateException from the callback, when the current worker instance cannot be resolved
 * @throws IllegalArgumentException from the callback, when a job is neither a task nor a trigger
 */
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
    return workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
        eithers.forEach(either -> {
            if (either.isRight()) {
                // Nothing to record for a payload that could not be read; it still reaches the consumer below.
                log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
                return;
            }

            WorkerJobRunning workerJobRunning = toWorkerJobRunning(either.getLeft(), currentWorkerInstance());
            jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext);

            log.trace("Sending a workerJobRunning: {}", workerJobRunning);
        });

        eithers.forEach(consumer);
    });
}

/** Returns the worker instance described by the current heartbeat. */
private WorkerInstance currentWorkerInstance() {
    try {
        return jdbcHeartbeat.get().toWorkerInstance();
    } catch (UnknownHostException e) {
        throw new IllegalStateException("Unable to resolve the worker instance from the current heartbeat", e);
    }
}

/** Wraps a task or trigger job into its running counterpart for the given worker. */
private static WorkerJobRunning toWorkerJobRunning(WorkerJob workerJob, WorkerInstance workerInstance) {
    // NOTE(review): the meaning of the literal 0 passed to of(...) is not visible from here.
    if (workerJob instanceof WorkerTask workerTask) {
        return WorkerTaskRunning.of(workerTask, workerInstance, 0);
    }
    if (workerJob instanceof WorkerTrigger workerTrigger) {
        return WorkerTriggerRunning.of(workerTrigger, workerInstance, 0);
    }
    throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs");
}

@Override
Expand Down
10 changes: 9 additions & 1 deletion jdbc-h2/src/main/resources/migrations/h2/V1_2__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ CREATE TABLE IF NOT EXISTS worker_heartbeat (
"worker_group" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value",'.workerGroup')),
"status" VARCHAR(10) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.status')),
"heartbeat_date" TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.heartbeatDate'), 'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
);
);

/* ----------------------- worker_job_running ----------------------- */
/* Table of worker_job_running entries: "value" holds the stored document and the
   "worker_uuid" / "taskrun_id" columns are generated from it with JQ_STRING. */
/* NOTE(review): "taskrun_id" is NOT NULL and read from '.taskRun.id'; confirm that every
   stored document (trigger jobs included) carries that path. */
CREATE TABLE IF NOT EXISTS worker_job_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerInstance.workerUuid')),
"taskrun_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.taskRun.id'))
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.mysql;

import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

/**
 * {@link AbstractJdbcWorkerJobRunningRepository} backed by a {@link MysqlRepository}.
 */
@MysqlRepositoryEnabled
@Singleton
public class MysqlWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
    /**
     * Hands the parent class a {@link MysqlRepository} typed on {@link WorkerJobRunning}.
     *
     * @param applicationContext context passed to the {@link MysqlRepository} constructor
     */
    @Inject
    public MysqlWorkerJobRunningRepository(final ApplicationContext applicationContext) {
        super(new MysqlRepository<>(WorkerJobRunning.class, applicationContext));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,76 @@
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.*;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcHeartbeat;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.extern.slf4j.Slf4j;

import java.net.UnknownHostException;
import java.util.function.Consumer;

@Slf4j
public class MysqlWorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue;
JdbcQueue<WorkerJob> workerTaskQueue;
JdbcHeartbeat jdbcHeartbeat;
AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;

/**
 * Resolves the collaborators of this queue from the application context: the named worker job
 * queue, the {@link JdbcHeartbeat} and the {@link AbstractJdbcWorkerJobRunningRepository}.
 *
 * @param applicationContext context used to look up the three beans
 */
@SuppressWarnings("unchecked")
public MysqlWorkerJobQueue(ApplicationContext applicationContext) {
    // The bean is looked up as a QueueInterface and narrowed to JdbcQueue because
    // receive(...) calls JdbcQueue#receiveTransaction on it.
    this.workerTaskQueue = (JdbcQueue<WorkerJob>) applicationContext.getBean(
        QueueInterface.class,
        Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
    );
    this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class);
    this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
}

/**
 * Subscribes {@code consumer} to the worker job queue through
 * {@link JdbcQueue#receiveTransaction}. For every received batch, each job that deserialized
 * successfully is first saved as a {@link WorkerJobRunning} with the supplied
 * {@code dslContext}; the whole batch (deserialization failures included) is then handed to
 * {@code consumer}.
 *
 * @param consumerGroup consumer group forwarded to the underlying queue
 * @param queueType queue type forwarded to the underlying queue
 * @param consumer callback invoked for every received element
 * @return the {@link Runnable} returned by the underlying queue
 * @throws IllegalStateException from the callback, when the current worker instance cannot be resolved
 * @throws IllegalArgumentException from the callback, when a job is neither a task nor a trigger
 */
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
    return workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
        eithers.forEach(either -> {
            if (either.isRight()) {
                // Nothing to record for a payload that could not be read; it still reaches the consumer below.
                log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
                return;
            }

            WorkerJobRunning workerJobRunning = toWorkerJobRunning(either.getLeft(), currentWorkerInstance());
            jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext);

            log.trace("Sending a workerJobRunning: {}", workerJobRunning);
        });

        eithers.forEach(consumer);
    });
}

/** Returns the worker instance described by the current heartbeat. */
private WorkerInstance currentWorkerInstance() {
    try {
        return jdbcHeartbeat.get().toWorkerInstance();
    } catch (UnknownHostException e) {
        throw new IllegalStateException("Unable to resolve the worker instance from the current heartbeat", e);
    }
}

/** Wraps a task or trigger job into its running counterpart for the given worker. */
private static WorkerJobRunning toWorkerJobRunning(WorkerJob workerJob, WorkerInstance workerInstance) {
    // NOTE(review): the meaning of the literal 0 passed to of(...) is not visible from here.
    if (workerJob instanceof WorkerTask workerTask) {
        return WorkerTaskRunning.of(workerTask, workerInstance, 0);
    }
    if (workerJob instanceof WorkerTrigger workerTrigger) {
        return WorkerTriggerRunning.of(workerTrigger, workerInstance, 0);
    }
    throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs");
}

@Override
Expand Down
13 changes: 10 additions & 3 deletions jdbc-mysql/src/main/resources/migrations/mysql/V1_2__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
/* Table of worker heartbeats: `value` holds the JSON document and every other column except
   `key` is a STORED column generated from it. Each column is declared exactly once. */
CREATE TABLE IF NOT EXISTS worker_heartbeat (
    `key` VARCHAR(250) NOT NULL PRIMARY KEY,
    `value` JSON NOT NULL,
    `worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerUuid') STORED NOT NULL,
    `hostname` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.hostname') STORED NOT NULL,
    `port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.port') STORED,
    `management_port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.managementPort') STORED,
    `worker_group` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.workerGroup') STORED,
    `status` VARCHAR(10) GENERATED ALWAYS AS (value ->> '$.status') STORED NOT NULL,
    `heartbeat_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.heartbeatDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

/* ----------------------- worker_job_running ----------------------- */
/* Table of worker_job_running entries: `value` holds the JSON document and the
   `worker_uuid` / `taskrun_id` columns are STORED columns generated from it. */
/* NOTE(review): `taskrun_id` is NOT NULL and read from '$.taskRun.id'; confirm that every
   stored document (trigger jobs included) carries that path. */
CREATE TABLE IF NOT EXISTS worker_job_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerInstance.workerUuid') STORED NOT NULL,
`taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRun.id') STORED NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.postgres;

import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

/**
 * {@link AbstractJdbcWorkerJobRunningRepository} backed by a {@link PostgresRepository}.
 */
@PostgresRepositoryEnabled
@Singleton
public class PostgresWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
    /**
     * Hands the parent class a {@link PostgresRepository} typed on {@link WorkerJobRunning}.
     *
     * @param applicationContext context passed to the {@link PostgresRepository} constructor
     */
    @Inject
    public PostgresWorkerJobRunningRepository(final ApplicationContext applicationContext) {
        super(new PostgresRepository<>(WorkerJobRunning.class, applicationContext));
    }
}
Loading

0 comments on commit 848d933

Please sign in to comment.