Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to micronaut 3 #160

Merged
merged 5 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
projectVersion=2.3.1-SNAPSHOT
projectVersion=3.0.0-SNAPSHOT
micronautDocsVersion=2.0.0.RC1
micronautVersion=2.5.3
micronautTestVersion=2.3.3
micronautVersion=3.0.0-M2
micronautTestVersion=2.3.6

groovyVersion=3.0.8
spockVersion=2.0-M5-groovy-3.0
spockVersion=2.0-groovy-3.0

natsVersion=2.11.1
natsVersion=2.11.4

title=Micronaut Nats
projectDesc=Integration between Micronaut and nats.io
Expand Down
2 changes: 2 additions & 0 deletions nats/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ dependencies {
api "io.micronaut:micronaut-messaging:$micronautVersion"
api "io.micronaut:micronaut-inject-java"
api "io.nats:jnats:$natsVersion"
// TODO: Remove when RxJava2 -> Reactor PR is merged into core
api "io.micronaut.reactor:micronaut-reactor"

compileOnly "io.micronaut.micrometer:micronaut-micrometer-core"
compileOnly "io.micronaut:micronaut-management"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.inject.Scope;
import javax.inject.Singleton;

import io.micronaut.nats.intercept.NatsIntroductionAdvice;
import io.micronaut.aop.Introduction;
import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Type;
import io.micronaut.nats.intercept.NatsIntroductionAdvice;
import io.micronaut.retry.annotation.Recoverable;
import jakarta.inject.Scope;
import jakarta.inject.Singleton;

/**
* An introduction advice that automatically implemnts interfaces and abstract classes and publishes nats messages.
Expand Down
2 changes: 2 additions & 0 deletions nats/src/main/java/io/micronaut/nats/annotation/Subject.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.annotation.Target;

import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Executable;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.messaging.annotation.MessageMapping;

Expand All @@ -35,6 +36,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.PARAMETER})
@Bindable
@Executable
public @interface Subject {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import java.util.Map;
import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArrayUtils;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* Used to determine which {@link NatsArgumentBinder} to use for any given argument.
Expand Down
5 changes: 2 additions & 3 deletions nats/src/main/java/io/micronaut/nats/bind/NatsBodyBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* Binds an argument of with the {@link Body} annotation from the {@link Message}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
*/
package io.micronaut.nats.bind;

import javax.inject.Singleton;

import io.micronaut.core.convert.ArgumentConversionContext;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* The default binder for binding an argument from the {@link Message}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
import java.security.GeneralSecurityException;
import java.util.concurrent.ExecutorService;

import javax.inject.Named;
import javax.inject.Singleton;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.exceptions.BeanInstantiationException;
import io.micronaut.scheduling.TaskExecutors;
import io.nats.client.Connection;
import io.nats.client.Nats;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
* A factory for creating a connection to nats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package io.micronaut.nats.connect;

import javax.inject.Named;
import javax.validation.constraints.NotNull;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;

/**
* The default Nats configuration class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.context.annotation.Primary;
import io.nats.client.Message;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import io.micronaut.scheduling.executor.ExecutorConfiguration;
import io.micronaut.scheduling.executor.ExecutorType;
import io.micronaut.scheduling.executor.UserExecutorConfiguration;

import javax.inject.Named;
import javax.inject.Singleton;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
* Configures a {@link java.util.concurrent.ScheduledExecutorService} for running {@link io.micronaut.nats.annotation.NatsListener} instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
import java.util.Collections;
import java.util.Map;

import javax.inject.Singleton;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.endpoint.health.HealthEndpoint;
import io.micronaut.management.health.indicator.AbstractHealthIndicator;
import io.nats.client.Connection;
import jakarta.inject.Singleton;

/**
* A {@link io.micronaut.management.health.indicator.HealthIndicator} for Nats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import javax.inject.Qualifier;
import javax.inject.Singleton;

import io.micronaut.context.BeanContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
Expand All @@ -44,6 +41,7 @@
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* An {@link ExecutableMethodProcessor} that will process all beans annotated with {@link NatsListener}.
Expand All @@ -52,7 +50,7 @@
* @since 1.0.0
*/
@Singleton
public class NatsConsumerAdvice implements ExecutableMethodProcessor<NatsListener>, AutoCloseable {
public class NatsConsumerAdvice implements ExecutableMethodProcessor<Subject>, AutoCloseable {

private final BeanContext beanContext;

Expand Down Expand Up @@ -91,7 +89,7 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
.orElse(NatsConnection.DEFAULT_CONNECTION);

io.micronaut.context.Qualifier<Object> qualifer =
beanDefinition.getAnnotationTypeByStereotype(Qualifier.class)
beanDefinition.getAnnotationTypeByStereotype("javax.inject.Qualifier")
.map(type -> Qualifiers.byAnnotation(beanDefinition, type)).orElse(null);

Class<Object> beanType = (Class<Object>) beanDefinition.getBeanType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import javax.inject.Named;
import javax.inject.Singleton;

import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.caffeine.cache.Cache;
Expand All @@ -46,15 +43,15 @@
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.micronaut.scheduling.TaskExecutors;
import io.nats.client.Message;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* Implementation of the {@link NatsClient} advice annotation.
Expand Down Expand Up @@ -88,7 +85,7 @@ public NatsIntroductionAdvice(BeanContext beanContext, ConversionService<?> conv
this.beanContext = beanContext;
this.conversionService = conversionService;
this.serDesRegistry = serDesRegistry;
this.scheduler = Schedulers.from(executorService);
this.scheduler = Schedulers.fromExecutorService(executorService);
}

@Override
Expand Down Expand Up @@ -147,22 +144,22 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending the message with publisher confirms.", context);
}
reactive = Flowable.fromPublisher(reactivePublisher.publish(publishState)).subscribeOn(scheduler);
reactive = Flux.from(reactivePublisher.publish(publishState)).subscribeOn(scheduler);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Publish is an RPC call. Publisher will complete when a response is received.",
context);
}
reactive = Flowable.fromPublisher(reactivePublisher.publishAndReply(publishState))
.flatMap(consumerState -> {
Object deserialized = deserialize(consumerState, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Flowable.empty();
} else {
return Flowable.just(deserialized);
}
}).subscribeOn(scheduler);
reactive = Flux.from(reactivePublisher.publishAndReply(publishState))
.flatMap(consumerState -> {
Object deserialized = deserialize(consumerState, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Flux.empty();
} else {
return Flux.just(deserialized);
}
}).subscribeOn(scheduler);
}
return conversionService.convert(reactive, context.getReturnType().getType()).orElseThrow(
() -> new NatsClientException(
Expand All @@ -173,28 +170,26 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending the message without publisher confirms.", context);
}
Throwable throwable =
Completable.fromPublisher(reactivePublisher.publish(publishState)).blockingGet();
if (throwable != null) {
throw new NatsClientException(
String.format("Failed to publish a message with subject: [%s]", subject), throwable,
Collections.singletonList(publishState));
}
return null;

return Mono.from(reactivePublisher.publish(publishState))
.onErrorResume(throwable -> Mono.error(new NatsClientException(
String.format("Failed to publish a message with subject: [%s]", subject),
throwable,
Collections.singletonList(publishState)))).block();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Publish is an RPC call. Blocking until a response is received.", context);
}
return Single.fromPublisher(reactivePublisher.publishAndReply(publishState))
.flatMapMaybe(message -> {
Object deserialized = deserialize(message, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Maybe.empty();
} else {
return Maybe.just(deserialized);
}
}).blockingGet();
return Mono.from(reactivePublisher.publishAndReply(publishState))
.flatMap(message -> {
Object deserialized = deserialize(message, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Mono.empty();
} else {
return Mono.just(deserialized);
}
}).block();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import io.micronaut.core.annotation.Nullable;

