Skip to content

Commit

Permalink
feat: allow to cache contents inside the working directory
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jul 28, 2023
1 parent 4e8c1ae commit bde5ff6
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 6 deletions.
45 changes: 45 additions & 0 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,51 @@ public boolean deleteTaskStateFile(String state, String name, Boolean isNamespac
return this.storageInterface.delete(resolve);
}

/**
 * Opens the cache archive ({@code cache.zip}) stored for a task in the internal storage.
 * The stream handed back comes straight from the storage and is not closed by this method.
 *
 * @param namespace the flow namespace
 * @param flowId the flow identifier
 * @param taskId the task identifier
 * @param value optional, the task run value
 *
 * @return an Optional holding the archive stream, or an empty Optional when the storage reports no such file.
 * @throws IOException if the storage fails while opening the archive
 */
public Optional<InputStream> getTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
    String cachePath = "/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip";
    URI cacheUri = URI.create(cachePath);

    if (!this.storageInterface.exists(cacheUri)) {
        return Optional.empty();
    }

    return Optional.of(this.storageInterface.get(cacheUri));
}

/**
 * Looks up the last modification time of the cache archive ({@code cache.zip}) of a task.
 *
 * @param namespace the flow namespace
 * @param flowId the flow identifier
 * @param taskId the task identifier
 * @param value optional, the task run value
 *
 * @return an Optional holding the value reported by the storage, or an empty Optional when the storage reports no such file.
 * @throws IOException if the storage fails while reading the modification time
 */
public Optional<Long> getTaskCacheFileLastModifiedTime(String namespace, String flowId, String taskId, String value) throws IOException {
    String cachePath = "/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip";
    URI cacheUri = URI.create(cachePath);

    if (!this.storageInterface.exists(cacheUri)) {
        return Optional.empty();
    }

    return Optional.of(this.storageInterface.lastModifiedTime(cacheUri));
}

/**
 * Stores a cache archive for a task in the internal storage, under the name {@code cache.zip}
 * inside the task cache prefix.
 *
 * @param file the cache as a ZIP archive
 * @param namespace the flow namespace
 * @param flowId the flow identifier
 * @param taskId the task identifier
 * @param value optional, the task run value
 *
 * @return the URI of the file inside the internal storage.
 * @throws IOException if the archive cannot be written to the storage
 */
public URI putTaskCacheFile(File file, String namespace, String flowId, String taskId, String value) throws IOException {
    String cacheDirectory = "/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value);

    return this.putTempFile(file, cacheDirectory, "cache.zip");
}

/**
 * Removes the cache archive ({@code cache.zip}) of a task from the internal storage.
 *
 * @param namespace the flow namespace
 * @param flowId the flow identifier
 * @param taskId the task identifier
 * @param value optional, the task run value
 *
 * @return an Optional holding the result of the storage delete call, or an empty Optional when the storage reports no such file.
 * @throws IOException if the storage fails while deleting the archive
 */
public Optional<Boolean> deleteTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
    String cachePath = "/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip";
    URI cacheUri = URI.create(cachePath);

    if (!this.storageInterface.exists(cacheUri)) {
        return Optional.empty();
    }

    return Optional.of(this.storageInterface.delete(cacheUri));
}

/**
 * Asks the internal storage to delete everything located under this context's execution prefix.
 *
 * @return the list of URIs handed back by the storage for this deletion.
 * @throws IOException if the storage fails while deleting
 */
