Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/temp file extension #2172

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -745,20 +745,28 @@ public synchronized Path tempDir(boolean create) {
return this.temporaryDirectory;
}

/**
* @deprecated use {@link #tempFile(String)} instead
*/
@Deprecated
public Path tempFile() throws IOException {
return this.tempFile(null, null);
}

public Path tempFile(String suffix) throws IOException {
return this.tempFile(null, suffix);
public Path tempFile(String extension) throws IOException {
return this.tempFile(null, extension);
}

/**
* @deprecated use {@link #tempFile(byte[], String)} instead
*/
@Deprecated
public Path tempFile(byte[] content) throws IOException {
return this.tempFile(content, null);
}

public Path tempFile(byte[] content, String suffix) throws IOException {
Path tempFile = Files.createTempFile(this.tempDir(), null, suffix);
public Path tempFile(byte[] content, String extension) throws IOException {
Path tempFile = Files.createTempFile(this.tempDir(), null, extension);

if (content != null) {
Files.write(tempFile, content);
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/storages/Concat.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.net.URI;
import java.util.List;

import javax.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
Expand Down Expand Up @@ -89,6 +91,7 @@ public class Concat extends Task implements RunnableTask<Concat.Output> {
description = "Must be a `kestra://` storage urls, can be a list of string or json string"
)
@PluginProperty(dynamic = true)
@NotNull
private Object files;

@Schema(
Expand All @@ -97,10 +100,17 @@ public class Concat extends Task implements RunnableTask<Concat.Output> {
@PluginProperty(dynamic = true)
private String separator;

@Schema(
title = "The extension of the created file, default is .tmp"
)
@PluginProperty(dynamic = true)
@Builder.Default
private String extension = ".tmp";

@SuppressWarnings("unchecked")
@Override
public Concat.Output run(RunContext runContext) throws Exception {
File tempFile = runContext.tempFile().toFile();
File tempFile = runContext.tempFile(extension).toFile();
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
List<String> finalFiles;
if (this.files instanceof List) {
Expand Down
59 changes: 32 additions & 27 deletions core/src/main/java/io/kestra/core/tasks/storages/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,40 @@ public class Split extends Task implements RunnableTask<Split.Output> {
@Override
public Split.Output run(RunContext runContext) throws Exception {
URI from = new URI(runContext.render(this.from));
String fromPath = from.getPath();
String extension = ".tmp";
if (fromPath.indexOf('.') >= 0) {
extension = fromPath.substring(fromPath.lastIndexOf('.'));
}

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)));

List<Path> splited;

if (this.bytes != null) {
ReadableBytesTypeConverter readableBytesTypeConverter = new ReadableBytesTypeConverter();
Number convert = readableBytesTypeConverter.convert(this.bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + this.bytes + "'"));
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
List<Path> splited;

if (this.bytes != null) {
ReadableBytesTypeConverter readableBytesTypeConverter = new ReadableBytesTypeConverter();
Number convert = readableBytesTypeConverter.convert(this.bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + this.bytes + "'"));

splited = split(runContext, extension, bufferedReader, (bytes, size) -> bytes >= convert.longValue());
} else if (this.partitions != null) {
splited = partition(runContext, extension, bufferedReader, this.partitions);
} else if (this.rows != null) {
splited = split(runContext, extension, bufferedReader, (bytes, size) -> size >= this.rows);
} else {
throw new IllegalArgumentException("Invalid configuration with no size, count, nor rows");
}

splited = split(runContext, bufferedReader, (bytes, size) -> bytes >= convert.longValue());
} else if (this.partitions != null) {
splited = partition(runContext, bufferedReader, this.partitions);
} else if (this.rows != null) {
splited = split(runContext, bufferedReader, (bytes, size) -> size >= this.rows);
} else {
throw new IllegalArgumentException("Invalid configuration with no size, count, nor rows");
return Split.Output.builder()
.uris(splited
.stream()
.map(throwFunction(path -> runContext.putTempFile(path.toFile())))
.collect(Collectors.toList())
)
.build();
}

return Split.Output.builder()
.uris(splited
.stream()
.map(throwFunction(path -> runContext.putTempFile(path.toFile())))
.collect(Collectors.toList())
)
.build();
}

public List<Path> split(RunContext runContext, BufferedReader bufferedReader, BiFunction<Integer, Integer, Boolean> predicate) throws IOException {
private List<Path> split(RunContext runContext, String extension, BufferedReader bufferedReader, BiFunction<Integer, Integer, Boolean> predicate) throws IOException {
List<Path> files = new ArrayList<>();
RandomAccessFile write = null;
int totalBytes = 0;
Expand All @@ -138,7 +143,7 @@ public List<Path> split(RunContext runContext, BufferedReader bufferedReader, Bi
totalBytes = 0;
totalRows = 0;

Path path = runContext.tempFile();
Path path = runContext.tempFile(extension);
files.add(path);
write = new RandomAccessFile(path.toFile(), "rw");
}
Expand All @@ -158,12 +163,12 @@ public List<Path> split(RunContext runContext, BufferedReader bufferedReader, Bi
return files;
}

public List<Path> partition(RunContext runContext, BufferedReader bufferedReader, int partition) throws IOException {
private List<Path> partition(RunContext runContext, String extension, BufferedReader bufferedReader, int partition) throws IOException {
List<Path> files = new ArrayList<>();
List<RandomAccessFile> writers = new ArrayList<>();

for (int i = 0; i < partition; i++) {
Path path = runContext.tempFile();
Path path = runContext.tempFile(extension);
files.add(path);

writers.add(new RandomAccessFile(path.toFile(), "rw"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.is;

@MicronautTest
Expand Down Expand Up @@ -47,6 +48,7 @@ void run(Boolean json) throws Exception {
Concat result = Concat.builder()
.files(json ? JacksonMapper.ofJson().writeValueAsString(files) : files)
.separator("\n")
.extension(".yml")
.build();

Concat.Output run = result.run(runContext);
Expand All @@ -57,6 +59,7 @@ void run(Boolean json) throws Exception {
CharStreams.toString(new InputStreamReader(storageInterface.get(run.getUri()))),
is(s + "\n" + s + "\n")
);
assertThat(run.getUri().getPath(), endsWith(".yml"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.stream.IntStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.is;

@MicronautTest
Expand All @@ -45,6 +46,7 @@ void partition() throws Exception {
Split.Output run = result.run(runContext);

assertThat(run.getUris().size(), is(8));
assertThat(run.getUris().get(0).getPath(), endsWith(".yml"));
assertThat(StringUtils.countMatches(readAll(run.getUris()), "\n"), is(1000));
}

Expand Down