diff --git a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractStreamObserverAndPublisher.java b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractStreamObserverAndPublisher.java index 00348fe9..cae18e14 100644 --- a/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractStreamObserverAndPublisher.java +++ b/common/reactive-grpc-common/src/main/java/com/salesforce/reactivegrpc/common/AbstractStreamObserverAndPublisher.java @@ -85,7 +85,7 @@ public void request(long n) { private volatile boolean done; private Throwable error; - private volatile Subscriber downstream; + protected volatile Subscriber downstream; private volatile boolean cancelled; @@ -226,7 +226,7 @@ private void drainFused(final Subscriber subscriber) { for (;;) { if (cancelled) { - queue.clear(); + discardQueue(queue); downstream = null; return; } @@ -283,7 +283,7 @@ private void drain() { private boolean checkTerminated(boolean d, boolean empty, Subscriber subscriber, Queue q) { if (cancelled) { - q.clear(); + discardQueue(q); downstream = null; return true; } @@ -305,6 +305,7 @@ private boolean checkTerminated(boolean d, boolean empty, Subscriber @Override public void onNext(T t) { if (done || cancelled) { + discardElement(t); return; } @@ -419,7 +420,7 @@ public void cancel() { if (!outputFused) { if (WIP.getAndIncrement(this) == 0) { - queue.clear(); + discardQueue(queue); downstream = null; } } @@ -456,4 +457,10 @@ public boolean isEmpty() { public void clear() { queue.clear(); } + + protected void discardQueue(Queue q) { + q.clear(); + } + + protected void discardElement(T t) {} } \ No newline at end of file diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisher.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisher.java index 749fd30e..46b0d98a 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisher.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisher.java @@ -10,9 +10,13 @@ import com.salesforce.reactivegrpc.common.AbstractClientStreamObserverAndPublisher; import com.salesforce.reactivegrpc.common.Consumer; import io.grpc.stub.CallStreamObserver; +import reactor.core.CoreSubscriber; import reactor.core.Fuseable; +import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; +import java.util.Queue; + /** * TODO: Explain what this class does. * @param T @@ -46,4 +50,20 @@ public int requestFusion(int requestedMode) { } return Fuseable.NONE; } + + @Override + protected void discardQueue(Queue q) { + if(downstream instanceof CoreSubscriber) { + Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null); + } else { + q.clear(); + } + } + + @Override + protected void discardElement(T t) { + if(downstream instanceof CoreSubscriber) { + Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext()); + } + } } diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorServerStreamObserverAndPublisher.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorServerStreamObserverAndPublisher.java index 16195322..d76c4e28 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorServerStreamObserverAndPublisher.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorServerStreamObserverAndPublisher.java @@ -11,9 +11,13 @@ import com.salesforce.reactivegrpc.common.Consumer; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.ServerCallStreamObserver; +import reactor.core.CoreSubscriber; import reactor.core.Fuseable; +import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; +import java.util.Queue; + /** * TODO: Explain what this class does. * @param T @@ -38,4 +42,20 @@ public int requestFusion(int requestedMode) { } return Fuseable.NONE; } + + @Override + protected void discardQueue(Queue q) { + if(downstream instanceof CoreSubscriber) { + Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null); + } else { + q.clear(); + } + } + + @Override + protected void discardElement(T t) { + if(downstream instanceof CoreSubscriber) { + Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext()); + } + } } \ No newline at end of file diff --git a/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisherTest.java b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisherTest.java index dd44527c..1b1ba384 100644 --- a/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisherTest.java +++ b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/ReactorClientStreamObserverAndPublisherTest.java @@ -6,15 +6,18 @@ package com.salesforce.reactorgrpc.stub; -import java.util.concurrent.ForkJoinPool; - -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ForkJoinPool; + +import static org.assertj.core.api.Assertions.assertThat; + public class ReactorClientStreamObserverAndPublisherTest { @@ -34,7 +37,7 @@ public void multiThreadedProducerTest() { .expectNextCount(countPerThread) .verifyComplete(); - Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3); + assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3); } @Test @@ -49,6 +52,32 @@ public void producerFusedTest() { .expectNextCount(countPerThread) .verifyComplete(); - Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3); + assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3); + } + + @Test + public void discardQueueTest() { + ReactorClientStreamObserverAndPublisher processor = + new ReactorClientStreamObserverAndPublisher<>(null); + int countPerThread = 5; + TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread); + processor.beforeStart(observer); + + ConcurrentLinkedQueue discarded = new ConcurrentLinkedQueue<>(); + + Flux consumer = + Flux.from(processor) + .log("processor") + .delayElements(Duration.ofMillis(100)) + .take(1) + .log("handled") + .doOnDiscard(Integer.class, discarded::add); + + StepVerifier.create(consumer) + .expectNext(0) + .verifyComplete(); + + + assertThat(discarded).containsExactly(1, 2, 3, 4); } } \ No newline at end of file