Skip to content

Commit

Permalink
feat(): retry-flow (#3392)
Browse files Browse the repository at this point in the history
* feat(): retry-flow

* fix: rename behavior

* feat: created a metadata props for executions

* fix(ui): translate + new metadata prop
  • Loading branch information
Skraye committed Apr 3, 2024
1 parent 90f28cc commit 81f281e
Show file tree
Hide file tree
Showing 31 changed files with 603 additions and 95 deletions.
17 changes: 13 additions & 4 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class Execution implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;

@With
ExecutionMetadata metadata;

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow} and inputs.
*
Expand Down Expand Up @@ -134,6 +137,9 @@ public static Execution newExecution(final Flow flow,
public static class ExecutionBuilder {
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
.originalCreatedDate(Instant.now())
.build();
}
}

Expand Down Expand Up @@ -165,7 +171,8 @@ public Execution withState(State.Type state) {
this.parentId,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}

Expand Down Expand Up @@ -197,7 +204,8 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.parentId,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}

Expand All @@ -217,7 +225,8 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
childExecutionId != null ? this.getId() : null,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}

Expand Down Expand Up @@ -449,7 +458,7 @@ public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parent
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'", taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask()) && taskRun.getState().isFailed();
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry()) && taskRun.getState().isFailed();
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kestra.core.models.executions;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.With;

import java.time.Instant;

@Builder(toBuilder = true)
@Setter
@Getter
public class ExecutionMetadata {
@Builder.Default
@With
Integer attemptNumber = 1;

Instant originalCreatedDate;

public ExecutionMetadata nextAttempt() {
return this.toBuilder()
.attemptNumber(this.attemptNumber + 1)
.build();
}
}
43 changes: 34 additions & 9 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -219,28 +219,53 @@ public String toStringState() {
}

/**
* This method is used when the retry is apply on a task
* but the retry type is NEW_EXECUTION
*
* @param task Contains the retry configuration
* @return The next retry date, null if maxAttempt is reached
* @param retry Contains the retry configuration
* @param execution Contains the attempt number and original creation date
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
public Instant nextRetryDate(Task task) {
if (this.attempts == null || this.attempts.isEmpty() || task.getRetry() == null || this.getAttempts().size() >= task.getRetry().getMaxAttempt()) {
public Instant nextRetryDate(AbstractRetry retry, Execution execution) {
if (retry.getMaxAttempt() != null && execution.getMetadata().getAttemptNumber() >= retry.getMaxAttempt()) {

return null;
}
Instant base = this.lastAttempt().getState().maxDate();
Instant nextDate = retry.nextRetryDate(execution.getMetadata().getAttemptNumber(), base);
if (retry.getMaxDuration() != null && nextDate.isAfter(execution.getMetadata().getOriginalCreatedDate().plus(retry.getMaxDuration()))) {

Instant nextDate = task.getRetry().nextRetryDate(this.attempts.size(), base);
if (task.getRetry().getMaxDuration() != null && nextDate.isAfter(this.lastAttempt().getState().minDate().plus(task.getRetry().getMaxDuration()))) {
return null;
}

return nextDate;
}

/**
* This method is used when the Retry definition comes from the flow
* @param retry The retry configuration
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
public Instant nextRetryDate(AbstractRetry retry) {
if (this.attempts == null || this.attempts.isEmpty() || (retry.getMaxAttempt() != null && this.attemptNumber() >= retry.getMaxAttempt())) {

return null;
}
Instant base = this.lastAttempt().getState().maxDate();
Instant nextDate = retry.nextRetryDate(this.attempts.size(), base);
if (retry.getMaxDuration() != null && nextDate.isAfter(this.attempts.get(0).getState().minDate().plus(retry.getMaxDuration()))) {

return null;
}

return nextDate;
}

public boolean shouldBeRetried(Task task) {
return this.nextRetryDate(task) != null;
public boolean shouldBeRetried(AbstractRetry retry) {
if (retry == null) {
return false;
}
return this.nextRetryDate(retry) != null;
}


Expand Down
12 changes: 8 additions & 4 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
Expand All @@ -27,6 +28,10 @@
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
Expand All @@ -35,10 +40,6 @@
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;

@SuperBuilder(toBuilder = true)
@Getter
Expand Down Expand Up @@ -121,6 +122,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<Output> outputs;

@Valid
protected AbstractRetry retry;

public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
}
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public boolean isRestartable() {
return this.current.isFailed() || this.isPaused();
}

@JsonIgnore
public boolean isResumable() {
return this.current.isPaused() || this.current.isRetrying();
}


@Introspected
public enum Type {
Expand All @@ -172,10 +177,11 @@ public enum Type {
KILLED,
CANCELLED,
QUEUED,
RETRYING;
RETRYING,
RETRIED;

public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED;
}

public boolean isCreated() {
Expand All @@ -195,8 +201,9 @@ public boolean isPaused() {
}

public boolean isRetrying() {
return this == Type.RETRYING;
return this == Type.RETRYING || this == Type.RETRIED;
}

}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public abstract class AbstractRetry {
@Builder.Default
private Boolean warningOnRetry = false;

@Builder.Default
private Behavior behavior = Behavior.RETRY_FAILED_TASK;

public abstract Instant nextRetryDate(Integer attemptCount, Instant lastAttempt);

public <T> RetryPolicy<T> toPolicy() {
Expand All @@ -58,4 +61,9 @@ public static <T> RetryPolicy<T> retryPolicy(AbstractRetry retry) {
return new RetryPolicy<T>()
.withMaxAttempts(1);
}

public enum Behavior {
RETRY_FAILED_TASK,
CREATE_NEW_EXECUTION
}
}
19 changes: 17 additions & 2 deletions core/src/main/java/io/kestra/core/runners/ExecutionDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;

import java.time.Instant;

import jakarta.validation.constraints.NotNull;

@Value
@AllArgsConstructor
@Builder
Expand All @@ -25,8 +24,24 @@ public class ExecutionDelay {

@NotNull State.Type state;

@NotNull DelayType delayType;

@JsonIgnore
public String uid() {
return String.join("_", executionId, taskRunId);
}

/**
* For previous version, return RESUME_FLOW by default as it was the only case
* @return DelayType representing the action to do when
*/
public DelayType getDelayType() {
return delayType == null ? DelayType.RESUME_FLOW : delayType;
}

public enum DelayType {
RESUME_FLOW,
RESTART_FAILED_TASK,
RESTART_FAILED_FLOW
}
}
Loading

0 comments on commit 81f281e

Please sign in to comment.