Skip to content

Commit

Permalink
feat: Script runners as plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Mar 21, 2024
1 parent 17d8204 commit 481ddaa
Show file tree
Hide file tree
Showing 24 changed files with 690 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.script.ScriptRunner;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.RegisteredPlugin;
Expand Down Expand Up @@ -69,6 +70,7 @@ public List<Document> generate(RegisteredPlugin registeredPlugin) throws Excepti
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTasks(), Task.class, "tasks"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTriggers(), AbstractTrigger.class, "triggers"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getConditions(), Condition.class, "conditions"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getScriptRunner(), ScriptRunner.class, "scriptRunner"));

result.addAll(guides(registeredPlugin));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.script.ScriptRunner;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down Expand Up @@ -314,7 +315,6 @@ public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, Sch
builder.forTypesInGeneral()
.withSubtypeResolver((declaredType, context) -> {
TypeContext typeContext = context.getTypeContext();

if (declaredType.getErasedType() == Task.class) {
return getRegisteredPlugins()
.stream()
Expand All @@ -340,6 +340,12 @@ public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, Sch
.filter(ScheduleCondition.class::isAssignableFrom)
.map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList());
} else if (declaredType.getErasedType() == ScriptRunner.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getScriptRunner().stream())
.map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList());
}

return null;
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/docs/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class Plugin {
private List<String> controllers;
private List<String> storages;
private List<String> secrets;
private List<String> scriptRunners;
private List<String> guides;
private List<PluginSubGroup.PluginCategory> categories;

Expand Down Expand Up @@ -62,6 +63,7 @@ public static Plugin of(RegisteredPlugin registeredPlugin) {
plugin.controllers = className(filter(registeredPlugin.getControllers()).toArray(Class[]::new));
plugin.storages = className(filter(registeredPlugin.getStorages()).toArray(Class[]::new));
plugin.secrets = className(filter(registeredPlugin.getSecrets()).toArray(Class[]::new));
plugin.scriptRunners = className(filter(registeredPlugin.getScriptRunner()).toArray(Class[]::new));

return plugin;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.kestra.core.models.script;

import lombok.Getter;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
* Base class for script engine log consumer.
* Used to retrieve the script logs and outputs.
*/
public abstract class AbstractLogConsumer implements BiConsumer<String, Boolean> {
protected final AtomicInteger stdOutCount = new AtomicInteger();

protected final AtomicInteger stdErrCount = new AtomicInteger();

@Getter
protected final Map<String, Object> outputs = new HashMap<>();

public int getStdOutCount() {
return this.stdOutCount.get();
}

public int getStdErrCount() {
return this.stdErrCount.get();
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/models/script/RunnerResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.models.script;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public class RunnerResult {
private int exitCode;
private AbstractLogConsumer logConsumer;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.models.script;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;

/**
* Interface for the commands passed to a Script runner.
*/
public interface ScriptCommands {
AbstractLogConsumer getLogConsumer();

List<String> getCommands();

Map<String, Object> getAdditionalVars();

Path getWorkingDirectory();

Path getOutputDirectory();

Map<String, String> getEnv();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.kestra.core.models.script;

import lombok.Builder;
import lombok.Getter;

import java.io.Serial;

@Getter
@Builder
public class ScriptException extends Exception {
private final int exitCode;
private final int stdOutSize;
private final int stdErrSize;

public ScriptException(int exitCode, int stdOutSize, int stdErrSize) {
super("Command failed with code " + exitCode);
this.exitCode = exitCode;
this.stdOutSize = stdOutSize;
this.stdErrSize = stdErrSize;
}
}
37 changes: 37 additions & 0 deletions core/src/main/java/io/kestra/core/models/script/ScriptRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.kestra.core.models.script;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.annotations.Beta;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.List;

/**
* Base class for all script runners.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Introspected
@Beta
public abstract class ScriptRunner {
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;

/**
* This method will be called by the script plugin to run a script on a script runner.
* Script runners may be local or remote.
* For local script runner (like in process or in a local Docker engine), <code>filesToUpload</code> and <code>filesToDownload</code> may be ignored as they car directoy used the task working directory.
* For remote script runner (like Kubernetes or in a cloud provider), <code>filesToUpload</code> must be used to upload input and namespace files to the runner,
* and <code>filesToDownload</code> must be used to download output files from the runner.
*/
public abstract RunnerResult run(RunContext runContext, ScriptCommands commands, List<String> filesToUpload, List<String> filesToDownload) throws Exception;
}
100 changes: 100 additions & 0 deletions core/src/main/java/io/kestra/core/models/script/ScriptService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.kestra.core.models.script;

import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ListUtils;
import org.apache.commons.io.IOUtils;

import javax.annotation.Nullable;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

/**
* Helper class for script runners and script tasks.
*/
public final class ScriptService {
private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-a-zA-Z0-9%._\\+~#=/]*)");

private ScriptService() {
}

public static String replaceInternalStorage(RunContext runContext, @Nullable String command) throws IOException {
if (command == null) {
return "";
}

return INTERNAL_STORAGE_PATTERN
.matcher(command)
.replaceAll(throwFunction(matchResult -> saveOnLocalStorage(runContext, matchResult.group())));
}

public static List<String> uploadInputFiles(RunContext runContext, List<String> commands) throws IOException {
return commands
.stream()
.map(throwFunction(s -> replaceInternalStorage(runContext, s)))
.collect(Collectors.toList());

}

private static String saveOnLocalStorage(RunContext runContext, String uri) throws IOException {
try(InputStream inputStream = runContext.storage().getFile(URI.create(uri))) {
Path path = runContext.tempFile();

IOUtils.copyLarge(inputStream, new FileOutputStream(path.toFile()));

return path.toString();
}
}

public static Map<String, URI> uploadOutputFiles(RunContext runContext, Path outputDir) throws IOException {
// upload output files
Map<String, URI> uploaded = new HashMap<>();

try (Stream<Path> walk = Files.walk(outputDir)) {
walk
.filter(Files::isRegularFile)
.filter(path -> !path.startsWith("."))
.forEach(throwConsumer(path -> {
String filename = outputDir.relativize(path).toString();

uploaded.put(
filename,
runContext.storage().putFile(path.toFile(), filename)
);
}));
}

return uploaded;
}

public static List<String> scriptCommands(List<String> interpreter, List<String> beforeCommands, String command) {
return scriptCommands(interpreter, beforeCommands, List.of(command));
}

public static List<String> scriptCommands(List<String> interpreter, List<String> beforeCommands, List<String> commands) {
ArrayList<String> commandsArgs = new ArrayList<>(interpreter);
commandsArgs.add(
Stream
.concat(
ListUtils.emptyOnNull(beforeCommands).stream(),
commands.stream()
)
.collect(Collectors.joining(System.lineSeparator()))
);

return commandsArgs;
}
}
Loading

0 comments on commit 481ddaa

Please sign in to comment.