issue #292 - discard support #293

Merged (4 commits, Apr 11, 2023)
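The changes below wire reactive-grpc's internal message queues into Reactor's discard hooks, so gRPC messages that were buffered or delivered late but never reach the subscriber (for example because the downstream cancelled) can be observed via doOnDiscard instead of being silently cleared. As orientation, a minimal plain reactor-core sketch of the hook being plugged into; this is not reactor-grpc specific, and the scheduler and prefetch values are only illustrative:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class DiscardHookDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.range(1, 100)
                .publishOn(Schedulers.parallel(), 16)   // prefetches elements into an internal queue
                // doOnDiscard registers a handler in the subscriber Context; operators
                // upstream of it hand over elements they drop on cancellation
                .doOnDiscard(Integer.class, i -> System.out.println("discarded " + i))
                .take(1)                                // cancels upstream after the first element
                .blockLast();

        // cancellation (and therefore the discard hook) runs on the parallel
        // scheduler, so give it a moment before the JVM exits
        Thread.sleep(200);
    }
}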
@@ -85,7 +85,7 @@ public void request(long n) {
    private volatile boolean done;
    private Throwable error;

-   private volatile Subscriber<? super T> downstream;
+   protected volatile Subscriber<? super T> downstream;

    private volatile boolean cancelled;

@@ -226,7 +226,7 @@ private void drainFused(final Subscriber<? super T> subscriber) {

        for (;;) {
            if (cancelled) {
-               queue.clear();
+               discardQueue(queue);
                downstream = null;
                return;
            }
@@ -283,7 +283,7 @@ private void drain() {

    private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
        if (cancelled) {
-           q.clear();
+           discardQueue(q);
            downstream = null;
            return true;
        }
@@ -305,6 +305,7 @@ private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T>
    @Override
    public void onNext(T t) {
        if (done || cancelled) {
+           discardElement(t);
            return;
        }

@@ -419,7 +420,7 @@ public void cancel() {

        if (!outputFused) {
            if (WIP.getAndIncrement(this) == 0) {
-               queue.clear();
+               discardQueue(queue);
                downstream = null;
            }
        }
@@ -456,4 +457,10 @@ public boolean isEmpty() {
    public void clear() {
        queue.clear();
    }
+
+   protected void discardQueue(Queue<T> q) {
+       q.clear();
+   }
+
+   protected void discardElement(T t) { }
}
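In the base class the two new hooks default to the pre-existing behaviour (clear the queue, silently drop a late element), so subclasses that are not Reactor-aware behave exactly as before, while the Reactor-specific publishers in the files below override them. A minimal, illustrative sketch of the same hook pattern; the class names here are made up, not the actual reactive-grpc types:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Illustrative only: a base class whose discard hooks default to the old behaviour,
// and a subclass that reroutes discarded items to a user-supplied callback.
abstract class QueueingPublisherSketch<T> {
    protected final Queue<T> queue = new ConcurrentLinkedQueue<>();

    /** Called instead of queue.clear() once the consumer has cancelled. */
    protected void discardQueue(Queue<T> q) {
        q.clear();
    }

    /** Called for an element that arrives after cancellation or completion. */
    protected void discardElement(T t) {
    }
}

class CallbackDiscardPublisherSketch<T> extends QueueingPublisherSketch<T> {
    private final Consumer<T> onDiscard;

    CallbackDiscardPublisherSketch(Consumer<T> onDiscard) {
        this.onDiscard = onDiscard;
    }

    @Override
    protected void discardQueue(Queue<T> q) {
        T t;
        while ((t = q.poll()) != null) {
            onDiscard.accept(t);
        }
    }

    @Override
    protected void discardElement(T t) {
        onDiscard.accept(t);
    }
}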
reactor/reactor-grpc-stub/pom.xml (13 additions, 0 deletions)
@@ -67,6 +67,19 @@
            <artifactId>mockito-core</artifactId>
            <scope>test</scope>
        </dependency>
+       <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
+       <dependency>
+           <groupId>org.slf4j</groupId>
+           <artifactId>slf4j-api</artifactId>
+           <version>1.7.36</version>
+           <scope>test</scope>
+       </dependency>
+       <dependency>
+           <groupId>ch.qos.logback</groupId>
+           <artifactId>logback-classic</artifactId>
+           <version>1.2.10</version>
+           <scope>test</scope>
+       </dependency>
    </dependencies>

    <build>
@@ -10,9 +10,13 @@
import com.salesforce.reactivegrpc.common.AbstractClientStreamObserverAndPublisher;
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
+import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
+import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

+import java.util.Queue;
+
/**
 * TODO: Explain what this class does.
 * @param <T> T
@@ -46,4 +50,20 @@ public int requestFusion(int requestedMode) {
        }
        return Fuseable.NONE;
    }
+
+   @Override
+   protected void discardQueue(Queue<T> q) {
+       if (downstream instanceof CoreSubscriber) {
+           Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
+       } else {
+           q.clear();
+       }
+   }
+
+   @Override
+   protected void discardElement(T t) {
+       if (downstream instanceof CoreSubscriber) {
+           Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
+       }
+   }
}
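The overrides above delegate to reactor-core's Operators helpers, which look up the discard handler that a downstream doOnDiscard operator has installed in the subscriber's Context; that is why they need ((CoreSubscriber) downstream).currentContext(). A rough, self-contained illustration of those two calls, building the Context by hand with Operators.enableOnDiscard purely for demonstration (in the PR the Context always comes from the downstream subscriber):

import java.util.ArrayDeque;
import java.util.Queue;

import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class OnDiscardDemo {
    public static void main(String[] args) {
        // Install a discard handler in a Context, as doOnDiscard would do downstream.
        Context ctx = Operators.enableOnDiscard(Context.empty(),
                o -> System.out.println("discarded " + o));

        // Single late element, as in discardElement(t)
        Operators.onDiscard("late message", ctx);

        // Whole buffered queue, as in discardQueue(q); the null extract function
        // means elements are passed to the handler as-is
        Queue<String> queue = new ArrayDeque<>();
        queue.add("queued-1");
        queue.add("queued-2");
        Operators.onDiscardQueueWithClear(queue, ctx, null);
    }
}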
@@ -11,9 +11,13 @@
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
+import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
+import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

+import java.util.Queue;
+
/**
 * TODO: Explain what this class does.
 * @param <T> T
@@ -38,4 +42,20 @@ public int requestFusion(int requestedMode) {
        }
        return Fuseable.NONE;
    }
+
+   @Override
+   protected void discardQueue(Queue<T> q) {
+       if (downstream instanceof CoreSubscriber) {
+           Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
+       } else {
+           q.clear();
+       }
+   }
+
+   @Override
+   protected void discardElement(T t) {
+       if (downstream instanceof CoreSubscriber) {
+           Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
+       }
+   }
}
@@ -6,25 +6,31 @@

package com.salesforce.reactorgrpc.stub;

import java.util.concurrent.ForkJoinPool;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;


public class ReactorClientStreamObserverAndPublisherTest {
private static final Logger log = LoggerFactory.getLogger(ReactorClientStreamObserverAndPublisherTest.class.getName());

private static final int DEFAULT_CHUNK_SIZE = 512;
private static final int PART_OF_CHUNK = DEFAULT_CHUNK_SIZE * 2 / 3;

@Test
public void multiThreadedProducerTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer =
new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
@@ -34,21 +40,76 @@ public void multiThreadedProducerTest() {
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void producerFusedTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);
StepVerifier.create(Flux.from(processor))
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void discardQueueTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 5;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);

ConcurrentLinkedQueue<Integer> discardedByObserverAndPublisher = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Integer> discardedByPublishOn = new ConcurrentLinkedQueue<>();

AtomicBoolean firstHandled = new AtomicBoolean();
Flux<Integer> consumer =
Flux.from(processor)
.doOnDiscard(Integer.class, i -> {
log.info("Processor: discarding {}", i);
discardedByObserverAndPublisher.add(i);
})
.log("processor")
.limitRate(1)
.publishOn(Schedulers.parallel())
.limitRate(1)
.doOnDiscard(Integer.class, i -> {
log.info("publishOn: discarding {}", i);
discardedByPublishOn.add(i);
})
.<Integer>handle((i, sink) -> {
if (firstHandled.compareAndSet(false, true)) {
try {
Thread.sleep(100);
} catch (Exception e) {
// noop
}
sink.next(i);
} else {
sink.complete();
}
})
.log("handled");

StepVerifier.create(consumer)
.expectNext(0)
.verifyComplete();

// 1 is dropped in handle without invoking the discard hook,
assertThat(discardedByObserverAndPublisher).containsExactly(3, 4);
// impl details: processor is able to schedule 2 before it's cancelled
// also, discard hooks are cumulative, so not using containsExactly
assertThat(discardedByPublishOn).contains(2);
}
}