Skip to content

Commit

Permalink
feat(core): enhance Kestra services liveness mechanism
Browse files Browse the repository at this point in the history
part-of: #3055
  • Loading branch information
fhussonnois committed Mar 18, 2024
1 parent 66a030a commit 015659a
Show file tree
Hide file tree
Showing 88 changed files with 3,780 additions and 1,011 deletions.
25 changes: 17 additions & 8 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
serviceinstance:
cls: io.kestra.core.server.ServiceInstance
table: "service_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
Expand Down Expand Up @@ -214,13 +214,22 @@ kestra:
preview:
initial-rows: 100
max-rows: 5000

# The expected time for a server to complete all of its
# tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probes.
interval: 5s
# The timeout used to detect service failures.
timeout: 45s
# The time to wait before executing a liveness probe.
initialDelay: 30s
# The expected time between service heartbeats.
heartbeatInterval: 3s
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m
fixed-delay: 1h

heartbeat:
frequency: 10s
heartbeat-missed: 3
4 changes: 3 additions & 1 deletion cli/src/test/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ kestra:
url: https://repo.maven.apache.org/maven2/
sonatype:
url: https://s01.oss.sonatype.org/content/repositories/snapshots/

server:
liveness:
enabled: false
micronaut:
http:
services:
Expand Down
101 changes: 101 additions & 0 deletions core/src/main/java/io/kestra/core/contexts/KestraContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package io.kestra.core.contexts;

import io.kestra.core.models.ServerType;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import jakarta.annotation.PreDestroy;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* Utility class for retrieving common information about a Kestra Server at runtime.
*/
public abstract class KestraContext {

    // Process-wide holder for the active context; empty until setContext(...) is called.
    private static final AtomicReference<KestraContext> INSTANCE = new AtomicReference<>();

    // Properties
    private static final String KESTRA_SERVER_TYPE = "kestra.server-type";

    /**
     * Gets the current {@link KestraContext}.
     *
     * @return The context.
     * @throws IllegalStateException if no context is initialized.
     */
    public static KestraContext getContext() {
        return Optional.ofNullable(INSTANCE.get())
            .orElseThrow(() -> new IllegalStateException("Kestra context not initialized"));
    }

    /**
     * Sets the current {@link KestraContext}.
     *
     * @param context The context; passing {@code null} clears the current context.
     */
    public static void setContext(final KestraContext context) {
        KestraContext.INSTANCE.set(context);
    }

    /**
     * Returns the current {@link ServerType}.
     *
     * @return The {@link ServerType}.
     * @throws UnsupportedOperationException unless overridden by a subclass.
     */
    public ServerType getServerType() {
        throw new UnsupportedOperationException();
    }

    /**
     * Stops Kestra.
     *
     * @param status The exit status.
     * @throws UnsupportedOperationException unless overridden by a subclass.
     */
    public void exit(int status) {
        throw new UnsupportedOperationException();
    }

    /**
     * Kestra context initializer.
     * <p>
     * Registers itself as the current {@link KestraContext} on construction and
     * unregisters itself when the bean is destroyed.
     */
    @Context
    @Requires(missingBeans = KestraContext.class)
    public static class Initializer extends KestraContext {

        private final ApplicationContext applicationContext;
        private final Environment environment;

        /**
         * Creates a new {@link KestraContext} instance and registers it as the current context.
         *
         * @param applicationContext The {@link ApplicationContext}.
         * @param environment        The {@link Environment}.
         */
        public Initializer(ApplicationContext applicationContext, Environment environment) {
            this.applicationContext = applicationContext;
            this.environment = environment;
            // Fields are assigned before publishing 'this' so readers never see them unset.
            KestraContext.setContext(this);
        }

        /**
         * {@inheritDoc}
         *
         * @throws IllegalStateException if the {@code kestra.server-type} property cannot be resolved.
         */
        @Override
        public ServerType getServerType() {
            return Optional.ofNullable(environment)
                .flatMap(env -> env.getProperty(KESTRA_SERVER_TYPE, ServerType.class))
                .orElseThrow(() -> new IllegalStateException("Cannot found required environment property '" + KESTRA_SERVER_TYPE + "'."));
        }

        /**
         * {@inheritDoc}
         * <p>
         * Closes the application context first, then terminates the JVM with the given status.
         */
        @Override
        public void exit(int status) {
            applicationContext.close();
            Runtime.getRuntime().exit(status);
        }

        /**
         * Unregisters this instance from the static holder.
         */
        @PreDestroy
        public void dispose() {
            // Clear the holder only if it still references this instance, so that
            // destroying a stale initializer cannot wipe a context registered later.
            KestraContext.INSTANCE.compareAndSet(this, null);
        }
    }
}
5 changes: 4 additions & 1 deletion core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceInstance;
import jakarta.inject.Singleton;

