Skip to content

Commit

Permalink
feat(core): add task restart strategy on worker failure (#3343,#3351)
Browse files Browse the repository at this point in the history
Changes:
- add new config 'kestra.server.workerTaskRestartStrategy'
- update default value for 'kestra.server.liveness.initialDelay' to match timeout
- fix schedule initial delay to 0
- cleanup javadoc

Fix: #3343
Fix: #3351
  • Loading branch information
fhussonnois committed Mar 25, 2024
1 parent 96f1eef commit 89573d6
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 122 deletions.
3 changes: 2 additions & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ kestra:
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
Expand All @@ -224,7 +225,7 @@ kestra:
# The timeout used to detect service failures.
timeout: 45s
# The time to wait before executing a liveness probe.
initialDelay: 30s
initialDelay: 45s
# The expected time between service heartbeats.
heartbeatInterval: 3s
anonymous-usage-report:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ ArrayListTotal<ServiceInstance> find(Pageable pageable,
*/
ServiceInstance save(ServiceInstance service);

/**
 * Looks up the service instances that are in the {@code CREATED} or {@code RUNNING} state
 * and whose session timeout has elapsed at the given instant.
 *
 * @param now the instant against which each instance's session timeout is evaluated.
 * @return the list of {@link ServiceInstance} whose session timeout is elapsed.
 */
default List<ServiceInstance> findAllTimeoutRunningInstances(final Instant now) {
    final List<Service.ServiceState> liveStates = List.of(
        Service.ServiceState.CREATED,
        Service.ServiceState.RUNNING
    );
    final List<ServiceInstance> candidates = findAllInstancesInStates(liveStates);
    return candidates
        .stream()
        .filter(candidate -> candidate.isSessionTimeoutElapsed(now))
        .toList();
}

/**
* Finds all service instances which are in the given state.
*
Expand All @@ -84,7 +70,7 @@ default List<ServiceInstance> findAllTimeoutRunningInstances(final Instant now)
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(final List<Service.ServiceState> states);
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);

/**
* Attempt to transition the state of a given service to given new state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Random;
import java.util.random.RandomGenerator;
import java.util.Set;

import static io.kestra.core.server.Service.ServiceState.CREATED;
import static io.kestra.core.server.Service.ServiceState.RUNNING;
Expand All @@ -23,6 +23,14 @@ public abstract class AbstractServiceLivenessCoordinator extends AbstractService

private final static int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500;

/**
 * Default reason message for a service transition to the 'DISCONNECTED' state.
 * Declared {@code final} so that it cannot be reassigned by subclasses.
 */
protected static final String DEFAULT_REASON_FOR_DISCONNECTED =
    "The service was detected as non-responsive after the session timeout. " +
    "Service transitioned to the 'DISCONNECTED' state.";

/**
 * Default reason message for a service transition to the 'NOT_RUNNING' state.
 * Declared {@code final} so that it cannot be reassigned by subclasses.
 */
protected static final String DEFAULT_REASON_FOR_NOT_RUNNING =
    "The service was detected as non-responsive or terminated after termination grace period. " +
    "Service transitioned to the 'NOT_RUNNING' state.";

private static final String TASK_NAME = "service-liveness-coordinator-task";

protected final ServiceInstanceRepositoryInterface serviceInstanceRepository;
Expand Down Expand Up @@ -56,21 +64,11 @@ protected Duration getScheduleInterval() {
return serverConfig.liveness().interval().plus(Duration.ofMillis(jitter));
}

/**
* Transitions to the DISCONNECTED state all non-local services for which liveness
* is enabled and which are detected as non-responsive.
*
* @param now the instant.
*/
protected void transitionAllNonRespondingService(final Instant now) {

// Detect and handle non-responding services.
List<ServiceInstance> nonRespondingServices = serviceInstanceRepository
// gets all non-responding services.
.findAllTimeoutRunningInstances(now)
.stream()
// keep only services with liveness enabled.
protected List<ServiceInstance> filterAllNonRespondingServices(final List<ServiceInstance> instances,
final Instant now) {
return instances.stream()
.filter(instance -> instance.config().liveness().enabled())
.filter(instance -> instance.isSessionTimeoutElapsed(now))
// exclude any service running on the same server as the executor, to prevent the latter from shutting down.
.filter(instance -> !instance.server().id().equals(serverId))
// only keep services eligible for liveness probe
Expand All @@ -87,18 +85,12 @@ protected void transitionAllNonRespondingService(final Instant now) {
))
.toList();

// Attempt to transit all non-responding services to DISCONNECTED.
nonRespondingServices.forEach(instance -> this.safelyTransitionServiceTo(
instance,
Service.ServiceState.DISCONNECTED,
"The service was detected as non-responsive after the session timeout. Service was transitioned to the 'DISCONNECTED' state."
));
}

protected void mayDetectAndLogNewConnectedServices() {
if (log.isInfoEnabled()) {
// Log the newly-connected services (useful for troubleshooting).
serviceInstanceRepository.findAllInstancesInStates(List.of(CREATED, RUNNING))
serviceInstanceRepository.findAllInstancesInStates(Set.of(CREATED, RUNNING))
.stream()
.filter(instance -> instance.createdAt().isAfter(lastScheduledExecution()))
.forEach(instance -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void run(final Instant now) {
long timeout = serverConfig.liveness().timeout().toMillis();
if (elapsed > timeout) {
// useful for debugging unexpected heartbeat timeout
log.warn("Thread starvation or clock leap detected (elapsed since previous schedule {}", elapsed);
log.warn("Thread starvation or clock leap detected (elapsed since previous schedule {}ms", elapsed);
}
onSchedule(now);
} catch (Exception e) {
Expand Down Expand Up @@ -89,18 +89,17 @@ protected long getElapsedMilliSinceLastSchedule(final Instant now) {
public void start() {
if (!isLivenessEnabled()) {
log.warn(
"Server liveness is currently disabled (`kestra.server.liveness.enabled=false`) " +
"If you are running in production environment, please ensure this property is configured to `true`. "
"Server liveness is currently disabled (kestra.server.liveness.enabled=false) " +
"If you are running in production environment, please ensure this property is configured to 'true'. "
);
}
if (scheduledExecutorService == null && !isStopped.get()) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
Duration initialDelay = serverConfig.liveness().initialDelay();
scheduledExecutorService.scheduleAtFixedRate(
this,
initialDelay.toSeconds(),
0,
scheduleInterval.toSeconds(),
TimeUnit.SECONDS
);
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/java/io/kestra/core/server/ServerConfig.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.kestra.core.server;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.bind.annotation.Bindable;
import jakarta.validation.constraints.NotNull;

import java.time.Duration;
import java.util.Optional;

/**
* Server configuration.
Expand All @@ -18,9 +20,21 @@ public record ServerConfig(
@Bindable(defaultValue = "5m")
Duration terminationGracePeriod,

@Bindable(defaultValue = "AFTER_TERMINATION_GRACE_PERIOD")
@Nullable
WorkerTaskRestartStrategy workerTaskRestartStrategy,

Liveness liveness

) {

/**
 * Gets the strategy used for restarting tasks on worker failure.
 *
 * @return the configured strategy, or {@link WorkerTaskRestartStrategy#AFTER_TERMINATION_GRACE_PERIOD}
 *         when the configured value is {@code null}.
 */
public WorkerTaskRestartStrategy workerTaskRestartStrategy() {
    if (workerTaskRestartStrategy == null) {
        // No strategy configured: fall back to the default one.
        return WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD;
    }
    return workerTaskRestartStrategy;
}

/**
* Configuration for Liveness and Heartbeat mechanism between Kestra Services, and Executor.
*
Expand All @@ -39,7 +53,7 @@ public record Liveness(
@NotNull @Bindable(defaultValue = "5s")
Duration interval,
@NotNull @Bindable(defaultValue = "45s") Duration timeout,
@NotNull @Bindable(defaultValue = "30s") Duration initialDelay,
@NotNull @Bindable(defaultValue = "45s") Duration initialDelay,
@NotNull @Bindable(defaultValue = "3s") Duration heartbeatInterval
) {
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/io/kestra/core/server/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -134,8 +135,7 @@ public boolean isValidTransition(final ServiceState newState) {
}

public boolean isRunning() {
return equals(CREATED)
|| equals(RUNNING);
return allRunningStates().contains(this);
}

public boolean isDisconnectedOrTerminating() {
Expand All @@ -149,5 +149,9 @@ public boolean hasCompletedTermination() {
|| equals(NOT_RUNNING)
|| equals(EMPTY);
}

/**
 * Gets the states in which a service is considered to be running.
 *
 * @return an immutable set holding {@link #CREATED} and {@link #RUNNING}.
 */
public static Set<ServiceState> allRunningStates() {
    return Set.of(ServiceState.CREATED, ServiceState.RUNNING);
}
}
}
7 changes: 0 additions & 7 deletions core/src/main/java/io/kestra/core/server/WorkerConfig.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.kestra.core.server;

/**
 * The supported strategies for restarting tasks on worker failure.
 */
public enum WorkerTaskRestartStrategy {

    /**
     * Tasks are never restarted on worker failure.
     */
    NEVER,
    /**
     * Tasks are restarted immediately on worker failure, i.e., as soon as a worker is detected as disconnected.
     * This strategy is used to reduce task recovery times at the risk of introducing duplicate executions.
     */
    IMMEDIATELY,
    /**
     * Tasks are restarted on worker failure after the termination grace period is elapsed.
     * This strategy is used to limit the risk of task duplication.
     */
    AFTER_TERMINATION_GRACE_PERIOD;

    /**
     * Checks whether tasks are restartable for that strategy.
     *
     * @return {@code true} for {@link #IMMEDIATELY} and {@link #AFTER_TERMINATION_GRACE_PERIOD},
     *         {@code false} for {@link #NEVER}.
     */
    public boolean isRestartable() {
        // Enum constants are singletons: identity comparison is the idiomatic (and null-safe) check.
        return this == IMMEDIATELY || this == AFTER_TERMINATION_GRACE_PERIOD;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

class ServiceInstanceTest {

public static final ServerConfig CONFIG = new ServerConfig(Duration.ZERO,
public static final ServerConfig CONFIG = new ServerConfig(
Duration.ZERO,
WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD,
new ServerConfig.Liveness(
true,
Duration.ZERO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class ServiceLivenessManagerTest {
@BeforeEach
void beforeEach() {
KestraContext kestraContext = Mockito.mock(KestraContext.class);
ServerConfig config = new ServerConfig(Duration.ZERO,
ServerConfig config = new ServerConfig(
Duration.ZERO,
WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD,
new ServerConfig.Liveness(
true,
Duration.ZERO,
Expand Down Expand Up @@ -180,7 +182,9 @@ public ServiceState getState() {
}

public static ServiceInstance serviceInstanceFor(final Service service) {
ServerConfig config = new ServerConfig(Duration.ZERO,
ServerConfig config = new ServerConfig(
Duration.ZERO,
WorkerTaskRestartStrategy.AFTER_TERMINATION_GRACE_PERIOD,
new ServerConfig.Liveness(
true,
Duration.ZERO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,22 @@ public List<ServiceInstance> findAllInstancesInState(final Service.ServiceState
* {@inheritDoc}
**/
@Override
public List<ServiceInstance> findAllInstancesInStates(final List<Service.ServiceState> states) {
public List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states) {
return this.jdbcRepository.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> query = using(configuration)
.select(VALUE)
.from(table())
.where(STATE.in(states.stream().map(Enum::name).toList()));
return this.jdbcRepository.fetch(query);
});
.transactionResult(configuration -> findAllInstancesInStates(configuration, states, false));
}

/**
 * Finds all service instances which are in one of the given states, running the query
 * with the given configuration.
 *
 * @param configuration the configuration used to build the query.
 * @param states        the states to match; each state is matched by its enum name.
 * @param isForUpdate   when {@code true}, the query is fetched through {@code forUpdate()}.
 * @return the list of {@link ServiceInstance}.
 */
public List<ServiceInstance> findAllInstancesInStates(final Configuration configuration,
                                                      final Set<Service.ServiceState> states,
                                                      final boolean isForUpdate) {
    final SelectConditionStep<Record1<Object>> select = using(configuration)
        .select(VALUE)
        .from(table())
        .where(STATE.in(states.stream().map(state -> state.name()).toList()));

    if (isForUpdate) {
        return this.jdbcRepository.fetch(select.forUpdate());
    }
    return this.jdbcRepository.fetch(select);
}

/**
Expand Down Expand Up @@ -234,7 +241,7 @@ public ServiceStateTransition.Response mayTransitionServiceTo(final ServiceInsta
}

/**
* Attempt to transit the status of a given service to given new status.
* Attempt to transition the state of a given service to given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
Expand All @@ -255,7 +262,7 @@ public ServiceStateTransition.Response mayTransitServiceTo(final Configuration c
}

/**
* Attempt to transit the status of a given service to given new status.
* Attempt to transition the state of a given service to given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the new service instance.
Expand Down
Loading

0 comments on commit 89573d6

Please sign in to comment.