From 00a2df8a4289fdf698d497f9d1aebae77e6d37e2 Mon Sep 17 00:00:00 2001 From: lobanovdmitry Date: Tue, 1 Feb 2022 02:00:34 +0300 Subject: [PATCH 1/4] prepare error --- .../common/AbstractSubscriberAndProducer.java | 2 +- .../AbstractSubscriberAndServerProducer.java | 10 ++ .../common/TestSubscriberProducer.java | 5 + .../common/TestSubscriberProducerRx3.java | 5 + .../ReactorSubscriberAndClientProducer.java | 5 + .../ReactorSubscriberAndServerProducer.java | 5 + .../reactorgrpc/stub/ServerCalls.java | 32 +++-- .../JvmFatalServerErrorIntegrationTest.java | 124 ++++++++++++++++++ .../src/main/resources/ReactorStub.mustache | 7 +- .../stub/RxSubscriberAndServerProducer.java | 5 + .../salesforce/rxgrpc/stub/ServerCalls.java | 29 ++-- .../rxgrpc/src/main/resources/RxStub.mustache | 8 +- .../stub/RxSubscriberAndServerProducer.java | 5 + .../salesforce/rx3grpc/stub/ServerCalls.java | 29 ++-- .../src/main/resources/Rx3Stub.mustache | 8 +- 15 files changed, 238 insertions(+), 41 deletions(-) create mode 100644 reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java diff --git a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndProducer.java b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndProducer.java index 275e3d56..9bc723f9 100644 --- a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndProducer.java +++ b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndProducer.java @@ -443,7 +443,7 @@ private boolean checkTerminated(boolean d, boolean empty, CallStreamObserver return false; } - private static Throwable prepareError(Throwable throwable) { + protected Throwable prepareError(Throwable throwable) { if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) { return throwable; } else { diff --git a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndServerProducer.java b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndServerProducer.java index ff869c86..01b22592 100644 --- a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndServerProducer.java +++ b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractSubscriberAndServerProducer.java @@ -18,6 +18,12 @@ public abstract class AbstractSubscriberAndServerProducer extends AbstractSubscriberAndProducer { + private final Function prepareError; + + protected AbstractSubscriberAndServerProducer(Function prepareError) { + this.prepareError = prepareError; + } + @Override public void subscribe(CallStreamObserver downstream) { super.subscribe(downstream); @@ -28,4 +34,8 @@ public void run() { } }); } + + protected Throwable prepareError(Throwable throwable) { + return prepareError.apply(throwable); + } } diff --git a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java index 9173cb57..f64bbaca 100644 --- a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java +++ b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java @@ -33,4 +33,9 @@ protected Subscription fuse(Subscription s) { return s; } + + @Override + protected Throwable prepareError(Throwable throwable) { + return throwable; + } } diff --git a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java index 5323955e..ac8808d7 100644 --- a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java +++ b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java @@ -34,4 +34,9 @@ protected Subscription fuse(Subscription s) { return s; } + + @Override + protected Throwable prepareError(Throwable throwable) { + return throwable; + } } diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java index b07d8fc9..d3d9bdd5 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java @@ -36,4 +36,9 @@ protected Subscription fuse(Subscription s) { return s; } + + @Override + protected Throwable prepareError(Throwable throwable) { + return throwable; + } } diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndServerProducer.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndServerProducer.java index 14e136a0..cd3a6ff6 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndServerProducer.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndServerProducer.java @@ -8,6 +8,7 @@ package com.salesforce.reactorgrpc.stub; import com.salesforce.reactivegrpc.common.AbstractSubscriberAndServerProducer; +import com.salesforce.reactivegrpc.common.Function; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; @@ -21,6 +22,10 @@ public class ReactorSubscriberAndServerProducer extends AbstractSubscriberAndServerProducer implements CoreSubscriber { + public ReactorSubscriberAndServerProducer(Function prepareError) { + super(prepareError); + } + @Override protected Subscription fuse(Subscription s) { if (s instanceof Fuseable.QueueSubscription) { diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java index 032d33b5..85ed9c93 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java @@ -14,10 +14,11 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; -import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.function.Function; + /** * Utility functions for processing different server call idioms. We have one-to-one correspondence * between utilities in this class and the potential signatures in a generated server stub class so @@ -33,7 +34,8 @@ private ServerCalls() { */ public static void oneToOne( TRequest request, StreamObserver responseObserver, - Function, Mono> delegate) { + Function, Mono> delegate, + Function prepareError) { try { Mono rxRequest = Mono.just(request); @@ -46,10 +48,10 @@ public static void oneToOne( } responseObserver.onNext(value); }, - throwable -> responseObserver.onError(prepareError(throwable)), + throwable -> responseObserver.onError(prepareError.apply(throwable)), responseObserver::onCompleted); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -59,15 +61,16 @@ public static void oneToOne( */ public static void oneToMany( TRequest request, StreamObserver responseObserver, - Function, Flux> delegate) { + Function, Flux> delegate, + Function prepareError) { try { Mono rxRequest = Mono.just(request); Flux rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); - ReactorSubscriberAndServerProducer server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer<>()); + ReactorSubscriberAndServerProducer server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer<>(prepareError::apply)); server.subscribe((ServerCallStreamObserver) responseObserver); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -78,6 +81,7 @@ public static void oneToMany( public static StreamObserver manyToOne( StreamObserver responseObserver, Function, Mono> delegate, + Function prepareError, CallOptions options) { final int prefetch = ReactorCallOptions.getPrefetch(options); @@ -99,13 +103,13 @@ public static StreamObserver manyToOne( // Don't try to respond if the server has already canceled the request if (!streamObserverPublisher.isCancelled()) { streamObserverPublisher.abortPendingCancel(); - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } }, responseObserver::onCompleted ); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; @@ -118,6 +122,7 @@ public static StreamObserver manyToOne( public static StreamObserver manyToMany( StreamObserver responseObserver, Function, Flux> delegate, + Function prepareError, CallOptions options) { final int prefetch = ReactorCallOptions.getPrefetch(options); @@ -128,18 +133,21 @@ public static StreamObserver manyToMany( try { Flux rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher))); - ReactorSubscriberAndServerProducer subscriber = new ReactorSubscriberAndServerProducer<>(); + ReactorSubscriberAndServerProducer subscriber = new ReactorSubscriberAndServerProducer<>(prepareError::apply); subscriber.subscribe((ServerCallStreamObserver) responseObserver); // Don't try to respond if the server has already canceled the request rxResponse.subscribe(subscriber); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; } - private static Throwable prepareError(Throwable throwable) { + /** + * Implements default error mapping. + */ + public static Throwable prepareError(Throwable throwable) { if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) { return throwable; } else { diff --git a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java new file mode 100644 index 00000000..5a3775c2 --- /dev/null +++ b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2019, Salesforce.com, Inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.reactorgrpc; + +import io.grpc.*; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +public class JvmFatalServerErrorIntegrationTest { + private static Server server; + private static ManagedChannel channel; + + @BeforeClass + public static void setupServer() throws Exception { + ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() { + @Override + public Mono sayHello(Mono reactorRequest) { + return reactorRequest.map(this::map); + } + + @Override + public Flux sayHelloRespStream(Mono reactorRequest) { + return reactorRequest.map(this::map).flux(); + } + + @Override + public Mono sayHelloReqStream(Flux reactorRequest) { + return reactorRequest.map(this::map).single(); + } + + @Override + public Flux sayHelloBothStream(Flux reactorRequest) { + return reactorRequest.map(this::map); + } + + private HelloResponse map(HelloRequest request) { + throw new NoSuchMethodError("Fatal!"); + } + + @Override + protected Throwable onErrorMap(Throwable throwable) { + if (throwable instanceof LinkageError) { + return Status.INTERNAL.withDescription("Linkage error:" + throwable.getMessage()).asRuntimeException(); + } + return super.onErrorMap(throwable); + } + }; + + server = ServerBuilder.forPort(9000).addService(svc).build().start(); + channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); + } + + @Before + public void init() { + StepVerifier.setDefaultTimeout(Duration.ofSeconds(3)); + } + + @AfterClass + public static void stopServer() { + server.shutdown(); + channel.shutdown(); + + server = null; + channel = null; + } + + @Test + public void oneToOne() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel); + Mono resp = Mono.just(HelloRequest.getDefaultInstance()).transform(stub::sayHello); + + StepVerifier.create(resp) + .verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void oneToMany() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel); + Flux resp = Mono.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloRespStream); + Flux test = resp + .doOnNext(System.out::println) + .doOnError(throwable -> System.out.println(throwable.getMessage())) + .doOnComplete(() -> System.out.println("Completed")) + .doOnCancel(() -> System.out.println("Client canceled")); + + StepVerifier.create(resp) + .verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToOne() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel) + .withExecutor(Executors.newSingleThreadExecutor()); + Flux req = Flux.just(HelloRequest.getDefaultInstance()); + Mono resp = req.as(stub::sayHelloReqStream); + + StepVerifier.create(resp) + .verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToMany() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel); + Flux req = Flux.just(HelloRequest.getDefaultInstance()); + Flux resp = req.transform(stub::sayHelloBothStream); + + StepVerifier.create(resp) + .verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } +} diff --git a/reactor/reactor-grpc/src/main/resources/ReactorStub.mustache b/reactor/reactor-grpc/src/main/resources/ReactorStub.mustache index 5a54b028..3d97dce3 100644 --- a/reactor/reactor-grpc/src/main/resources/ReactorStub.mustache +++ b/reactor/reactor-grpc/src/main/resources/ReactorStub.mustache @@ -104,6 +104,9 @@ public final class {{className}} { return null; } + protected Throwable onErrorMap(Throwable throwable) { + return com.salesforce.reactorgrpc.stub.ServerCalls.prepareError(throwable); + } } {{#methods}} @@ -132,7 +135,7 @@ public final class {{className}} { case METHODID_{{methodNameUpperUnderscore}}: com.salesforce.reactorgrpc.stub.ServerCalls.{{reactiveCallsMethodName}}(({{inputType}}) request, (io.grpc.stub.StreamObserver<{{outputType}}>) responseObserver, - serviceImpl::{{methodNameCamelCase}}); + serviceImpl::{{methodNameCamelCase}}, serviceImpl::onErrorMap); break; {{/isManyInput}} {{/methods}} @@ -150,7 +153,7 @@ public final class {{className}} { case METHODID_{{methodNameUpperUnderscore}}: return (io.grpc.stub.StreamObserver) com.salesforce.reactorgrpc.stub.ServerCalls.{{reactiveCallsMethodName}}( (io.grpc.stub.StreamObserver<{{outputType}}>) responseObserver, - serviceImpl::{{methodNameCamelCase}}, serviceImpl.getCallOptions(methodId)); + serviceImpl::{{methodNameCamelCase}}, serviceImpl::onErrorMap, serviceImpl.getCallOptions(methodId)); {{/isManyInput}} {{/methods}} default: diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxSubscriberAndServerProducer.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxSubscriberAndServerProducer.java index 7f157aad..89532d60 100644 --- a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxSubscriberAndServerProducer.java +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxSubscriberAndServerProducer.java @@ -8,6 +8,7 @@ package com.salesforce.rxgrpc.stub; import com.salesforce.reactivegrpc.common.AbstractSubscriberAndServerProducer; +import com.salesforce.reactivegrpc.common.Function; import io.reactivex.FlowableSubscriber; import io.reactivex.internal.fuseable.QueueSubscription; import org.reactivestreams.Subscription; @@ -21,6 +22,10 @@ public class RxSubscriberAndServerProducer extends AbstractSubscriberAndServerProducer implements FlowableSubscriber { + public RxSubscriberAndServerProducer(Function prepareError) { + super(prepareError); + } + @Override protected Subscription fuse(Subscription s) { if (s instanceof QueueSubscription) { diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java index f899b5e7..8b1284f4 100644 --- a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java @@ -35,7 +35,8 @@ private ServerCalls() { public static void oneToOne( final TRequest request, final StreamObserver responseObserver, - final Function, Single> delegate) { + final Function, Single> delegate, + final Function prepareError) { try { final Single rxRequest = Single.just(request); @@ -55,11 +56,11 @@ public void accept(TResponse value) { new Consumer() { @Override public void accept(Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } }); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -70,16 +71,17 @@ public void accept(Throwable throwable) { public static void oneToMany( final TRequest request, final StreamObserver responseObserver, - final Function, Flowable> delegate) { + final Function, Flowable> delegate, + final Function prepareError) { try { final Single rxRequest = Single.just(request); final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); final RxSubscriberAndServerProducer serverProducer = - rxResponse.subscribeWith(new RxSubscriberAndServerProducer()); + rxResponse.subscribeWith(new RxSubscriberAndServerProducer(prepareError::apply)); serverProducer.subscribe((ServerCallStreamObserver) responseObserver); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -90,6 +92,7 @@ public static void oneToMany( public static StreamObserver manyToOne( final StreamObserver responseObserver, final Function, Single> delegate, + final Function prepareError, final CallOptions options) { final int prefetch = RxCallOptions.getPrefetch(options); @@ -117,13 +120,13 @@ public void accept(Throwable throwable) { // Don't try to respond if the server has already canceled the request if (!streamObserverPublisher.isCancelled()) { streamObserverPublisher.abortPendingCancel(); - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } } ); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; @@ -136,6 +139,7 @@ public void accept(Throwable throwable) { public static StreamObserver manyToMany( final StreamObserver responseObserver, final Function, Flowable> delegate, + final Function prepareError, final CallOptions options) { final int prefetch = RxCallOptions.getPrefetch(options); @@ -146,18 +150,21 @@ public static StreamObserver manyToMany( try { final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher))); - final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(); + final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(prepareError::apply); subscriber.subscribe((ServerCallStreamObserver) responseObserver); // Don't try to respond if the server has already canceled the request rxResponse.subscribe(subscriber); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; } - private static Throwable prepareError(Throwable throwable) { + /** + * Implements default error mapping. + */ + public static Throwable prepareError(Throwable throwable) { if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) { return throwable; } else { diff --git a/rx-java/rxgrpc/src/main/resources/RxStub.mustache b/rx-java/rxgrpc/src/main/resources/RxStub.mustache index 32cdc7bf..bb882231 100644 --- a/rx-java/rxgrpc/src/main/resources/RxStub.mustache +++ b/rx-java/rxgrpc/src/main/resources/RxStub.mustache @@ -126,6 +126,10 @@ public final class {{className}} { return null; } + protected Throwable onErrorMap(Throwable throwable) { + return com.salesforce.rxgrpc.stub.ServerCalls.prepareError(throwable); + } + } {{#methods}} @@ -159,7 +163,7 @@ public final class {{className}} { public {{#isManyOutput}}io.reactivex.Flowable{{/isManyOutput}}{{^isManyOutput}}io.reactivex.Single{{/isManyOutput}}<{{outputType}}> apply({{#isManyInput}}io.reactivex.Flowable{{/isManyInput}}{{^isManyInput}}io.reactivex.Single{{/isManyInput}}<{{inputType}}> single) { return serviceImpl.{{methodNameCamelCase}}(single); } - }); + }, serviceImpl::onErrorMap); break; {{/isManyInput}} {{/methods}} @@ -177,7 +181,7 @@ public final class {{className}} { case METHODID_{{methodNameUpperUnderscore}}: return (io.grpc.stub.StreamObserver) com.salesforce.rxgrpc.stub.ServerCalls.{{reactiveCallsMethodName}}( (io.grpc.stub.StreamObserver<{{outputType}}>) responseObserver, - serviceImpl::{{methodNameCamelCase}}, serviceImpl.getCallOptions(methodId)); + serviceImpl::{{methodNameCamelCase}}, serviceImpl::onErrorMap, serviceImpl.getCallOptions(methodId)); {{/isManyInput}} {{/methods}} default: diff --git a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/RxSubscriberAndServerProducer.java b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/RxSubscriberAndServerProducer.java index 88ef0303..f25498bf 100644 --- a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/RxSubscriberAndServerProducer.java +++ b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/RxSubscriberAndServerProducer.java @@ -7,6 +7,7 @@ package com.salesforce.rx3grpc.stub; +import com.salesforce.reactivegrpc.common.Function; import org.reactivestreams.Subscription; import com.salesforce.reactivegrpc.common.AbstractSubscriberAndServerProducer; @@ -23,6 +24,10 @@ public class RxSubscriberAndServerProducer extends AbstractSubscriberAndServerProducer implements FlowableSubscriber { + public RxSubscriberAndServerProducer(Function prepareError) { + super(prepareError); + } + @Override protected Subscription fuse(Subscription s) { if (s instanceof QueueSubscription) { diff --git a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java index 0930fd8a..1ba9cc93 100644 --- a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java +++ b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java @@ -36,7 +36,8 @@ private ServerCalls() { public static void oneToOne( final TRequest request, final StreamObserver responseObserver, - final Function, Single> delegate) { + final Function, Single> delegate, + final Function prepareError) { try { final Single rxRequest = Single.just(request); @@ -56,11 +57,11 @@ public void accept(TResponse value) { new Consumer() { @Override public void accept(Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } }); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -71,16 +72,17 @@ public void accept(Throwable throwable) { public static void oneToMany( final TRequest request, final StreamObserver responseObserver, - final Function, Flowable> delegate) { + final Function, Flowable> delegate, + final Function prepareError) { try { final Single rxRequest = Single.just(request); final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); final RxSubscriberAndServerProducer serverProducer = - rxResponse.subscribeWith(new RxSubscriberAndServerProducer()); + rxResponse.subscribeWith(new RxSubscriberAndServerProducer(prepareError::apply)); serverProducer.subscribe((ServerCallStreamObserver) responseObserver); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } @@ -91,6 +93,7 @@ public static void oneToMany( public static StreamObserver manyToOne( final StreamObserver responseObserver, final Function, Single> delegate, + final Function prepareError, final CallOptions options) { final int prefetch = RxCallOptions.getPrefetch(options); @@ -118,13 +121,13 @@ public void accept(Throwable throwable) { // Don't try to respond if the server has already canceled the request if (!streamObserverPublisher.isCancelled()) { streamObserverPublisher.abortPendingCancel(); - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } } } ); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; @@ -137,6 +140,7 @@ public void accept(Throwable throwable) { public static StreamObserver manyToMany( final StreamObserver responseObserver, final Function, Flowable> delegate, + final Function prepareError, final CallOptions options) { final int prefetch = RxCallOptions.getPrefetch(options); @@ -147,18 +151,21 @@ public static StreamObserver manyToMany( try { final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher))); - final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(); + final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(prepareError::apply); subscriber.subscribe((ServerCallStreamObserver) responseObserver); // Don't try to respond if the server has already canceled the request rxResponse.subscribe(subscriber); } catch (Throwable throwable) { - responseObserver.onError(prepareError(throwable)); + responseObserver.onError(prepareError.apply(throwable)); } return streamObserverPublisher; } - private static Throwable prepareError(Throwable throwable) { + /** + * Implements default error mapping. + */ + public static Throwable prepareError(Throwable throwable) { if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) { return throwable; } else { diff --git a/rx3-java/rx3grpc/src/main/resources/Rx3Stub.mustache b/rx3-java/rx3grpc/src/main/resources/Rx3Stub.mustache index 3bfcb75d..535e2abb 100644 --- a/rx3-java/rx3grpc/src/main/resources/Rx3Stub.mustache +++ b/rx3-java/rx3grpc/src/main/resources/Rx3Stub.mustache @@ -126,6 +126,10 @@ public final class {{className}} { return null; } + protected Throwable onErrorMap(Throwable throwable) { + return com.salesforce.rx3grpc.stub.ServerCalls.prepareError(throwable); + } + } {{#methods}} @@ -159,7 +163,7 @@ public final class {{className}} { public {{#isManyOutput}}io.reactivex.rxjava3.core.Flowable{{/isManyOutput}}{{^isManyOutput}}io.reactivex.rxjava3.core.Single{{/isManyOutput}}<{{outputType}}> apply({{#isManyInput}}io.reactivex.rxjava3.core.Flowable{{/isManyInput}}{{^isManyInput}}io.reactivex.rxjava3.core.Single{{/isManyInput}}<{{inputType}}> single) { return serviceImpl.{{methodNameCamelCase}}(single); } - }); + }, serviceImpl::onErrorMap); break; {{/isManyInput}} {{/methods}} @@ -177,7 +181,7 @@ public final class {{className}} { case METHODID_{{methodNameUpperUnderscore}}: return (io.grpc.stub.StreamObserver) com.salesforce.rx3grpc.stub.ServerCalls.{{reactiveCallsMethodName}}( (io.grpc.stub.StreamObserver<{{outputType}}>) responseObserver, - serviceImpl::{{methodNameCamelCase}}, serviceImpl.getCallOptions(methodId)); + serviceImpl::{{methodNameCamelCase}}, serviceImpl::onErrorMap, serviceImpl.getCallOptions(methodId)); {{/isManyInput}} {{/methods}} default: From 68b24f45a110a2278209f793b79389b983c87946 Mon Sep 17 00:00:00 2001 From: lobanovdmitry Date: Tue, 1 Feb 2022 23:53:51 +0300 Subject: [PATCH 2/4] prepare error --- .../reactivegrpc/common/TestSubscriberProducer.java | 5 ----- .../reactivegrpc/common/TestSubscriberProducerRx3.java | 5 ----- .../reactorgrpc/stub/ReactorSubscriberAndClientProducer.java | 5 ----- .../reactorgrpc/JvmFatalServerErrorIntegrationTest.java | 1 - .../main/java/com/salesforce/rx3grpc/stub/ServerCalls.java | 4 ++-- 5 files changed, 2 insertions(+), 18 deletions(-) diff --git a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java index f64bbaca..9173cb57 100644 --- a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java +++ b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducer.java @@ -33,9 +33,4 @@ protected Subscription fuse(Subscription s) { return s; } - - @Override - protected Throwable prepareError(Throwable throwable) { - return throwable; - } } diff --git a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java index ac8808d7..5323955e 100644 --- a/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java +++ b/common/reactive-grpc-common/src/test/java/com/salesforce/reactivegrpc/common/TestSubscriberProducerRx3.java @@ -34,9 +34,4 @@ protected Subscription fuse(Subscription s) { return s; } - - @Override - protected Throwable prepareError(Throwable throwable) { - return throwable; - } } diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java index d3d9bdd5..b07d8fc9 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorSubscriberAndClientProducer.java @@ -36,9 +36,4 @@ protected Subscription fuse(Subscription s) { return s; } - - @Override - protected Throwable prepareError(Throwable throwable) { - return throwable; - } } diff --git a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java index 5a3775c2..d41846d8 100644 --- a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java +++ b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.concurrent.Executors; -@SuppressWarnings("unchecked") public class JvmFatalServerErrorIntegrationTest { private static Server server; private static ManagedChannel channel; diff --git a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java index 1ba9cc93..1ae7b1b2 100644 --- a/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java +++ b/rx3-java/rx3grpc-stub/src/main/java/com/salesforce/rx3grpc/stub/ServerCalls.java @@ -79,7 +79,7 @@ public static void oneToMany( final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); final RxSubscriberAndServerProducer serverProducer = - rxResponse.subscribeWith(new RxSubscriberAndServerProducer(prepareError::apply)); + rxResponse.subscribeWith(new RxSubscriberAndServerProducer(prepareError)); serverProducer.subscribe((ServerCallStreamObserver) responseObserver); } catch (Throwable throwable) { responseObserver.onError(prepareError.apply(throwable)); @@ -151,7 +151,7 @@ public static StreamObserver manyToMany( try { final Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher))); - final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(prepareError::apply); + final RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(prepareError); subscriber.subscribe((ServerCallStreamObserver) responseObserver); // Don't try to respond if the server has already canceled the request rxResponse.subscribe(subscriber); From 35751cdf683dedf5f1bd372f2a1a5cd4d6919e31 Mon Sep 17 00:00:00 2001 From: lobanovdmitry Date: Wed, 2 Feb 2022 00:22:31 +0300 Subject: [PATCH 3/4] prepare error --- .../main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java index 85ed9c93..ef9ad536 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java @@ -14,11 +14,10 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.function.Function; - /** * Utility functions for processing different server call idioms. We have one-to-one correspondence * between utilities in this class and the potential signatures in a generated server stub class so From 71c0dcb52d60e15ceaed3109e97f32e7b4df2d45 Mon Sep 17 00:00:00 2001 From: lobanovdmitry Date: Wed, 2 Feb 2022 00:34:46 +0300 Subject: [PATCH 4/4] Add test --- .../JvmFatalServerErrorIntegrationTest.java | 4 +- .../JvmFatalServerErrorIntegrationTest.java | 130 ++++++++++++++++++ .../JvmFatalServerErrorIntegrationTest.java | 130 ++++++++++++++++++ 3 files changed, 262 insertions(+), 2 deletions(-) create mode 100644 rx-java/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/JvmFatalServerErrorIntegrationTest.java create mode 100644 rx3-java/rx3grpc-test/src/test/java/com/salesforce/rx3grpc/JvmFatalServerErrorIntegrationTest.java diff --git a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java index d41846d8..d32b5d5f 100644 --- a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java +++ b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/JvmFatalServerErrorIntegrationTest.java @@ -52,8 +52,8 @@ private HelloResponse map(HelloRequest request) { @Override protected Throwable onErrorMap(Throwable throwable) { - if (throwable instanceof LinkageError) { - return Status.INTERNAL.withDescription("Linkage error:" + throwable.getMessage()).asRuntimeException(); + if (throwable instanceof NoSuchMethodError) { + return Status.INTERNAL.withDescription("NoSuchMethod:" + throwable.getMessage()).asRuntimeException(); } return super.onErrorMap(throwable); } diff --git a/rx-java/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/JvmFatalServerErrorIntegrationTest.java b/rx-java/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/JvmFatalServerErrorIntegrationTest.java new file mode 100644 index 00000000..2f16b581 --- /dev/null +++ b/rx-java/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/JvmFatalServerErrorIntegrationTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019, Salesforce.com, Inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.rxgrpc; + +import io.grpc.*; +import io.reactivex.Flowable; +import io.reactivex.Single; +import io.reactivex.observers.TestObserver; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("unchecked") +public class JvmFatalServerErrorIntegrationTest { + @Rule + public UnhandledRxJavaErrorRule errorRule = new UnhandledRxJavaErrorRule().autoVerifyNoError(); + + private static Server server; + private static ManagedChannel channel; + + @BeforeClass + public static void setupServer() throws Exception { + RxGreeterGrpc.GreeterImplBase svc = new RxGreeterGrpc.GreeterImplBase() { + @Override + public Single sayHello(Single rxRequest) { + return rxRequest.map(this::kaboom); + } + + @Override + public Flowable sayHelloRespStream(Single rxRequest) { + return rxRequest.map(this::kaboom).toFlowable(); + } + + @Override + public Single sayHelloReqStream(Flowable rxRequest) { + return rxRequest.map(this::kaboom).firstOrError(); + } + + @Override + public Flowable sayHelloBothStream(Flowable rxRequest) { + return rxRequest.map(this::kaboom); + } + + private HelloResponse kaboom(HelloRequest request) { + throw new NoSuchMethodError("Fatal!"); + } + + @Override + protected Throwable onErrorMap(Throwable throwable) { + if (throwable instanceof NoSuchMethodError) { + return Status.INTERNAL.withDescription("NoSuchMethod:" + throwable.getMessage()).asRuntimeException(); + } + return super.onErrorMap(throwable); + } + }; + + server = ServerBuilder.forPort(9000).addService(svc).build().start(); + channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); + } + + @AfterClass + public static void stopServer() { + server.shutdown(); + channel.shutdown(); + + server = null; + channel = null; + } + + @Test + public void oneToOne() { + RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel); + Single resp = Single.just(HelloRequest.getDefaultInstance()).compose(stub::sayHello); + TestObserver test = resp.test(); + + test.awaitTerminalEvent(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void oneToMany() { + RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel); + Flowable resp = Single.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloRespStream); + TestSubscriber test = resp + .doOnNext(System.out::println) + .doOnError(throwable -> System.out.println(throwable.getMessage())) + .doOnComplete(() -> System.out.println("Completed")) + .doOnCancel(() -> System.out.println("Client canceled")) + .test(); + + test.awaitTerminalEvent(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToOne() { + RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel); + Flowable req = Flowable.just(HelloRequest.getDefaultInstance()); + Single resp = req.as(stub::sayHelloReqStream); + TestObserver test = resp.test(); + + test.awaitTerminalEvent(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + // Flowable requests get canceled when unexpected errors happen + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToMany() { + RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel); + Flowable req = Flowable.just(HelloRequest.getDefaultInstance()); + Flowable resp = req.compose(stub::sayHelloBothStream); + TestSubscriber test = resp.test(); + + test.awaitTerminalEvent(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } +} diff --git a/rx3-java/rx3grpc-test/src/test/java/com/salesforce/rx3grpc/JvmFatalServerErrorIntegrationTest.java b/rx3-java/rx3grpc-test/src/test/java/com/salesforce/rx3grpc/JvmFatalServerErrorIntegrationTest.java new file mode 100644 index 00000000..0ac9d53b --- /dev/null +++ b/rx3-java/rx3grpc-test/src/test/java/com/salesforce/rx3grpc/JvmFatalServerErrorIntegrationTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019, Salesforce.com, Inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.rx3grpc; + +import io.grpc.*; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("unchecked") +public class JvmFatalServerErrorIntegrationTest { + @Rule + public UnhandledRxJavaErrorRule errorRule = new UnhandledRxJavaErrorRule().autoVerifyNoError(); + + private static Server server; + private static ManagedChannel channel; + + @BeforeClass + public static void setupServer() throws Exception { + Rx3GreeterGrpc.GreeterImplBase svc = new Rx3GreeterGrpc.GreeterImplBase() { + @Override + public Single sayHello(Single rxRequest) { + return rxRequest.map(this::kaboom); + } + + @Override + public Flowable sayHelloRespStream(Single rxRequest) { + return rxRequest.map(this::kaboom).toFlowable(); + } + + @Override + public Single sayHelloReqStream(Flowable rxRequest) { + return rxRequest.map(this::kaboom).firstOrError(); + } + + @Override + public Flowable sayHelloBothStream(Flowable rxRequest) { + return rxRequest.map(this::kaboom); + } + + private HelloResponse kaboom(HelloRequest request) { + throw new NoSuchMethodError("Fatal!"); + } + + @Override + protected Throwable onErrorMap(Throwable throwable) { + if (throwable instanceof NoSuchMethodError) { + return Status.INTERNAL.withDescription("NoSuchMethod:" + throwable.getMessage()).asRuntimeException(); + } + return super.onErrorMap(throwable); + } + }; + + server = ServerBuilder.forPort(9000).addService(svc).build().start(); + channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); + } + + @AfterClass + public static void stopServer() { + server.shutdown(); + channel.shutdown(); + + server = null; + channel = null; + } + + @Test + public void oneToOne() throws InterruptedException { + Rx3GreeterGrpc.RxGreeterStub stub = Rx3GreeterGrpc.newRxStub(channel); + Single resp = Single.just(HelloRequest.getDefaultInstance()).compose(stub::sayHello); + TestObserver test = resp.test(); + + test.await(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void oneToMany() throws InterruptedException { + Rx3GreeterGrpc.RxGreeterStub stub = Rx3GreeterGrpc.newRxStub(channel); + Flowable resp = Single.just(HelloRequest.getDefaultInstance()).to(stub::sayHelloRespStream); + TestSubscriber test = resp + .doOnNext(System.out::println) + .doOnError(throwable -> System.out.println(throwable.getMessage())) + .doOnComplete(() -> System.out.println("Completed")) + .doOnCancel(() -> System.out.println("Client canceled")) + .test(); + + test.await(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToOne() throws InterruptedException { + Rx3GreeterGrpc.RxGreeterStub stub = Rx3GreeterGrpc.newRxStub(channel); + Flowable req = Flowable.just(HelloRequest.getDefaultInstance()); + Single resp = req.to(stub::sayHelloReqStream); + TestObserver test = resp.test(); + + test.await(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + // Flowable requests get canceled when unexpected errors happen + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } + + @Test + public void manyToMany() throws InterruptedException { + Rx3GreeterGrpc.RxGreeterStub stub = Rx3GreeterGrpc.newRxStub(channel); + Flowable req = Flowable.just(HelloRequest.getDefaultInstance()); + Flowable resp = req.compose(stub::sayHelloBothStream); + TestSubscriber test = resp.test(); + + test.await(3, TimeUnit.SECONDS); + test.assertError(t -> t instanceof StatusRuntimeException); + test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL); + } +}