import javax.annotation.concurrent.Immutable;

/**
* Stores the state of a Nats message to be published.
* This class should be treated as immutable.
*
* @author James Kleeh
* @since 1.1.0
*/
@Immutable
public class PublishState {

private final String subject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import io.micronaut.core.annotation.Internal;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.reactivex.Completable;
import io.reactivex.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
* @author jgrimm
Expand All @@ -44,24 +43,23 @@ public RxJavaReactivePublisher(@Parameter Connection connection) {

@Override
public Publisher<Void> publish(PublishState publishState) {
return getConnection().flatMapCompletable(con -> publishInternal(publishState, con)).toFlowable();
return getConnection().flatMap(con -> publishInternal(publishState, con));
}

private Completable publishInternal(PublishState publishState, Connection con) {
return Completable.create(subscriber -> {
private Mono<Void> publishInternal(PublishState publishState, Connection con) {
return Mono.create(subscriber -> {
con.publish(publishState.getSubject(), publishState.getBody());
subscriber.onComplete();
subscriber.success();
});
}

@Override
public Publisher<Message> publishAndReply(PublishState publishState) {
return getConnection()
.flatMap(con -> Single.fromFuture(con.request(publishState.getSubject(), publishState.getBody())))
.toFlowable();
.flatMap(con -> Mono.fromFuture(con.request(publishState.getSubject(), publishState.getBody())));
}

private Single<Connection> getConnection() {
return Single.just(connection);
private Mono<Connection> getConnection() {
return Mono.just(connection);
}
}
Loading