Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 30, 2024
1 parent 88064a3 commit a57dafb
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 67 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies {
compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2"
compileOnly "io.micronaut.reactor:micronaut-reactor"

// kestra
compileOnly group: "io.kestra", name: "core", version: kestraVersion
Expand Down Expand Up @@ -98,7 +98,7 @@ dependencies {

testImplementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
testImplementation "io.micronaut.test:micronaut-test-junit5"
testImplementation "io.micronaut.rxjava2:micronaut-rxjava2"
testImplementation "io.micronaut.reactor:micronaut-reactor"

// test
testImplementation "org.junit.jupiter:junit-jupiter-engine"
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.eventbridge.model.Entry;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
Expand Down Expand Up @@ -186,8 +186,8 @@ private List<Entry> readEntryList(RunContext runContext, Object entries) throws
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entries.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
return Flowable.create(FileSerde.reader(inputStream, Entry.class), BackpressureStrategy.BUFFER)
.toList().blockingGet();
return Flux.create(FileSerde.reader(inputStream, Entry.class), FluxSink.OverflowStrategy.BUFFER)
.collectList().block();
}
} else if (entries instanceof List) {
return MAPPER.convertValue(entries, new TypeReference<>() {
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.kinesis.model.Record;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
Expand All @@ -30,6 +30,8 @@
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

import jakarta.validation.constraints.NotNull;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -38,6 +40,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
Expand Down Expand Up @@ -172,8 +175,8 @@ private List<Record> getRecordList(Object records, RunContext runContext) throws
throw new IllegalArgumentException("Invalid records parameter, must be a Kestra internal storage URI, or a list of records.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
return Flowable.create(FileSerde.reader(inputStream, Record.class), BackpressureStrategy.BUFFER)
.toList().blockingGet();
return Flux.create(FileSerde.reader(inputStream, Record.class), FluxSink.OverflowStrategy.BUFFER)
.collectList().block();
}
} else if (records instanceof List) {
return MAPPER.convertValue(records, new TypeReference<>() {
Expand All @@ -187,15 +190,17 @@ private File writeOutputFile(RunContext runContext, PutRecordsResponse putRecord
// Create Output
File tempFile = runContext.tempFile(".ion").toFile();
try (var stream = new FileOutputStream(tempFile)) {
Flowable.fromIterable(records)
.zipWith(putRecordsResponse.records(), (record, response) -> OutputEntry.builder()
Flux.fromIterable(records)
.zipWithIterable(putRecordsResponse.records(), (record, response) -> OutputEntry.builder()
.record(record)
.sequenceNumber(response.sequenceNumber())
.shardId(response.shardId())
.errorCode(response.errorCode())
.errorMessage(response.errorMessage())
.build())
.blockingForEach(outputEntry -> FileSerde.write(stream, outputEntry));
.doOnEach(throwConsumer(outputEntry -> FileSerde.write(stream, outputEntry.get())))
.collectList()
.block();
}
return tempFile;
}
Expand Down
27 changes: 15 additions & 12 deletions src/main/java/io/kestra/plugin/aws/s3/DeleteList.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.s3.models.S3Object;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;

import java.util.NoSuchElementException;
import java.util.function.Function;

import jakarta.validation.constraints.Min;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -83,21 +86,21 @@ public Output run(RunContext runContext) throws Exception {


try (S3Client client = this.client(runContext)) {
Flowable<S3Object> flowable = Flowable
.create(emitter -> {
Flux<S3Object> flowable = Flux
.create(throwConsumer(emitter -> {
S3Service
.list(runContext, client, this, this)
.forEach(emitter::onNext);
.forEach(emitter::next);

emitter.onComplete();
}, BackpressureStrategy.BUFFER);
emitter.complete();
}), FluxSink.OverflowStrategy.BUFFER);

Flowable<Long> result;
Flux<Long> result;

if (this.concurrent != null) {
result = flowable
.parallel(this.concurrent)
.runOn(Schedulers.io())
.runOn(Schedulers.boundedElastic())
.map(delete(logger, client, bucket))
.sequential();
} else {
Expand All @@ -107,7 +110,7 @@ public Output run(RunContext runContext) throws Exception {

Pair<Long, Long> finalResult = result
.reduce(Pair.of(0L, 0L), (pair, size) -> Pair.of(pair.getLeft() + 1, pair.getRight() + size))
.blockingGet();
.block();

runContext.metric(Counter.of("count", finalResult.getLeft()));
runContext.metric(Counter.of("size", finalResult.getRight()));
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/io/kestra/plugin/aws/sns/Publish.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.sns;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -9,11 +10,11 @@
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.aws.sns.model.Message;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;

Expand All @@ -23,6 +24,8 @@
import java.util.List;
import jakarta.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -62,8 +65,8 @@ public Publish.Output run(RunContext runContext) throws Exception {
var topicArn = runContext.render(getTopicArn());
try (var snsClient = this.client(runContext)) {
Integer count;
Flowable<Message> flowable;
Flowable<Integer> resultFlowable;
Flux<Message> flowable;
Flux<Integer> resultFlowable;

if (this.from instanceof String) {
URI from = new URI(runContext.render((String) this.from));
Expand All @@ -72,20 +75,20 @@ public Publish.Output run(RunContext runContext) throws Exception {
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER);
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
count = resultFlowable.reduce(Integer::sum).block();
}

} else if (this.from instanceof List) {
flowable = Flowable
flowable = Flux
.fromArray(((List<Message>) this.from).toArray())
.cast(Message.class);

resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
count = resultFlowable.reduce(Integer::sum).block();
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
snsClient.publish(msg.to(PublishRequest.builder().topicArn(topicArn), runContext));
Expand All @@ -102,12 +105,12 @@ public Publish.Output run(RunContext runContext) throws Exception {
}
}

private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SnsClient snsClient, String topicArn, RunContext runContext) {
private Flux<Integer> buildFlowable(Flux<Message> flowable, SnsClient snsClient, String topicArn, RunContext runContext) throws IllegalVariableEvaluationException {
return flowable
.map(message -> {
.map(throwFunction(message -> {
snsClient.publish(message.to(PublishRequest.builder().topicArn(topicArn), runContext));
return 1;
});
}));
}

@Builder
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/io/kestra/plugin/aws/sqs/Publish.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.sqs;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -9,11 +10,11 @@
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.aws.sqs.model.Message;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

Expand All @@ -23,6 +24,8 @@
import java.util.List;
import jakarta.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -63,8 +66,8 @@ public Output run(RunContext runContext) throws Exception {
var queueUrl = runContext.render(getQueueUrl());
try (var sqsClient = this.client(runContext)) {
Integer count;
Flowable<Message> flowable;
Flowable<Integer> resultFlowable;
Flux<Message> flowable;
Flux<Integer> resultFlowable;

if (this.from instanceof String) {
URI from = new URI(runContext.render((String) this.from));
Expand All @@ -73,20 +76,20 @@ public Output run(RunContext runContext) throws Exception {
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER);
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
count = resultFlowable.reduce(Integer::sum).block();
}

} else if (this.from instanceof List) {
flowable = Flowable
flowable = Flux
.fromArray(((List<Message>) this.from).toArray())
.cast(Message.class);

resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
count = resultFlowable.reduce(Integer::sum).block();
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
sqsClient.sendMessage(msg.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));
Expand All @@ -103,12 +106,12 @@ public Output run(RunContext runContext) throws Exception {
}
}

private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SqsClient sqsClient, String queueUrl, RunContext runContext) {
private Flux<Integer> buildFlowable(Flux<Message> flowable, SqsClient sqsClient, String queueUrl, RunContext runContext) throws IllegalVariableEvaluationException {
return flowable
.map(message -> {
.map(throwFunction(message -> {
sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));
return 1;
});
}));
}

@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import io.kestra.plugin.aws.AbstractLocalStackTest;
import io.kestra.plugin.aws.eventbridge.model.Entry;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import jakarta.inject.Inject;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.jackson.Jacksonized;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -46,7 +46,7 @@ private static List<PutEvents.OutputEntry> getOutputEntries(PutEvents put, RunCo
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
outputEntries = Flowable.create(FileSerde.reader(inputStream, PutEvents.OutputEntry.class), BackpressureStrategy.BUFFER).toList().blockingGet();
outputEntries = Flux.create(FileSerde.reader(inputStream, PutEvents.OutputEntry.class), FluxSink.OverflowStrategy.BUFFER).collectList().block();
}
return outputEntries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import io.kestra.plugin.aws.AbstractLocalStackTest;
import io.kestra.plugin.aws.kinesis.model.Record;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import jakarta.inject.Inject;
import lombok.Builder;
import lombok.EqualsAndHashCode;
Expand All @@ -24,6 +22,8 @@
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -85,7 +85,7 @@ private static List<PutRecords.OutputEntry> getOutputEntries(PutRecords put, Run
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
outputEntries = Flowable.create(FileSerde.reader(inputStream, PutRecords.OutputEntry.class), BackpressureStrategy.BUFFER).toList().blockingGet();
outputEntries = Flux.create(FileSerde.reader(inputStream, PutRecords.OutputEntry.class), FluxSink.OverflowStrategy.BUFFER).collectList().block();
}
return outputEntries;
}
Expand Down
Loading

0 comments on commit a57dafb

Please sign in to comment.