Skip to content

Commit

Permalink
feat(core): allow to skip and execution (#1678)
Browse files Browse the repository at this point in the history
close #767
  • Loading branch information
loicmathieu authored Aug 18, 2023
1 parent 38f6048 commit 304d921
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@CommandLine.Command(
Expand All @@ -20,6 +23,12 @@ public class ExecutorCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;

@Inject
private SkipExecutionService skipExecutionService;

@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();

@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
Expand All @@ -29,6 +38,8 @@ public static Map<String, Object> propertiesOverrides() {

@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);

super.call();

ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
Expand All @@ -12,6 +13,8 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@CommandLine.Command(
Expand All @@ -26,12 +29,18 @@ public class StandAloneCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;

@Inject
private SkipExecutionService skipExecutionService;

@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
private File flowPath;

@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker thread")
private Integer workerThread;

@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();

@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
Expand All @@ -41,6 +50,8 @@ public static Map<String, Object> propertiesOverrides() {

@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);

super.call();

if (flowPath != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.kestra.core.services;

import jakarta.inject.Singleton;

import java.util.Collections;
import java.util.List;

@Singleton
public class SkipExecutionService {
private volatile List<String> skipExecutions = Collections.emptyList();

public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions;
}

public boolean skipExecution(String executionId) {
return skipExecutions.contains(executionId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@Singleton
public class SkipExecutionCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;

@Inject
protected RunnerUtils runnerUtils;

@Inject
private ExecutionRepositoryInterface executionRepository;

@Inject
private SkipExecutionService skipExecutionService;

public void skipExecution() throws TimeoutException, InterruptedException {
Flow flow = createFlow();
Execution execution1 = runnerUtils.newExecution(flow, null, null);
String execution1Id = execution1.getId();
skipExecutionService.setSkipExecutions(List.of(execution1Id));

executionQueue.emit(execution1);
Execution execution2 = runnerUtils.runOne("io.kestra.tests", "minimal");

// the execution 2 should be in success and the 1 still created
assertThat(execution2.getState().getCurrent(), is(State.Type.SUCCESS));
Thread.sleep(25); // to be 100% sure that it works, add a slight delay to be sure we didn't miss the execution by chance
execution1 = executionRepository.findById(execution1Id).get();
assertThat(execution1.getState().getCurrent(), is(State.Type.CREATED));
}

private Flow createFlow() {
return Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.revision(1)
.tasks(Collections.singletonList(Return.builder()
.id("test")
.type(Return.class.getName())
.format("{{ inputs.testInputs }}")
.build()))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kestra.core.services;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest
class SkipExecutionServiceTest {
@Inject
private SkipExecutionService skipExecutionService;

@Test
void test() {
var executionToSkip = "aaabbbccc";
var executionNotToSkip = "bbbcccddd";

skipExecutionService.setSkipExecutions(List.of(executionToSkip));

assertThat(skipExecutionService.skipExecution(executionToSkip), is(true));
assertThat(skipExecutionService.skipExecution(executionNotToSkip), is(false));
}
}
13 changes: 13 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private WorkerGroupService workerGroupService;

@Inject
private SkipExecutionService skipExecutionService;

@SneakyThrows
@Override
public void run() {
Expand Down Expand Up @@ -190,6 +193,11 @@ public void run() {
}

private void executionQueue(Execution message) {
if (skipExecutionService.skipExecution(message.getId())) {
log.warn("Skipping execution {}", message.getId());
return;
}

Executor result = executionRepository.lock(message.getId(), pair -> {
Execution execution = pair.getLeft();
ExecutorState executorState = pair.getRight();
Expand Down Expand Up @@ -331,6 +339,11 @@ private void executionQueue(Execution message) {


private void workerTaskResultQueue(WorkerTaskResult message) {
if (skipExecutionService.skipExecution(message.getTaskRun().getTaskId())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}

if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
Expand Down
9 changes: 9 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -72,6 +73,9 @@ public abstract class JdbcRunnerTest {
@Inject
private PauseTest.Suite pauseTest;

@Inject
private SkipExecutionCaseTest skipExecutionCaseTest;

@BeforeAll
void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
Expand Down Expand Up @@ -248,4 +252,9 @@ void executionDate() throws TimeoutException {

assertThat((String) execution.getTaskRunList().get(0).getOutputs().get("value"), matchesPattern("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z"));
}

@Test
void skipExecution() throws TimeoutException, InterruptedException {
skipExecutionCaseTest.skipExecution();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public class MemoryExecutor implements ExecutorInterface {
@Inject
protected FlowListenersInterface flowListeners;

@Inject
private SkipExecutionService skipExecutionService;

@Override
public void run() {
flowListeners.run();
Expand All @@ -101,6 +104,11 @@ public void run() {
}

private void executionQueue(Execution message) {
if (skipExecutionService.skipExecution(message.getId())) {
log.warn("Skipping execution {}", message.getId());
return;
}

if (message.getTaskRunList() == null || message.getTaskRunList().size() == 0 || message.getState().isCreated()) {
this.handleExecution(saveExecution(message));
}
Expand Down Expand Up @@ -295,6 +303,11 @@ private void toExecution(Executor executor) {
}

private void workerTaskResultQueue(WorkerTaskResult message) {
if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}

synchronized (this) {
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
Expand Down Expand Up @@ -368,6 +381,8 @@ private boolean deduplicateNexts(Execution execution, List<TaskRun> taskRuns) {
});
}



private static class ExecutionState {
private final Execution execution;
private Map<String, TaskRun> taskRuns = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.runner.memory;

import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.SkipExecutionCaseTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeoutException;

@MicronautTest
class MemorySkipExecutionTest extends AbstractMemoryRunnerTest {
@Inject
private SkipExecutionCaseTest skipExecutionCaseTest;

@Test
void skipExecution() throws TimeoutException, InterruptedException {
skipExecutionCaseTest.skipExecution();
}
}

0 comments on commit 304d921

Please sign in to comment.