Skip to content

Commit

Permalink
feat(core): more script-runners common steps and better handling for …
Browse files Browse the repository at this point in the history
…input files
  • Loading branch information
brian-mulier-p committed Apr 4, 2024
1 parent 3537bdc commit 45ce792
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.kestra.core.models.script;

public interface RemoteRunnerInterface {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import lombok.Builder;
import lombok.Getter;

import java.io.Serial;

@Getter
@Builder
public class ScriptException extends Exception {
Expand All @@ -13,7 +11,11 @@ public class ScriptException extends Exception {
private final int stdErrSize;

public ScriptException(int exitCode, int stdOutSize, int stdErrSize) {
super("Command failed with code " + exitCode);
this("Command failed with code " + exitCode, exitCode, stdOutSize, stdErrSize);
}

public ScriptException(String message, int exitCode, int stdOutSize, int stdErrSize) {
super(message);
this.exitCode = exitCode;
this.stdOutSize = stdOutSize;
this.stdErrSize = stdErrSize;
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/java/io/kestra/core/models/script/ScriptRunner.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package io.kestra.core.models.script;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Base class for all script runners.
Expand All @@ -24,6 +29,14 @@ public abstract class ScriptRunner {
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;

@JsonIgnore
@Getter(AccessLevel.NONE)
protected transient Map<String, Object> additionalVars;

@JsonIgnore
@Getter(AccessLevel.NONE)
protected transient Map<String, String> env;

/**
* This method will be called by the script plugin to run a script on a script runner.
* Script runners may be local or remote.
Expand All @@ -32,4 +45,32 @@ public abstract class ScriptRunner {
* and <code>filesToDownload</code> must be used to download output files from the runner.
*/
public abstract RunnerResult run(RunContext runContext, ScriptCommands scriptCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception;

public Map<String, Object> additionalVars(ScriptCommands scriptCommands) {
if (this.additionalVars == null) {
this.additionalVars = scriptCommands.getAdditionalVars();
}

return this.additionalVars;
}

public Map<String, String> env(ScriptCommands scriptCommands) {
if (this.env == null) {
this.env = Optional.ofNullable(scriptCommands.getEnv()).map(HashMap::new).orElse(new HashMap<>());

Map<String, Object> additionalVars = this.additionalVars(scriptCommands);

if (additionalVars.containsKey(ScriptService.VAR_WORKING_DIR)) {
this.env.put(ScriptService.ENV_WORKING_DIR, additionalVars.get(ScriptService.VAR_WORKING_DIR).toString());
}
if (additionalVars.containsKey(ScriptService.VAR_OUTPUT_DIR)) {
this.env.put(ScriptService.ENV_OUTPUT_DIR, additionalVars.get(ScriptService.VAR_OUTPUT_DIR).toString());
}
if (additionalVars.containsKey(ScriptService.VAR_BUCKET_PATH)) {
this.env.put(ScriptService.ENV_BUCKET_PATH, additionalVars.get(ScriptService.VAR_BUCKET_PATH).toString());
}
}

return this.env;
}
}
53 changes: 32 additions & 21 deletions core/src/main/java/io/kestra/core/models/script/ScriptService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.models.script;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Slugify;
Expand All @@ -15,10 +16,7 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -46,15 +44,12 @@ public final class ScriptService {
private ScriptService() {
}

public static String replaceInternalStorage(RunContext runContext, @Nullable String command) throws IOException {
return ScriptService.replaceInternalStorage(runContext, command, (s, s2) -> {});
}

public static String replaceInternalStorage(RunContext runContext, @Nullable String command, BiConsumer<String, String> internalStorageToLocalFileConsumer) throws IOException {
return ScriptService.replaceInternalStorage(runContext, command, internalStorageToLocalFileConsumer, false);
}

public static String replaceInternalStorage(RunContext runContext, @Nullable String command, BiConsumer<String, String> internalStorageToLocalFileConsumer, boolean replaceWithRelativePath) throws IOException {
public static String replaceInternalStorage(
RunContext runContext,
@Nullable String command,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException {
if (command == null) {
return "";
}
Expand All @@ -74,22 +69,38 @@ public static String replaceInternalStorage(RunContext runContext, @Nullable Str
}));
}

public static List<String> uploadInputFiles(RunContext runContext, List<String> commands) throws IOException {
return ScriptService.uploadInputFiles(runContext, commands, (s, s2) -> {});
public static String replaceInternalStorage(
RunContext runContext,
Map<String, Object> additionalVars,
String command,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return ScriptService.replaceInternalStorage(runContext, additionalVars, List.of(command), internalStorageToLocalFileConsumer, replaceWithRelativePath).get(0);
}

public static List<String> uploadInputFiles(RunContext runContext, List<String> commands, BiConsumer<String, String> internalStorageToLocalFileConsumer) throws IOException {
return uploadInputFiles(runContext, commands, internalStorageToLocalFileConsumer, false);
}

public static List<String> uploadInputFiles(RunContext runContext, List<String> commands, BiConsumer<String, String> internalStorageToLocalFileConsumer, boolean replaceWithRelativePath) throws IOException {
public static List<String> replaceInternalStorage(
RunContext runContext,
Map<String, Object> additionalVars,
List<String> commands,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return commands
.stream()
.map(throwFunction(s -> replaceInternalStorage(runContext, s, internalStorageToLocalFileConsumer, replaceWithRelativePath)))
.map(throwFunction(c -> runContext.render(c, additionalVars)))
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, internalStorageToLocalFileConsumer, replaceWithRelativePath)))
.collect(Collectors.toList());

}

public static List<String> replaceInternalStorage(
RunContext runContext,
List<String> commands
) throws IOException, IllegalVariableEvaluationException {
return ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), commands, (ignored, file) -> {}, false);
}

private static String saveOnLocalStorage(RunContext runContext, String uri) throws IOException {
try(InputStream inputStream = runContext.storage().getFile(URI.create(uri))) {
Path path = runContext.tempFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

Expand All @@ -39,23 +38,13 @@ public RunnerResult run(RunContext runContext, ScriptCommands scriptCommands, Li
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = scriptCommands.getLogConsumer();

Map<String, Object> additionalVars = scriptCommands.getAdditionalVars();
additionalVars.put(ScriptService.VAR_WORKING_DIR, scriptCommands.getWorkingDirectory().toString());
additionalVars.put(ScriptService.VAR_OUTPUT_DIR, scriptCommands.getOutputDirectory().toString());

ProcessBuilder processBuilder = new ProcessBuilder();

Map<String, String> environment = processBuilder.environment();
if (scriptCommands.getEnv() != null && !scriptCommands.getEnv().isEmpty()) {
environment.putAll(runContext.renderMap(scriptCommands.getEnv(), additionalVars));
}
environment.put(ScriptService.ENV_WORKING_DIR, scriptCommands.getWorkingDirectory().toString());
environment.put(ScriptService.ENV_OUTPUT_DIR, scriptCommands.getOutputDirectory().toString());
environment.putAll(this.env(scriptCommands));

processBuilder.directory(scriptCommands.getWorkingDirectory().toFile());

List<String> command = ScriptService.uploadInputFiles(runContext, runContext.render(scriptCommands.getCommands(), additionalVars));
processBuilder.command(command);
processBuilder.command(scriptCommands.getCommands());

Process process = processBuilder.start();
long pid = process.pid();
Expand Down Expand Up @@ -91,6 +80,14 @@ public RunnerResult run(RunContext runContext, ScriptCommands scriptCommands, Li
}
}

@Override
protected Map<String, Object> runnerAdditionalVars(ScriptCommands scriptCommands) {
return Map.of(
ScriptService.VAR_WORKING_DIR, scriptCommands.getWorkingDirectory().toString(),
ScriptService.VAR_OUTPUT_DIR, scriptCommands.getOutputDirectory().toString()
);
}

private void killDescendantsOf(ProcessHandle process, Logger logger) {
process.descendants().forEach(processHandle -> {
if (!processHandle.destroy()) {
Expand Down
15 changes: 9 additions & 6 deletions core/src/main/java/io/kestra/core/runners/FilesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -25,12 +23,17 @@

public abstract class FilesService {
public static Map<String, String> inputFiles(RunContext runContext, Object inputs) throws Exception {
return FilesService.inputFiles(runContext, Collections.emptyMap(), inputs);
}

public static Map<String, String> inputFiles(RunContext runContext, Map<String, Object> additionalVars, Object inputs) throws Exception {
Logger logger = runContext.logger();

Map<String, String> inputFiles = inputs == null ? Map.of() : PluginUtilsService.transformInputFiles(
Map<String, String> inputFiles = new HashMap<>(inputs == null ? Map.of() : PluginUtilsService.transformInputFiles(
runContext,
additionalVars,
inputs
);
));

inputFiles
.forEach(throwBiConsumer((fileName, input) -> {
Expand All @@ -41,7 +44,7 @@ public static Map<String, String> inputFiles(RunContext runContext, Object input
file.getParentFile().mkdirs();
}

var fileContent = runContext.render(input);
var fileContent = runContext.render(input, additionalVars);
if (fileContent.startsWith("kestra://")) {
try (var is = runContext.uriToInputStream(URI.create(fileContent));
var out = new FileOutputStream(file)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ private static void validFilename(String s) {
}
}

@SuppressWarnings("unchecked")
public static Map<String, String> transformInputFiles(RunContext runContext, @NotNull Object inputFiles) throws IllegalVariableEvaluationException, JsonProcessingException {
return PluginUtilsService.transformInputFiles(runContext, Collections.emptyMap(), inputFiles);
}

@SuppressWarnings("unchecked")
public static Map<String, String> transformInputFiles(RunContext runContext, Map<String, Object> additionalVars, @NotNull Object inputFiles) throws IllegalVariableEvaluationException, JsonProcessingException {
if (inputFiles instanceof Map) {
return (Map<String, String>) inputFiles;
} else if (inputFiles instanceof String) {
final TypeReference<Map<String, String>> reference = new TypeReference<>() {};

return JacksonMapper.ofJson(false).readValue(
runContext.render((String) inputFiles),
runContext.render((String) inputFiles, additionalVars),
reference
);
} else {
Expand Down
Loading

0 comments on commit 45ce792

Please sign in to comment.