Skip to content

Commit

Permalink
fix(core): don't retry a task if the execution is killed (#2057)
Browse files Browse the repository at this point in the history
Note that this will make the task not killing but ends with the "previous" state (failed), the execution will then be marked as failed and not killed at the moment, but if #1747 is merged the execution will correctly be marked as KILLED.
Anyway, currently, without this , the exuction is also marked as FAILED but the attempts continue so this PR improve the current process by shortcirciting the attempt for killed execution.
  • Loading branch information
loicmathieu authored Sep 18, 2023
1 parent b9b24e6 commit 8733cd6
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
WorkerTask finalWorkerTask = Failsafe
.with(AbstractRetry.<WorkerTask>retryPolicy(workerTask.getTask().getRetry())
.handleResultIf(result -> result.getTaskRun().lastAttempt() != null &&
Objects.requireNonNull(result.getTaskRun().lastAttempt()).getState().getCurrent() == State.Type.FAILED
result.getTaskRun().lastAttempt().getState().getCurrent() == State.Type.FAILED &&
!killedExecution.contains(result.getTaskRun().getExecutionId())
)
.onRetry(e -> {
WorkerTask lastResult = e.getLastResult();
Expand Down Expand Up @@ -443,7 +444,7 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

Logger logger = runContext.logger();

if (!(workerTask.getTask() instanceof RunnableTask)) {
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
// This should never happen but better to deal with it than crashing the Worker
TaskRunAttempt attempt = TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build();
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
Expand All @@ -453,8 +454,6 @@ private WorkerTask runAttempt(WorkerTask workerTask) {
return workerTask.withTaskRun(taskRun);
}

RunnableTask<?> task = (RunnableTask<?>) workerTask.getTask();

TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING));

Expand Down Expand Up @@ -502,7 +501,7 @@ private WorkerTask runAttempt(WorkerTask workerTask) {
log.debug("Outputs\n{}", JacksonMapper.log(workerThread.getTaskOutput()));
}

if (runContext.metrics().size() > 0 && log.isTraceEnabled()) {
if (!runContext.metrics().isEmpty() && log.isTraceEnabled()) {
log.trace("Metrics\n{}", JacksonMapper.log(runContext.metrics()));
}

Expand Down

0 comments on commit 8733cd6

Please sign in to comment.