Skip to content

Commit

Permalink
fix(core): input streams are now properly closed to prevent exhaustin…
Browse files Browse the repository at this point in the history
…g connections on remote storages

closes kestra-io/storage-s3#33
  • Loading branch information
brian-mulier-p committed Nov 21, 2023
1 parent 070f109 commit c376b12
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ private static boolean match(List<String> patterns, String file) {
private void copy(String tenantId, String namespace, Path basePath, List<URI> files) throws IOException {
files
.forEach(throwConsumer(f -> {
InputStream inputStream = storageInterface.get(tenantId, uri(namespace, f));
Path destination = Paths.get(basePath.toString(), f.getPath());

if (!destination.getParent().toFile().exists()) {
//noinspection ResultOfMethodCallIgnored
destination.getParent().toFile().mkdirs();
}

Files.copy(inputStream, destination);
try (InputStream inputStream = storageInterface.get(tenantId, uri(namespace, f))) {
Files.copy(inputStream, destination);
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.Slugify;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
Expand All @@ -11,6 +10,7 @@
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
Expand Down Expand Up @@ -47,7 +47,9 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
URI namespaceFile = URI.create(storageInterface.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
return new String(storageInterface.get(flow.get("tenantId"), namespaceFile).readAllBytes(), StandardCharsets.UTF_8);
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), namespaceFile)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}

private String readFromInternalStorageUri(EvaluationContext context, String path) throws IOException {
Expand All @@ -69,7 +71,9 @@ private String readFromInternalStorageUri(EvaluationContext context, String path
}
}
URI internalStorageFile = URI.create(path);
return new String(storageInterface.get(flow.get("tenantId"), internalStorageFile).readAllBytes(), StandardCharsets.UTF_8);
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), internalStorageFile)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}

private boolean validateFileUri(String namespace, String flowId, String executionId, String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import io.kestra.core.storages.StorageSplitInterface;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.io.*;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public interface StorageInterface {
* @return true if the uri points to a file/object that exist in the internal storage.
*/
default boolean exists(String tenantId, URI uri) {
try {
get(tenantId, uri);
try (InputStream ignored = get(tenantId, uri)){
return true;
} catch (IOException ieo) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public abstract class AbstractState extends Task {


protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue);

return JacksonMapper.ofJson(false).readValue(taskStateFile, TYPE_REFERENCE);
try (InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue)) {
return JacksonMapper.ofJson(false).readValue(taskStateFile, TYPE_REFERENCE);
}
}

protected Pair<URI, Map<String, Object>> merge(RunContext runContext, Map<String, Object> map) throws IllegalVariableEvaluationException, IOException {
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/storages/Concat.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.List;

Expand Down Expand Up @@ -128,7 +129,9 @@ public Concat.Output run(RunContext runContext) throws Exception {

finalFiles.forEach(throwConsumer(s -> {
URI from = new URI(runContext.render(s));
IOUtils.copyLarge(runContext.uriToInputStream(from), fileOutputStream);
try (InputStream inputStream = runContext.uriToInputStream(from)) {
IOUtils.copyLarge(inputStream, fileOutputStream);
}

if (separator != null) {
IOUtils.copy(new ByteArrayInputStream(this.separator.getBytes()), fileOutputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,6 @@ public HttpResponse<?> filePreview(
this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file?path=" + path);

String extension = FilenameUtils.getExtension(path.toString());
InputStream fileStream = storageInterface.get(tenantService.resolveTenant(), path);
Optional<Charset> charset;

try {
Expand All @@ -1023,13 +1022,15 @@ public HttpResponse<?> filePreview(
throw new IllegalArgumentException("Unable to preview using encoding '" + encoding + "'");
}

FileRender fileRender = FileRenderBuilder.of(
extension,
fileStream,
charset,
maxRows == null ? this.initialPreviewRows : (maxRows > this.maxPreviewRows ? this.maxPreviewRows : maxRows)
);
try (InputStream fileStream = storageInterface.get(tenantService.resolveTenant(), path)){
FileRender fileRender = FileRenderBuilder.of(
extension,
fileStream,
charset,
maxRows == null ? this.initialPreviewRows : (maxRows > this.maxPreviewRows ? this.maxPreviewRows : maxRows)
);

return HttpResponse.ok(fileRender);
return HttpResponse.ok(fileRender);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public void createFile(
) throws IOException, URISyntaxException {
ensureWritableFile(path);

storageInterface.put(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path), new BufferedInputStream(fileContent.getInputStream()));
try(BufferedInputStream inputStream = new BufferedInputStream(fileContent.getInputStream())) {
storageInterface.put(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path), inputStream);
}
}

@ExecuteOn(TaskExecutors.IO)
Expand Down

0 comments on commit c376b12

Please sign in to comment.