@Singleton
Expand All @@ -20,7 +21,7 @@ public String key(Object object) {
} else if (object.getClass() == WorkerTaskRunning.class) {
return ((WorkerTaskRunning) object).getTaskRun().getId();
} else if (object.getClass() == WorkerInstance.class) {
return ((WorkerInstance) object).getWorkerUuid().toString();
return ((WorkerInstance) object).getWorkerUuid();
} else if (object.getClass() == WorkerTaskResult.class) {
return ((WorkerTaskResult) object).getTaskRun().getId();
} else if (object.getClass() == LogEntry.class) {
Expand Down Expand Up @@ -59,6 +60,8 @@ public String key(Object object) {
return ((WorkerTriggerResult) object).getTriggerContext().uid();
} else if (object.getClass() == ExecutionQueued.class) {
return ((ExecutionQueued) object).uid();
} else if (object.getClass() == ServiceInstance.class) {
return ((ServiceInstance) object).id();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ public interface WorkerJobQueueInterface extends Closeable {

void pause();

void cleanup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.kestra.core.repositories;

import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceStateTransition;
import org.apache.commons.lang3.tuple.ImmutablePair;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

/**
* Repository service for storing service instance.
*
* @see io.kestra.core.server.ServerInstance
*/
public interface ServiceInstanceRepositoryInterface {

    /**
     * Looks up a single service instance by its identifier.
     *
     * @param id The service's ID. cannot be {@code null}.
     * @return an {@link Optional} holding the {@link ServiceInstance}, or {@link Optional#empty()} if none matches.
     */
    Optional<ServiceInstance> findById(String id);

    /**
     * Lists every stored service instance.
     *
     * @return a list of {@link ServiceInstance}.
     */
    List<ServiceInstance> findAll();

    /**
     * Removes the given service instance.
     *
     * @param service The service to be deleted.
     */
    void delete(ServiceInstance service);

    /**
     * Persists the given service instance.
     *
     * @param service The service to be saved.
     * @return The saved instance.
     */
    ServiceInstance save(ServiceInstance service);

    /**
     * Finds the instances in state {@code CREATED} or {@code RUNNING} for which
     * {@link ServiceInstance#isSessionTimeoutElapsed(Instant)} returns {@code true} at the given instant,
     * i.e. instances that should be considered in failure.
     *
     * @param now the time instant to be used for querying.
     * @return the list of {@link ServiceInstance}.
     */
    default List<ServiceInstance> findAllTimeoutRunningInstances(final Instant now) {
        final List<Service.ServiceState> activeStates = List.of(
            Service.ServiceState.CREATED,
            Service.ServiceState.RUNNING
        );
        final List<ServiceInstance> candidates = findAllInstancesInStates(activeStates);
        return candidates.stream()
            .filter(candidate -> candidate.isSessionTimeoutElapsed(now))
            .toList();
    }

    /**
     * Finds all service instances which are in the given state.
     *
     * @param state the state to match.
     * @return the list of {@link ServiceInstance}.
     */
    List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);

    /**
     * Finds all service instances which are in one of the given states.
     *
     * @param states the states to match.
     * @return the list of {@link ServiceInstance}.
     */
    List<ServiceInstance> findAllInstancesInStates(final List<Service.ServiceState> states);

    /**
     * Attempts to move the given service to a new state, without providing a reason.
     * The service may be left untouched if the transition is not valid.
     *
     * @param instance the service instance.
     * @param newState the new state of the service.
     * @return the {@link ServiceStateTransition.Response} describing the outcome of the attempt.
     */
    default ServiceStateTransition.Response mayTransitionServiceTo(final ServiceInstance instance,
                                                                   final Service.ServiceState newState) {
        return mayTransitionServiceTo(instance, newState, null);
    }

    /**
     * Attempts to move the given service to a new state.
     * The service may be left untouched if the transition is not valid.
     *
     * @param instance the service instance.
     * @param newState the new state of the service.
     * @param reason   the human-readable reason of the state transition
     * @return the {@link ServiceStateTransition.Response} produced by
     * {@link ServiceStateTransition#logTransitionAndGetResponse} for this attempt.
     */
    default ServiceStateTransition.Response mayTransitionServiceTo(final ServiceInstance instance,
                                                                   final Service.ServiceState newState,
                                                                   final String reason) {
        // This default method is not transactional and may lead to inconsistent state transition.
        synchronized (this) {
            final ServiceInstance current = findById(instance.id()).orElse(null);
            final ImmutablePair<ServiceInstance, ServiceInstance> transition;
            if (current == null) {
                // UNKNOWN service: nothing stored under this id.
                transition = null;
            } else if (!current.state().isValidTransition(newState)) {
                // INVALID service transition: report the stored instance, with no updated one.
                transition = new ImmutablePair<>(current, null);
            } else {
                // VALID service transition: persist the updated instance.
                final ServiceInstance next = current.updateState(newState, Instant.now(), reason);
                transition = new ImmutablePair<>(current, save(next));
            }
            return ServiceStateTransition.logTransitionAndGetResponse(instance, newState, transition);
        }
    }
}

This file was deleted.

30 changes: 0 additions & 30 deletions core/src/main/java/io/kestra/core/runners/ServerInstance.java

This file was deleted.

Loading

0 comments on commit 015659a

Please sign in to comment.