public List<URI> purgeStorageExecution() throws IOException {
    List<URI> purged = this.storageInterface.deleteByPrefix(this.storageExecutionPrefix);

    return purged;
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ private void handleTask(WorkerTask workerTask) {
if (workerTask.getTask() instanceof RunnableTask) {
this.run(workerTask, true);
} else if (workerTask.getTask() instanceof WorkingDirectory workingDirectory) {
RunContext runContext = workerTask.getRunContext();
RunContext runContext = workerTask.getRunContext().forWorker(applicationContext, workerTask);

try {
workingDirectory.preExecuteTasks(runContext, workerTask.getTaskRun());

for (Task currentTask : workingDirectory.getTasks()) {
WorkerTask currentWorkerTask = workingDirectory.workerTask(
workerTask.getTaskRun(),
Expand All @@ -161,6 +163,8 @@ private void handleTask(WorkerTask workerTask) {

runContext = runContext.updateVariables(workerTaskResult, workerTask.getTaskRun());
}

workingDirectory.postExecuteTasks(runContext, workerTask.getTaskRun());
} finally {
runContext.cleanup();
}
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ public interface StorageInterface {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(URI uri) throws IOException;

/**
 * Whether the uri points to a file/object that exists in the internal storage.
 * <p>
 * This default implementation probes the storage by opening the object with {@link #get(URI)}:
 * any {@link IOException} raised while opening is interpreted as "does not exist".
 * The stream opened for the probe is closed before returning.
 *
 * @param uri the URI of the file/object in the internal storage.
 * @return true if the uri points to a file/object that exists in the internal storage.
 */
default boolean exists(URI uri) {
    final InputStream probe;
    try {
        probe = get(uri);
    } catch (IOException e) {
        return false;
    }

    // The stream was only opened to test for presence: release it right away instead of leaking it.
    if (probe != null) {
        try {
            probe.close();
        } catch (IOException ignored) {
            // The object was found; failing to release the probe stream does not change the answer.
        }
    }

    return true;
}

/**
 * Size of the file/object designated by the uri (presumably in bytes; confirm against the implementations).
 * Declared retryable on {@link IOException}, with {@link FileNotFoundException} excluded from the retries.
 *
 * @param uri the URI of the file/object in the internal storage.
 * @return the size reported by the storage.
 * @throws IOException if the storage cannot provide the size
 */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
Long size(URI uri) throws IOException;

/**
 * Last modification time of the file/object designated by the uri
 * (presumably expressed in milliseconds since the epoch; confirm against the implementations).
 * Declared retryable on {@link IOException}, with {@link FileNotFoundException} excluded from the retries.
 *
 * @param uri the URI of the file/object in the internal storage.
 * @return the last modification time reported by the storage.
 * @throws IOException if the storage cannot provide the modification time
 */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
Long lastModifiedTime(URI uri) throws IOException;

/**
 * Writes the given data to the internal storage at the given uri.
 * Declared retryable on {@link IOException}.
 *
 * @param uri the target URI inside the internal storage.
 * @param data the content to store.
 * @return a URI for the stored content (presumably the location it can be read back from; confirm against the implementations).
 * @throws IOException if the content cannot be written
 */
@Retryable(includes = {IOException.class})
URI put(URI uri, InputStream data) throws IOException;

Expand Down Expand Up @@ -106,6 +124,29 @@ default String statePrefix(String namespace, @Nullable String flowId, @Nullable
return String.join("/", paths);
}

/**
 * Builds the internal storage path prefix under which the cache of a task is kept.
 * <p>
 * The prefix is made of the namespace (dots turned into slashes), the slugified flow identifier,
 * the slugified task identifier and the literal {@code cache} segment; when a task run value is
 * given, a hash of that value is appended as a last segment.
 *
 * @param namespace the flow namespace
 * @param flowId the flow identifier
 * @param taskId the task identifier
 * @param value optional, the task run value
 * @return the prefix, segments joined with {@code /}, without leading or trailing slash.
 */
default String cachePrefix(String namespace, String flowId, String taskId, @Nullable String value) {
    List<String> paths = new ArrayList<>(
        List.of(
            namespace.replace(".", "/"),
            Slugify.of(flowId),
            Slugify.of(taskId),
            "cache"
        )
    );

    if (value != null) {
        // The segment is persisted in the storage path, so the hash must be identical on every JVM.
        // Hashing.goodFastHash is seeded when the Hashing class is loaded and must not be used for
        // values that outlive the process; sha256 always yields the same digest for the same value.
        paths.add(Hashing
            .sha256()
            .hashString(value, Charsets.UTF_8)
            .toString()
        );
    }

    return String.join("/", paths);
}

default Optional<String> extractExecutionId(URI path) {
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(path.getPath());
Expand Down
170 changes: 168 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/WorkingDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
Expand All @@ -14,14 +15,35 @@
import io.kestra.core.utils.IdUtils;
import io.kestra.core.validations.WorkingDirectoryTaskValidation;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import javax.validation.constraints.NotNull;

@SuperBuilder(toBuilder = true)
@ToString
Expand Down Expand Up @@ -75,7 +97,7 @@
tasks:
- id: demoSQL
type: io.kestra.core.tasks.storages.LocalFiles
inputs:
inputs:
query.sql: |
SELECT sum(total) as total, avg(quantity) as avg_quantity
FROM sales;
Expand All @@ -101,7 +123,7 @@ with open('query.sql', 'r') as input_file:
with open('output.json', 'w') as output_file:
json.dump(data, output_file)
Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})
- id: jsonFiles
Expand Down Expand Up @@ -144,6 +166,19 @@ with open('output.json', 'w') as output_file:
@WorkingDirectoryTaskValidation
public class WorkingDirectory extends Sequential {

@Schema(
title = "Cache configuration",
description = """
When a cache is configured, an archive of the files denoted by the cache configuration is created at the end of the execution of the task and saved in Kestra's internal storage.
Then at the beginning of the next execution of the task, the archive of the files is retrieved and the working directory initialized with it.
"""
)
@PluginProperty
private Cache cache;

// Timestamp (System.currentTimeMillis()) taken by preExecuteTasks right after the cache archive has been
// extracted; stays at 0 when no cache was restored. postExecuteTasks compares file modification times
// against it to decide whether the cache must be re-uploaded.
// NOTE(review): this is per-run state kept on the task instance; confirm an instance is never shared
// between concurrent runs.
@Getter(AccessLevel.PRIVATE)
private transient long cacheDownloadedTime = 0L;

@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
Expand Down Expand Up @@ -172,4 +207,135 @@ public WorkerTask workerTask(TaskRun parent, Task task, RunContext runContext) {
.runContext(runContext)
.build();
}

/**
 * Restores the cached files into the working directory before the child tasks are executed.
 * <p>
 * Does nothing when no cache is configured. Otherwise:
 * <ol>
 *   <li>when a TTL is configured and the stored archive is older than this TTL, the archive is deleted
 *       and nothing is restored;</li>
 *   <li>when an archive exists, each of its file entries is extracted under the run context temporary
 *       directory, and the extraction time is recorded in {@code cacheDownloadedTime}.</li>
 * </ol>
 * Failures are logged on the run context logger and never propagated: a broken cache must not fail the task.
 *
 * @param runContext the run context giving access to the internal storage, the temporary directory and the logger
 * @param taskRun the task run, used to locate the cache (namespace, flow identifier, value)
 */
public void preExecuteTasks(RunContext runContext, TaskRun taskRun) {
    if (cache == null) {
        return;
    }

    try {
        // first, check if we need to delete the file
        if (cache.ttl != null) {
            var maybeLastModifiedTime = runContext.getTaskCacheFileLastModifiedTime(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
            if (maybeLastModifiedTime.isPresent()
                && Instant.now().isAfter(Instant.ofEpochMilli(maybeLastModifiedTime.get()).plus(cache.ttl))) {
                runContext.logger().debug("Cache is expired, deleting it");
                runContext.deleteTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
                // An expired cache must never be restored, even when the storage did not manage to delete it.
                return;
            }
        }

        // then download it and extract its content
        var maybeCacheFile = runContext.getTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
        if (maybeCacheFile.isEmpty()) {
            return;
        }

        runContext.logger().debug("Cache exist, downloading it");
        Path workingDir = runContext.tempDir().toAbsolutePath().normalize();

        try (ZipInputStream archive = new ZipInputStream(maybeCacheFile.get())) {
            ZipEntry entry;
            while ((entry = archive.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }

                // Refuse entries such as "../../x" or absolute names that would be written outside of the working directory.
                Path file = workingDir.resolve(entry.getName()).normalize();
                if (!file.startsWith(workingDir)) {
                    runContext.logger().warn("Skipping cache entry {} as it resolves outside of the working directory", entry.getName());
                    continue;
                }

                try {
                    Files.createDirectories(file.getParent());
                    // Streams the current entry to disk instead of buffering it entirely in memory;
                    // like Files.createFile, this fails if the target already exists.
                    Files.copy(archive, file);
                } catch (IOException e) {
                    runContext.logger().error("Unable to create the file {}", entry.getName(), e);
                }
            }
        }

        // Set the cacheDownloadedTime so that we can check if files have been updated later
        cacheDownloadedTime = System.currentTimeMillis();
    } catch (IOException e) {
        runContext.logger().error("Unable to execute WorkingDirectory pre actions", e);
    }
}

/**
 * Saves the files selected by the cache configuration once the child tasks have been executed.
 * <p>
 * Does nothing when no cache is configured. Otherwise the run context temporary directory is walked,
 * the files matching at least one of the configured glob patterns are collected, and, if at least one of
 * them has been modified after {@code cacheDownloadedTime}, they are zipped together and the archive is
 * stored in the internal storage through {@code RunContext.putTaskCacheFile}.
 * Failures are logged on the run context logger and never propagated.
 *
 * @param runContext the run context giving access to the internal storage, the temporary directory and the logger
 * @param taskRun the task run, used to locate the cache (namespace, flow identifier, value)
 */
public void postExecuteTasks(RunContext runContext, TaskRun taskRun) {
    if (cache == null) {
        return;
    }

    // This is monolithic, maybe a cache entry by pattern would be better.

    Path workingDir = runContext.tempDir();
    List<String> patterns = cache.getPatterns() == null ? Collections.emptyList() : cache.getPatterns();

    // Compile each glob once, instead of once per visited file.
    FileSystem fs = FileSystems.getDefault();
    List<PathMatcher> matchers = new ArrayList<>();
    for (String pattern : patterns) {
        String fullPattern = workingDir.toString() + "/" + pattern;
        matchers.add(fs.getPathMatcher("glob:" + fullPattern));
    }

    List<Path> matchesList = new ArrayList<>();
    FileVisitor<Path> matcherVisitor = new SimpleFileVisitor<>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attribs) throws IOException {
            // A file is collected once even if several patterns match it: a second ZIP entry with the
            // same name would be rejected as a duplicate and abort the whole archive.
            if (matchers.stream().anyMatch(matcher -> matcher.matches(file))) {
                matchesList.add(file);
            }
            return FileVisitResult.CONTINUE;
        }
    };

    try {
        Files.walkFileTree(workingDir, matcherVisitor);

        // Check that some files have been updated since the start of the task
        // TODO we may need to allow excluding files as some files always changed for dependencies (for ex .package-log.json)
        boolean cacheFilesAreUpdated = matchesList.stream()
            .anyMatch(path -> {
                try {
                    return Files.getLastModifiedTime(path).toMillis() > cacheDownloadedTime;
                } catch (IOException e) {
                    runContext.logger().warn("Unable to retrieve files last modified time, will update the cache anyway", e);
                    return true;
                }
            });

        if (!cacheFilesAreUpdated) {
            runContext.logger().debug("Cache files didn't change, skip updating it");
            return;
        }

        runContext.logger().debug("Cache files changed, we update the cache");
        File archiveFile = File.createTempFile("archive", ".zip");
        try {
            // Write the archive straight to the temporary file so that it is never held entirely in memory.
            try (ZipOutputStream archive = new ZipOutputStream(Files.newOutputStream(archiveFile.toPath()))) {
                for (Path file : matchesList) {
                    // Entry names are relative to the working directory and use '/' as separator.
                    String relativeFileName = workingDir.relativize(file).toString().replace(File.separatorChar, '/');
                    archive.putNextEntry(new ZipEntry(relativeFileName));
                    Files.copy(file, archive);
                    archive.closeEntry();
                }
            }

            URI uri = runContext.putTaskCacheFile(archiveFile, taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
            runContext.logger().debug("Caching in {}", uri);
        } finally {
            // Do not leave the temporary archive behind when zipping or uploading failed;
            // this is a no-op if the file has already been removed.
            Files.deleteIfExists(archiveFile.toPath());
        }
    } catch (IOException e) {
        runContext.logger().error("Unable to execute WorkingDirectory post actions", e);
    }
}

/**
 * Cache configuration of a working directory: which files are archived, and for how long the archive is kept.
 * Accessors, builder, equals/hashCode and toString are generated by Lombok.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public static class Cache {
// Optional maximum age of the archive; when null no expiration check is configured.
@Schema(title = "Cache TTL (Time To Live), after this duration the cache will be deleted.")
@PluginProperty
private Duration ttl;

// Glob patterns selecting the files to put in the archive; declared mandatory through @NotNull.
@Schema(
title = "List of file [glob](https://en.wikipedia.org/wiki/Glob_(programming)) patterns to include in the cache.",
description = "For example 'node_modules/**' will include all files of the node_modules directory including sub-directories."
)
@PluginProperty
@NotNull
private List<String> patterns;
}
}
Loading

0 comments on commit bde5ff6

Please sign in to comment.