Skip to content

Commit

Permalink
fix(core): state file must be named to be easily use on different tas…
Browse files Browse the repository at this point in the history
…k from the same flow
  • Loading branch information
tchiotludo committed Jan 24, 2022
1 parent 5675a3f commit 0b0e8de
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 28 deletions.
25 changes: 8 additions & 17 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,37 +390,28 @@ private URI putTempFile(File file, String prefix, String name) throws IOExceptio
}

@SuppressWarnings("unchecked")
private String taskStateFilePathPrefix(Task task) {
private String taskStateFilePathPrefix(String name) {
Map<String, String> taskrun = (Map<String, String>) this.getVariables().get("taskrun");

List<String> paths = new ArrayList<>(Arrays.asList(
"tasks",
return "/" + this.storageInterface.statePrefix(
((Map<String, String>) this.getVariables().get("flow")).get("namespace"),
((Map<String, String>) this.getVariables().get("flow")).get("id"),
task.getId()
));

if (taskrun.containsKey("value")) {
paths.add(taskrun.get("value"));
}

return "/" + String.join(
"/",
paths
name,
taskrun.getOrDefault("value", null)
);
}

public InputStream getTaskStateFile(Task task, String name) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(task));
public InputStream getTaskStateFile(String state, String name) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state));
URI resolve = uri.resolve(uri.getPath() + "/" + name);

return this.storageInterface.get(resolve);
}

public URI putTaskStateFile(File file, Task task, String name) throws IOException {
public URI putTaskStateFile(File file, String state, String name) throws IOException {
return this.putTempFile(
file,
this.taskStateFilePathPrefix(task),
this.taskStateFilePathPrefix(state),
name
);
}
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package io.kestra.core.storages;

import io.micronaut.core.annotation.Introspected;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.utils.Slugify;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -53,6 +57,29 @@ default String executionPrefix(TaskRun taskRun) {
);
}

@SuppressWarnings("UnstableApiUsage")
default String statePrefix(String namespace, String flowId, @Nullable String name, @Nullable String value) {
ArrayList<String> paths = new ArrayList<>(List.of(
namespace.replace(".", "/"),
Slugify.of(flowId),
"states"
));

if (name != null) {
paths.add(name);
}

if (value != null) {
paths.add(Hashing
.goodFastHash(64)
.hashString(value, Charsets.UTF_8)
.toString()
);
}

return String.join("/", paths);
}

default Optional<String> extractExecutionId(URI path) {
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(path.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,21 +322,27 @@ protected <T> HttpResponse<T> validateFile(String executionId, URI path, String
throw new NoSuchElementException("Unable to find flow id '" + executionId + "'");
}

// maybe redirect to correct execution
String prefix = storageInterface.executionPrefix(flow.get(), execution.get());
if (!path.getPath().substring(1).startsWith(prefix)) {
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);
if (path.getPath().substring(1).startsWith(prefix)) {
return null;
}

if (redirectedExecution.isPresent()) {
return HttpResponse.redirect(URI.create((basePath != null? basePath : "") +
redirect.replace("{executionId}", redirectedExecution.get()))
);
}
// maybe state
prefix = storageInterface.statePrefix(flow.get().getNamespace(), flow.get().getId(), null, null);
if (path.getPath().substring(1).startsWith(prefix)) {
return null;
}

throw new IllegalArgumentException("Invalid prefix path");
// maybe redirect to correct execution
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);

if (redirectedExecution.isPresent()) {
return HttpResponse.redirect(URI.create((basePath != null? basePath : "") +
redirect.replace("{executionId}", redirectedExecution.get()))
);
}

return null;
throw new IllegalArgumentException("Invalid prefix path");
}
/**
* Download file binary from uri parameter
Expand Down

0 comments on commit 0b0e8de

Please sign in to comment.