Skip to content

Commit

Permalink
upgrade to micronaut 3 (#160)
Browse files Browse the repository at this point in the history
* upgrade to micronaut 3

adds jakarta.inject:jakarta.inject-api:2.0.0
bump jnats version from 2.11.1 to 2.11.4

fixes #173

* code review fixes

* migrate from rxjava2 to micronaut-reactor

* migrate from @Body to @MessageBody

* renames RxJavaReactivePublisher to ReactorReactivePublisher
  • Loading branch information
grimmjo authored Jun 29, 2021
1 parent f76de72 commit 09dcdf3
Show file tree
Hide file tree
Showing 26 changed files with 110 additions and 142 deletions.
10 changes: 5 additions & 5 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
projectVersion=2.3.1-SNAPSHOT
projectVersion=3.0.0-SNAPSHOT
micronautDocsVersion=2.0.0.RC1
micronautVersion=2.5.3
micronautTestVersion=2.3.3
micronautVersion=3.0.0-M2
micronautTestVersion=2.3.6

groovyVersion=3.0.8
spockVersion=2.0-M5-groovy-3.0
spockVersion=2.0-groovy-3.0

natsVersion=2.11.1
natsVersion=2.11.4

title=Micronaut Nats
projectDesc=Integration between Micronaut and nats.io
Expand Down
2 changes: 2 additions & 0 deletions nats/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ dependencies {
api "io.micronaut:micronaut-messaging:$micronautVersion"
api "io.micronaut:micronaut-inject-java"
api "io.nats:jnats:$natsVersion"
// TODO: Remove when RxJava2 -> Reactor PR is merged into core
api "io.micronaut.reactor:micronaut-reactor"

compileOnly "io.micronaut.micrometer:micronaut-micrometer-core"
compileOnly "io.micronaut:micronaut-management"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.inject.Scope;
import javax.inject.Singleton;

import io.micronaut.nats.intercept.NatsIntroductionAdvice;
import io.micronaut.aop.Introduction;
import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Type;
import io.micronaut.nats.intercept.NatsIntroductionAdvice;
import io.micronaut.retry.annotation.Recoverable;
import jakarta.inject.Scope;
import jakarta.inject.Singleton;

/**
* An introduction advice that automatically implemnts interfaces and abstract classes and publishes nats messages.
Expand Down
2 changes: 2 additions & 0 deletions nats/src/main/java/io/micronaut/nats/annotation/Subject.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.annotation.Target;

import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Executable;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.messaging.annotation.MessageMapping;

Expand All @@ -35,6 +36,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.PARAMETER})
@Bindable
@Executable
public @interface Subject {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import java.util.Map;
import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArrayUtils;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* Used to determine which {@link NatsArgumentBinder} to use for any given argument.
Expand Down
15 changes: 7 additions & 8 deletions nats/src/main/java/io/micronaut/nats/bind/NatsBodyBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@

import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* Binds an argument of with the {@link Body} annotation from the {@link Message}.
* Binds an argument of with the {@link MessageBody} annotation from the {@link Message}.
*
* @author jgrimm
* @since 1.0.0
*/
@Singleton
public class NatsBodyBinder implements NatsAnnotatedArgumentBinder<Body> {
public class NatsBodyBinder implements NatsAnnotatedArgumentBinder<MessageBody> {

private final NatsMessageSerDesRegistry serDesRegistry;

Expand All @@ -45,8 +44,8 @@ public NatsBodyBinder(NatsMessageSerDesRegistry serDesRegistry) {
}

@Override
public Class<Body> getAnnotationType() {
return Body.class;
public Class<MessageBody> getAnnotationType() {
return MessageBody.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
*/
package io.micronaut.nats.bind;

import javax.inject.Singleton;

import io.micronaut.core.convert.ArgumentConversionContext;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* The default binder for binding an argument from the {@link Message}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
import java.security.GeneralSecurityException;
import java.util.concurrent.ExecutorService;

import javax.inject.Named;
import javax.inject.Singleton;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.exceptions.BeanInstantiationException;
import io.micronaut.scheduling.TaskExecutors;
import io.nats.client.Connection;
import io.nats.client.Nats;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
* A factory for creating a connection to nats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package io.micronaut.nats.connect;

import javax.inject.Named;
import javax.validation.constraints.NotNull;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;

/**
* The default Nats configuration class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

import java.util.Optional;

import javax.inject.Singleton;

import io.micronaut.context.annotation.Primary;
import io.nats.client.Message;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import io.micronaut.scheduling.executor.ExecutorConfiguration;
import io.micronaut.scheduling.executor.ExecutorType;
import io.micronaut.scheduling.executor.UserExecutorConfiguration;

import javax.inject.Named;
import javax.inject.Singleton;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
* Configures a {@link java.util.concurrent.ScheduledExecutorService} for running {@link io.micronaut.nats.annotation.NatsListener} instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
import java.util.Collections;
import java.util.Map;

import javax.inject.Singleton;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.endpoint.health.HealthEndpoint;
import io.micronaut.management.health.indicator.AbstractHealthIndicator;
import io.nats.client.Connection;
import jakarta.inject.Singleton;

/**
* A {@link io.micronaut.management.health.indicator.HealthIndicator} for Nats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import javax.inject.Qualifier;
import javax.inject.Singleton;

import io.micronaut.context.BeanContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
Expand All @@ -44,6 +41,7 @@
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import jakarta.inject.Singleton;

/**
* An {@link ExecutableMethodProcessor} that will process all beans annotated with {@link NatsListener}.
Expand All @@ -52,7 +50,7 @@
* @since 1.0.0
*/
@Singleton
public class NatsConsumerAdvice implements ExecutableMethodProcessor<NatsListener>, AutoCloseable {
public class NatsConsumerAdvice implements ExecutableMethodProcessor<Subject>, AutoCloseable {

private final BeanContext beanContext;

Expand Down Expand Up @@ -91,7 +89,7 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
.orElse(NatsConnection.DEFAULT_CONNECTION);

io.micronaut.context.Qualifier<Object> qualifer =
beanDefinition.getAnnotationTypeByStereotype(Qualifier.class)
beanDefinition.getAnnotationTypeByStereotype("javax.inject.Qualifier")
.map(type -> Qualifiers.byAnnotation(beanDefinition, type)).orElse(null);

Class<Object> beanType = (Class<Object>) beanDefinition.getBeanType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import javax.inject.Named;
import javax.inject.Singleton;

import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.caffeine.cache.Cache;
Expand All @@ -36,6 +33,7 @@
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.NatsConnection;
import io.micronaut.nats.annotation.Subject;
Expand All @@ -46,15 +44,15 @@
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.micronaut.scheduling.TaskExecutors;
import io.nats.client.Message;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* Implementation of the {@link NatsClient} advice annotation.
Expand Down Expand Up @@ -88,7 +86,7 @@ public NatsIntroductionAdvice(BeanContext beanContext, ConversionService<?> conv
this.beanContext = beanContext;
this.conversionService = conversionService;
this.serDesRegistry = serDesRegistry;
this.scheduler = Schedulers.from(executorService);
this.scheduler = Schedulers.fromExecutorService(executorService);
}

@Override
Expand Down Expand Up @@ -147,22 +145,22 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending the message with publisher confirms.", context);
}
reactive = Flowable.fromPublisher(reactivePublisher.publish(publishState)).subscribeOn(scheduler);
reactive = Flux.from(reactivePublisher.publish(publishState)).subscribeOn(scheduler);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Publish is an RPC call. Publisher will complete when a response is received.",
context);
}
reactive = Flowable.fromPublisher(reactivePublisher.publishAndReply(publishState))
.flatMap(consumerState -> {
Object deserialized = deserialize(consumerState, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Flowable.empty();
} else {
return Flowable.just(deserialized);
}
}).subscribeOn(scheduler);
reactive = Flux.from(reactivePublisher.publishAndReply(publishState))
.flatMap(consumerState -> {
Object deserialized = deserialize(consumerState, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Flux.empty();
} else {
return Flux.just(deserialized);
}
}).subscribeOn(scheduler);
}
return conversionService.convert(reactive, context.getReturnType().getType()).orElseThrow(
() -> new NatsClientException(
Expand All @@ -173,28 +171,26 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending the message without publisher confirms.", context);
}
Throwable throwable =
Completable.fromPublisher(reactivePublisher.publish(publishState)).blockingGet();
if (throwable != null) {
throw new NatsClientException(
String.format("Failed to publish a message with subject: [%s]", subject), throwable,
Collections.singletonList(publishState));
}
return null;

return Mono.from(reactivePublisher.publish(publishState))
.onErrorResume(throwable -> Mono.error(new NatsClientException(
String.format("Failed to publish a message with subject: [%s]", subject),
throwable,
Collections.singletonList(publishState)))).block();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Publish is an RPC call. Blocking until a response is received.", context);
}
return Single.fromPublisher(reactivePublisher.publishAndReply(publishState))
.flatMapMaybe(message -> {
Object deserialized = deserialize(message, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Maybe.empty();
} else {
return Maybe.just(deserialized);
}
}).blockingGet();
return Mono.from(reactivePublisher.publishAndReply(publishState))
.flatMap(message -> {
Object deserialized = deserialize(message, publisherState.getDataType(),
publisherState.getDataType());
if (deserialized == null) {
return Mono.empty();
} else {
return Mono.just(deserialized);
}
}).block();
}
}

Expand All @@ -214,10 +210,13 @@ private Object deserialize(Message message, Argument dataType, Argument returnTy

private Optional<Argument<?>> findBodyArgument(ExecutableMethod<?, ?> method) {
return Optional.ofNullable(Arrays.stream(method.getArguments())
.filter(arg -> arg.getAnnotationMetadata().hasAnnotation(Body.class)).findFirst().orElseGet(
.filter(arg -> arg.getAnnotationMetadata().hasAnnotation(Body.class)
|| arg.getAnnotationMetadata().hasAnnotation(
MessageBody.class)).findFirst().orElseGet(
() -> Arrays.stream(method.getArguments())
.filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Subject.class)).findFirst()
.orElse(null)));
.filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Subject.class))
.findFirst()
.orElse(null)));
}

private Optional<String> findSubjectKey(MethodInvocationContext<Object, Object> method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import io.micronaut.core.annotation.Nullable;

import javax.annotation.concurrent.Immutable;

/**
* Stores the state of a Nats message to be published.
* This class should be treated as immutable.
*
* @author James Kleeh
* @since 1.1.0
*/
@Immutable
public class PublishState {

private final String subject;
Expand Down
Loading

0 comments on commit 09dcdf3

Please sign in to comment.