From 89573d6eceea347b663dbfd489fb1207e35f8112 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Sun, 24 Mar 2024 18:33:27 +0100 Subject: [PATCH] feat(core): add task restart strategy on worker failure (#3343,#3351) Changes: - add new config 'kestra.server.workerTaskRestartStrategy' - update default value for 'kestra.server.liveness.initialDelay' to match timeout - fix schedule initial delay to 0 - cleanup javadoc Fix: #3343 Fix: #3351 --- cli/src/main/resources/application.yml | 3 +- .../ServiceInstanceRepositoryInterface.java | 16 +--- .../AbstractServiceLivenessCoordinator.java | 36 ++++----- .../server/AbstractServiceLivenessTask.java | 9 +-- .../io/kestra/core/server/ServerConfig.java | 16 +++- .../java/io/kestra/core/server/Service.java | 8 +- .../io/kestra/core/server/WorkerConfig.java | 7 -- .../server/WorkerTaskRestartStrategy.java | 32 ++++++++ .../core/server/ServiceInstanceTest.java | 4 +- .../server/ServiceLivenessManagerTest.java | 8 +- ...AbstractJdbcServiceInstanceRepository.java | 27 ++++--- .../JdbcServiceLivenessCoordinator.java | 79 ++++++++++++++----- ...ractJdbcServiceInstanceRepositoryTest.java | 38 +-------- .../JdbcServiceLivenessManagerTest.java | 6 +- .../MemoryServiceInstanceRepository.java | 2 +- 15 files changed, 169 insertions(+), 122 deletions(-) delete mode 100644 core/src/main/java/io/kestra/core/server/WorkerConfig.java create mode 100644 core/src/main/java/io/kestra/core/server/WorkerTaskRestartStrategy.java diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index a0e2a06bced..949f9d366b9 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -216,6 +216,7 @@ kestra: max-rows: 5000 # The expected time for this server to complete all its tasks before initiating a graceful shutdown. terminationGracePeriod: 5m + workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD # Configuration for Liveness and Heartbeat mechanism between servers. liveness: enabled: true @@ -224,7 +225,7 @@ kestra: # The timeout used to detect service failures. timeout: 45s # The time to wait before executing a liveness probe. - initialDelay: 30s + initialDelay: 45s # The expected time between service heartbeats. heartbeatInterval: 3s anonymous-usage-report: diff --git a/core/src/main/java/io/kestra/core/repositories/ServiceInstanceRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/ServiceInstanceRepositoryInterface.java index a2815d05fe4..83961942844 100644 --- a/core/src/main/java/io/kestra/core/repositories/ServiceInstanceRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/ServiceInstanceRepositoryInterface.java @@ -58,20 +58,6 @@ ArrayListTotal find(Pageable pageable, */ ServiceInstance save(ServiceInstance service); - /** - * Finds all running service instances which have not sent a state update for more than their associated timeout, - * and thus should be considered in failure. - * - * @param now the time instant to be used for querying. - * @return the list of {@link ServiceInstance}. - */ - default List findAllTimeoutRunningInstances(final Instant now) { - return findAllInstancesInStates(List.of(Service.ServiceState.CREATED, Service.ServiceState.RUNNING)) - .stream() - .filter(instance -> instance.isSessionTimeoutElapsed(now)) - .toList(); - } - /** * Finds all service instances which are in the given state. * @@ -84,7 +70,7 @@ default List findAllTimeoutRunningInstances(final Instant now) * * @return the list of {@link ServiceInstance}. */ - List findAllInstancesInStates(final List states); + List findAllInstancesInStates(final Set states); /** * Attempt to transition the state of a given service to given new state. diff --git a/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessCoordinator.java b/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessCoordinator.java index 5e84e317172..f65a7edd8cf 100644 --- a/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessCoordinator.java +++ b/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessCoordinator.java @@ -9,7 +9,7 @@ import java.time.Instant; import java.util.List; import java.util.Random; -import java.util.random.RandomGenerator; +import java.util.Set; import static io.kestra.core.server.Service.ServiceState.CREATED; import static io.kestra.core.server.Service.ServiceState.RUNNING; @@ -23,6 +23,14 @@ public abstract class AbstractServiceLivenessCoordinator extends AbstractService private final static int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500; + protected static String DEFAULT_REASON_FOR_DISCONNECTED = + "The service was detected as non-responsive after the session timeout. " + + "Service transitioned to the 'DISCONNECTED' state."; + + protected static String DEFAULT_REASON_FOR_NOT_RUNNING = + "The service was detected as non-responsive or terminated after termination grace period. " + + "Service transitioned to the 'NOT_RUNNING' state."; + private static final String TASK_NAME = "service-liveness-coordinator-task"; protected final ServiceInstanceRepositoryInterface serviceInstanceRepository; @@ -56,21 +64,11 @@ protected Duration getScheduleInterval() { return serverConfig.liveness().interval().plus(Duration.ofMillis(jitter)); } - /** - * Transitions to the DISCONNECTED state all non-local services for which liveness - * is enabled and are detected as non-responsive . - * - * @param now the instant. - */ - protected void transitionAllNonRespondingService(final Instant now) { - - // Detect and handle non-responding services. - List nonRespondingServices = serviceInstanceRepository - // gets all non-responding services. - .findAllTimeoutRunningInstances(now) - .stream() - // keep only services with liveness enabled. + protected List filterAllNonRespondingServices(final List instances, + final Instant now) { + return instances.stream() .filter(instance -> instance.config().liveness().enabled()) + .filter(instance -> instance.isSessionTimeoutElapsed(now)) // exclude any service running on the same server as the executor, to prevent the latter from shutting down. .filter(instance -> !instance.server().id().equals(serverId)) // only keep services eligible for liveness probe @@ -87,18 +85,12 @@ protected void transitionAllNonRespondingService(final Instant now) { )) .toList(); - // Attempt to transit all non-responding services to DISCONNECTED. - nonRespondingServices.forEach(instance -> this.safelyTransitionServiceTo( - instance, - Service.ServiceState.DISCONNECTED, - "The service was detected as non-responsive after the session timeout. Service was transitioned to the 'DISCONNECTED' state." - )); } protected void mayDetectAndLogNewConnectedServices() { if (log.isInfoEnabled()) { // Log the newly-connected services (useful for troubleshooting). - serviceInstanceRepository.findAllInstancesInStates(List.of(CREATED, RUNNING)) + serviceInstanceRepository.findAllInstancesInStates(Set.of(CREATED, RUNNING)) .stream() .filter(instance -> instance.createdAt().isAfter(lastScheduledExecution())) .forEach(instance -> { diff --git a/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessTask.java b/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessTask.java index 53cc3349aac..b549c6463be 100644 --- a/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessTask.java +++ b/core/src/main/java/io/kestra/core/server/AbstractServiceLivenessTask.java @@ -56,7 +56,7 @@ public void run(final Instant now) { long timeout = serverConfig.liveness().timeout().toMillis(); if (elapsed > timeout) { // useful for debugging unexpected heartbeat timeout - log.warn("Thread starvation or clock leap detected (elapsed since previous schedule {}", elapsed); + log.warn("Thread starvation or clock leap detected (elapsed since previous schedule {}ms", elapsed); } onSchedule(now); } catch (Exception e) { @@ -89,18 +89,17 @@ protected long getElapsedMilliSinceLastSchedule(final Instant now) { public void start() { if (!isLivenessEnabled()) { log.warn( - "Server liveness is currently disabled (`kestra.server.liveness.enabled=false`) " + - "If you are running in production environment, please ensure this property is configured to `true`. " + "Server liveness is currently disabled (kestra.server.liveness.enabled=false) " + + "If you are running in production environment, please ensure this property is configured to 'true'. " ); } if (scheduledExecutorService == null && !isStopped.get()) { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name)); Duration scheduleInterval = getScheduleInterval(); log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval); - Duration initialDelay = serverConfig.liveness().initialDelay(); scheduledExecutorService.scheduleAtFixedRate( this, - initialDelay.toSeconds(), + 0, scheduleInterval.toSeconds(), TimeUnit.SECONDS ); diff --git a/core/src/main/java/io/kestra/core/server/ServerConfig.java b/core/src/main/java/io/kestra/core/server/ServerConfig.java index dd88536bdd7..149c571f3b7 100644 --- a/core/src/main/java/io/kestra/core/server/ServerConfig.java +++ b/core/src/main/java/io/kestra/core/server/ServerConfig.java @@ -1,10 +1,12 @@ package io.kestra.core.server; import io.micronaut.context.annotation.ConfigurationProperties; +import io.micronaut.core.annotation.Nullable; import io.micronaut.core.bind.annotation.Bindable; import jakarta.validation.constraints.NotNull; import java.time.Duration; +import java.util.Optional; /** * Server configuration. @@ -18,9 +20,21 @@ public record ServerConfig( @Bindable(defaultValue = "5m") Duration terminationGracePeriod, + @Bindable(defaultValue = "AFTER_TERMINATION_GRACE_PERIOD") + @Nullable + WorkerTaskRestartStrategy workerTaskRestartStrategy, + Liveness liveness ) { + + /** {@inheritDoc} **/ + public WorkerTaskRestartStrategy workerTaskRestartStrategy() { + return Optional + .ofNullable(workerTaskRestartStrategy) + .orElse(WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD); + } + /** * Configuration for Liveness and Heartbeat mechanism between Kestra Services, and Executor. * @@ -39,7 +53,7 @@ public record Liveness( @NotNull @Bindable(defaultValue = "5s") Duration interval, @NotNull @Bindable(defaultValue = "45s") Duration timeout, - @NotNull @Bindable(defaultValue = "30s") Duration initialDelay, + @NotNull @Bindable(defaultValue = "45s") Duration initialDelay, @NotNull @Bindable(defaultValue = "3s") Duration heartbeatInterval ) { } diff --git a/core/src/main/java/io/kestra/core/server/Service.java b/core/src/main/java/io/kestra/core/server/Service.java index f6b47407b8a..1901ec5419f 100644 --- a/core/src/main/java/io/kestra/core/server/Service.java +++ b/core/src/main/java/io/kestra/core/server/Service.java @@ -2,6 +2,7 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -134,8 +135,7 @@ public boolean isValidTransition(final ServiceState newState) { } public boolean isRunning() { - return equals(CREATED) - || equals(RUNNING); + return allRunningStates().contains(this); } public boolean isDisconnectedOrTerminating() { @@ -149,5 +149,9 @@ public boolean hasCompletedTermination() { || equals(NOT_RUNNING) || equals(EMPTY); } + + public static Set allRunningStates() { + return Set.of(CREATED, RUNNING); + } } } diff --git a/core/src/main/java/io/kestra/core/server/WorkerConfig.java b/core/src/main/java/io/kestra/core/server/WorkerConfig.java deleted file mode 100644 index f5fbe4fccca..00000000000 --- a/core/src/main/java/io/kestra/core/server/WorkerConfig.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.kestra.core.server; - -import io.micronaut.context.annotation.ConfigurationProperties; - -@ConfigurationProperties("kestra.server") -public class WorkerConfig { -} diff --git a/core/src/main/java/io/kestra/core/server/WorkerTaskRestartStrategy.java b/core/src/main/java/io/kestra/core/server/WorkerTaskRestartStrategy.java new file mode 100644 index 00000000000..b8263ce8713 --- /dev/null +++ b/core/src/main/java/io/kestra/core/server/WorkerTaskRestartStrategy.java @@ -0,0 +1,32 @@ +package io.kestra.core.server; + +/** + * The supported strategies for restarting tasks on worker failure. + */ +public enum WorkerTaskRestartStrategy { + + /** + * Tasks are never restarted on worker failure. + */ + NEVER, + /** + * Tasks are restarted immediately on worker failure, i.e., as soon as a worker id detected as disconnected. + * This strategy is used to reduce task recovery times at the risk of introducing duplicate executions. + */ + IMMEDIATELY, + /** + * Tasks are restarted on worker failure after the termination grace period is elapsed. + * This strategy is used to limit the risk of task duplication. + */ + AFTER_TERMINATION_GRACE_PERIOD; + + /** + * Checks whether tasks are restartable for that strategy. + * + * @return {@code true} if tasks are restartable. + */ + public boolean isRestartable() { + return this.equals(IMMEDIATELY) || this.equals(AFTER_TERMINATION_GRACE_PERIOD); + } + +} diff --git a/core/src/test/java/io/kestra/core/server/ServiceInstanceTest.java b/core/src/test/java/io/kestra/core/server/ServiceInstanceTest.java index 149dec4ac4c..25d5a3887e2 100644 --- a/core/src/test/java/io/kestra/core/server/ServiceInstanceTest.java +++ b/core/src/test/java/io/kestra/core/server/ServiceInstanceTest.java @@ -11,7 +11,9 @@ class ServiceInstanceTest { - public static final ServerConfig CONFIG = new ServerConfig(Duration.ZERO, + public static final ServerConfig CONFIG = new ServerConfig( + Duration.ZERO, + WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD, new ServerConfig.Liveness( true, Duration.ZERO, diff --git a/core/src/test/java/io/kestra/core/server/ServiceLivenessManagerTest.java b/core/src/test/java/io/kestra/core/server/ServiceLivenessManagerTest.java index f687fe6dc24..a30b9073a08 100644 --- a/core/src/test/java/io/kestra/core/server/ServiceLivenessManagerTest.java +++ b/core/src/test/java/io/kestra/core/server/ServiceLivenessManagerTest.java @@ -47,7 +47,9 @@ public class ServiceLivenessManagerTest { @BeforeEach void beforeEach() { KestraContext kestraContext = Mockito.mock(KestraContext.class); - ServerConfig config = new ServerConfig(Duration.ZERO, + ServerConfig config = new ServerConfig( + Duration.ZERO, + WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD, new ServerConfig.Liveness( true, Duration.ZERO, @@ -180,7 +182,9 @@ public ServiceState getState() { } public static ServiceInstance serviceInstanceFor(final Service service) { - ServerConfig config = new ServerConfig(Duration.ZERO, + ServerConfig config = new ServerConfig( + Duration.ZERO, + WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD, new ServerConfig.Liveness( true, Duration.ZERO, diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepository.java index 13361c95c7b..eb5f5d6219d 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepository.java @@ -89,15 +89,22 @@ public List findAllInstancesInState(final Service.ServiceState * {@inheritDoc} **/ @Override - public List findAllInstancesInStates(final List states) { + public List findAllInstancesInStates(final Set states) { return this.jdbcRepository.getDslContextWrapper() - .transactionResult(configuration -> { - SelectConditionStep> query = using(configuration) - .select(VALUE) - .from(table()) - .where(STATE.in(states.stream().map(Enum::name).toList())); - return this.jdbcRepository.fetch(query); - }); + .transactionResult(configuration -> findAllInstancesInStates(configuration, states, false)); + } + + public List findAllInstancesInStates(final Configuration configuration, + final Set states, + final boolean isForUpdate) { + SelectConditionStep> query = using(configuration) + .select(VALUE) + .from(table()) + .where(STATE.in(states.stream().map(Enum::name).toList())); + + return isForUpdate ? + this.jdbcRepository.fetch(query.forUpdate()) : + this.jdbcRepository.fetch(query); } /** @@ -234,7 +241,7 @@ public ServiceStateTransition.Response mayTransitionServiceTo(final ServiceInsta } /** - * Attempt to transit the status of a given service to given new status. + * Attempt to transition the state of a given service to given new state. * This method may not update the service if the transition is not valid. * * @param instance the service instance. @@ -255,7 +262,7 @@ public ServiceStateTransition.Response mayTransitServiceTo(final Configuration c } /** - * Attempt to transit the status of a given service to given new status. + * Attempt to transition the state of a given service to given new state. * This method may not update the service if the transition is not valid. * * @param instance the new service instance. diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcServiceLivenessCoordinator.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcServiceLivenessCoordinator.java index 244685ae557..b2d300443ac 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcServiceLivenessCoordinator.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcServiceLivenessCoordinator.java @@ -6,6 +6,7 @@ import io.kestra.core.server.Service; import io.kestra.core.server.Service.ServiceState; import io.kestra.core.server.ServiceInstance; +import io.kestra.core.server.WorkerTaskRestartStrategy; import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository; import io.micronaut.context.annotation.Requires; import jakarta.inject.Inject; @@ -13,14 +14,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; -import static io.kestra.core.server.Service.ServiceState.*; +import static io.kestra.core.server.Service.ServiceState.DISCONNECTED; +import static io.kestra.core.server.Service.ServiceState.NOT_RUNNING; +import static io.kestra.core.server.Service.ServiceState.TERMINATED_FORCED; +import static io.kestra.core.server.Service.ServiceState.TERMINATED_GRACEFULLY; +import static io.kestra.core.server.Service.ServiceState.TERMINATING; +import static io.kestra.core.server.Service.ServiceState.allRunningStates; /** * Responsible for coordinating the state of all service instances. @@ -58,11 +64,33 @@ protected void onSchedule(final Instant now) { if (executor.get() == null) return; // only True during startup // Transition all RUNNING but non-responding services to DISCONNECTED. - transitionAllNonRespondingService(now); + serviceInstanceRepository.transaction(configuration -> { + List instances = serviceInstanceRepository.findAllInstancesInStates(configuration, allRunningStates(), true); + List nonRespondingServices = filterAllNonRespondingServices(instances, now); + + nonRespondingServices.forEach(instance -> serviceInstanceRepository.mayTransitServiceTo( + configuration, + instance, + Service.ServiceState.DISCONNECTED, + DEFAULT_REASON_FOR_DISCONNECTED + )); + + // Eventually restart workers tasks + List workerIdsHavingTasksToRestart = nonRespondingServices.stream() + .filter(instance -> instance.is(Service.ServiceType.WORKER)) + .filter(instance -> instance.config().workerTaskRestartStrategy().equals(WorkerTaskRestartStrategy.IMMEDIATELY)) + .map(ServiceInstance::id) + .toList(); + + if (!workerIdsHavingTasksToRestart.isEmpty()) { + log.info("Trigger task restart for non-responding workers after timeout: {}.", workerIdsHavingTasksToRestart); + executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart); + } + }); // Finds all workers which are not in a RUNNING state. serviceInstanceRepository.transaction(configuration -> { - List nonRunningWorkers = serviceInstanceRepository + final List nonRunningWorkers = serviceInstanceRepository .findAllNonRunningInstances(configuration, true) .stream() .filter(instance -> instance.is(Service.ServiceType.WORKER)) @@ -75,14 +103,7 @@ protected void onSchedule(final Instant now) { uncleanShutdownWorkers.addAll(nonRunningWorkers.stream() .filter(nonRunning -> nonRunning.state().isDisconnectedOrTerminating()) .filter(disconnectedOrTerminating -> disconnectedOrTerminating.isTerminationGracePeriodElapsed(now)) - .peek(instance -> { - log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).", - instance.id(), - instance.type(), - instance.server().hostname(), - now.toEpochMilli() - instance.updatedAt().toEpochMilli() - ); - }) + .peek(instance -> mayLogNonRespondingAfterTerminationGracePeriod(instance, now)) .toList() ); // ...all workers that have transitioned to TERMINATED_FORCED. @@ -93,8 +114,14 @@ protected void onSchedule(final Instant now) { // Re-emit all WorkerJobs for unclean workers if (!uncleanShutdownWorkers.isEmpty()) { - List ids = uncleanShutdownWorkers.stream().map(ServiceInstance::id).toList(); - executor.get().reEmitWorkerJobsForWorkers(configuration, ids); + List ids = uncleanShutdownWorkers.stream() + .filter(instance -> instance.config().workerTaskRestartStrategy().isRestartable()) + .map(ServiceInstance::id) + .toList(); + if (!ids.isEmpty()) { + log.info("Trigger task restart for non-responding workers after termination grace period: {}.", ids); + executor.get().reEmitWorkerJobsForWorkers(configuration, ids); + } } // Transit all GRACEFUL AND UNCLEAN SHUTDOWN workers to NOT_RUNNING. @@ -104,18 +131,18 @@ protected void onSchedule(final Instant now) { instance -> serviceInstanceRepository.mayTransitServiceTo(configuration, instance, ServiceState.NOT_RUNNING, - "The worker was detected as non-responsive after termination grace period. Service was transitioned to the 'NOT_RUNNING' state." + DEFAULT_REASON_FOR_NOT_RUNNING ) ); }); // Transition all TERMINATED services to NOT_RUNNING. serviceInstanceRepository - .findAllInstancesInStates(List.of(DISCONNECTED, TERMINATING, TERMINATED_GRACEFULLY, TERMINATED_FORCED)).stream() + .findAllInstancesInStates(Set.of(DISCONNECTED, TERMINATING, TERMINATED_GRACEFULLY, TERMINATED_FORCED)).stream() .filter(instance -> !instance.is(Service.ServiceType.WORKER)) // WORKERS are handle above. - .filter(instance ->instance.isTerminationGracePeriodElapsed(now)) - .forEach(instance -> safelyTransitionServiceTo(instance, NOT_RUNNING, null)); - + .filter(instance -> instance.isTerminationGracePeriodElapsed(now)) + .peek(instance -> mayLogNonRespondingAfterTerminationGracePeriod(instance, now)) + .forEach(instance -> safelyTransitionServiceTo(instance, NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING)); // Soft delete all services which are NOT_RUNNING anymore. serviceInstanceRepository.findAllInstancesInState(ServiceState.NOT_RUNNING) @@ -124,6 +151,20 @@ protected void onSchedule(final Instant now) { mayDetectAndLogNewConnectedServices(); } + private static void mayLogNonRespondingAfterTerminationGracePeriod(final ServiceInstance instance, + final Instant now) { + + if (instance.state().isDisconnectedOrTerminating()) { + log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).", + instance.id(), + instance.type(), + instance.server().hostname(), + now.toEpochMilli() - instance.updatedAt().toEpochMilli() + ); + } + } + + synchronized void setExecutor(final JdbcExecutor executor) { this.executor.set(executor); } diff --git a/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepositoryTest.java b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepositoryTest.java index 8e087246729..769412a1015 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepositoryTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcServiceInstanceRepositoryTest.java @@ -5,6 +5,7 @@ import io.kestra.core.server.Service; import io.kestra.core.server.ServiceInstance; import io.kestra.core.server.ServiceStateTransition; +import io.kestra.core.server.WorkerTaskRestartStrategy; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.Network; import io.kestra.jdbc.JdbcTestUtils; @@ -123,39 +124,6 @@ protected void shouldFindAllInstancesInNotRunningState() { assertThat(results, Matchers.containsInAnyOrder(AbstractJdbcServiceInstanceRepositoryTest.Fixtures.allInNotRunningState().toArray())); } - @Test - protected void shouldFindTimeoutRunningInstancesGivenTimeoutInstance() { - // Given - final Instant now = Instant.now(); - ServiceInstance instance = AbstractJdbcServiceInstanceRepositoryTest.Fixtures.RunningServiceInstance - .state(Service.ServiceState.RUNNING, now.minus(Duration.ofSeconds(30)).truncatedTo(ChronoUnit.MILLIS)); - - repository.save(instance); - - // When - List results = repository.findAllTimeoutRunningInstances(now); - - // Then - assertEquals(1, results.size()); - assertThat(results, Matchers.containsInAnyOrder(instance)); - } - - @Test - protected void shouldNotFindTimeoutRunningInstanceGivenHealthyInstance() { - // Given - final Instant now = Instant.now(); - ServiceInstance instance = AbstractJdbcServiceInstanceRepositoryTest.Fixtures.RunningServiceInstance - .state(Service.ServiceState.RUNNING, now.minus(Duration.ofSeconds(5)).truncatedTo(ChronoUnit.MILLIS)); - - repository.save(instance); - - // When - List results = repository.findAllTimeoutRunningInstances(now); - - // Then - assertTrue(results.isEmpty()); - } - @Test void shouldReturnEmptyForTransitionWorkerStateGivenInvalidWorker() { // Given @@ -245,7 +213,9 @@ public static List allInNotRunningState() { serviceInstanceFor(Service.ServiceState.EMPTY); public static ServiceInstance serviceInstanceFor(final Service.ServiceState state) { - ServerConfig config = new ServerConfig(Duration.ZERO, + ServerConfig config = new ServerConfig( + Duration.ZERO, + WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD, new ServerConfig.Liveness( true, Duration.ZERO, diff --git a/jdbc/src/test/java/io/kestra/jdbc/server/JdbcServiceLivenessManagerTest.java b/jdbc/src/test/java/io/kestra/jdbc/server/JdbcServiceLivenessManagerTest.java index 70b7bf4b104..f414dc13965 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/server/JdbcServiceLivenessManagerTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/server/JdbcServiceLivenessManagerTest.java @@ -3,7 +3,6 @@ import io.kestra.core.contexts.KestraContext; import io.kestra.core.models.ServerType; import io.kestra.core.repositories.ServiceInstanceRepositoryInterface; -import io.kestra.core.runners.RunContext; import io.kestra.core.server.ServerConfig; import io.kestra.core.server.ServerInstanceFactory; import io.kestra.core.server.Service; @@ -11,6 +10,7 @@ import io.kestra.core.server.LocalServiceStateFactory; import io.kestra.core.server.ServiceRegistry; import io.kestra.core.server.ServiceStateTransition; +import io.kestra.core.server.WorkerTaskRestartStrategy; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -47,7 +47,9 @@ void beforeEach() { Mockito.when(context.getServerType()).thenReturn(ServerType.WORKER); Mockito.when(context.getVersion()).thenReturn(""); KestraContext.setContext(context); - ServerConfig config = new ServerConfig(Duration.ZERO, + ServerConfig config = new ServerConfig( + Duration.ZERO, + WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD, new ServerConfig.Liveness( true, Duration.ZERO, diff --git a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryServiceInstanceRepository.java b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryServiceInstanceRepository.java index 8781577894b..da576a510a3 100644 --- a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryServiceInstanceRepository.java +++ b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryServiceInstanceRepository.java @@ -68,7 +68,7 @@ public List findAllInstancesInState(Service.ServiceState state) /** {@inheritDoc} **/ @Override - public List findAllInstancesInStates(List states) { + public List findAllInstancesInStates(Set states) { List instancesInStates = new ArrayList<>(); for (ServiceInstance instance : data.values()) { if (states.contains(instance.state())) {