diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index f2a744e6452..33eb96826d1 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -390,37 +390,28 @@ private URI putTempFile(File file, String prefix, String name) throws IOExceptio } @SuppressWarnings("unchecked") - private String taskStateFilePathPrefix(Task task) { + private String taskStateFilePathPrefix(String name) { Map taskrun = (Map) this.getVariables().get("taskrun"); - List paths = new ArrayList<>(Arrays.asList( - "tasks", + return "/" + this.storageInterface.statePrefix( ((Map) this.getVariables().get("flow")).get("namespace"), ((Map) this.getVariables().get("flow")).get("id"), - task.getId() - )); - - if (taskrun.containsKey("value")) { - paths.add(taskrun.get("value")); - } - - return "/" + String.join( - "/", - paths + name, + taskrun.getOrDefault("value", null) ); } - public InputStream getTaskStateFile(Task task, String name) throws IOException { - URI uri = URI.create(this.taskStateFilePathPrefix(task)); + public InputStream getTaskStateFile(String state, String name) throws IOException { + URI uri = URI.create(this.taskStateFilePathPrefix(state)); URI resolve = uri.resolve(uri.getPath() + "/" + name); return this.storageInterface.get(resolve); } - public URI putTaskStateFile(File file, Task task, String name) throws IOException { + public URI putTaskStateFile(File file, String state, String name) throws IOException { return this.putTempFile( file, - this.taskStateFilePathPrefix(task), + this.taskStateFilePathPrefix(state), name ); } diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index 5ef5998b86c..f06fa98ca6e 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -1,16 +1,20 @@ package io.kestra.core.storages; -import io.micronaut.core.annotation.Introspected; +import com.google.common.base.Charsets; +import com.google.common.hash.Hashing; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Input; import io.kestra.core.models.tasks.Task; import io.kestra.core.utils.Slugify; +import io.micronaut.core.annotation.Introspected; +import io.micronaut.core.annotation.Nullable; import java.io.*; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -53,6 +57,29 @@ default String executionPrefix(TaskRun taskRun) { ); } + @SuppressWarnings("UnstableApiUsage") + default String statePrefix(String namespace, String flowId, @Nullable String name, @Nullable String value) { + ArrayList paths = new ArrayList<>(List.of( + namespace.replace(".", "/"), + Slugify.of(flowId), + "states" + )); + + if (name != null) { + paths.add(name); + } + + if (value != null) { + paths.add(Hashing + .goodFastHash(64) + .hashString(value, Charsets.UTF_8) + .toString() + ); + } + + return String.join("/", paths); + } + default Optional extractExecutionId(URI path) { Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(path.getPath()); diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index 087954c014c..644528ccef1 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -322,21 +322,27 @@ protected HttpResponse validateFile(String executionId, URI path, String throw new NoSuchElementException("Unable to find flow id '" + executionId + "'"); } - // maybe redirect to correct execution String prefix = storageInterface.executionPrefix(flow.get(), execution.get()); - if (!path.getPath().substring(1).startsWith(prefix)) { - Optional redirectedExecution = storageInterface.extractExecutionId(path); + if (path.getPath().substring(1).startsWith(prefix)) { + return null; + } - if (redirectedExecution.isPresent()) { - return HttpResponse.redirect(URI.create((basePath != null? basePath : "") + - redirect.replace("{executionId}", redirectedExecution.get())) - ); - } + // maybe state + prefix = storageInterface.statePrefix(flow.get().getNamespace(), flow.get().getId(), null, null); + if (path.getPath().substring(1).startsWith(prefix)) { + return null; + } - throw new IllegalArgumentException("Invalid prefix path"); + // maybe redirect to correct execution + Optional redirectedExecution = storageInterface.extractExecutionId(path); + + if (redirectedExecution.isPresent()) { + return HttpResponse.redirect(URI.create((basePath != null? basePath : "") + + redirect.replace("{executionId}", redirectedExecution.get())) + ); } - return null; + throw new IllegalArgumentException("Invalid prefix path"); } /** * Download file binary from uri parameter