-
Notifications
You must be signed in to change notification settings - Fork 534
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes #233 by implementing support for triggered demand in in the Sub…
…scriberBlackboxVerification
- Loading branch information
1 parent
3d82baf
commit c97be99
Showing
4 changed files
with
259 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
47 changes: 47 additions & 0 deletions
47
tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package org.reactivestreams.tck; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import org.reactivestreams.tck.SubscriberBlackboxVerification; | ||
import org.reactivestreams.tck.TestEnvironment; | ||
import org.testng.annotations.AfterClass; | ||
import org.testng.annotations.BeforeClass; | ||
import org.testng.annotations.Test; | ||
|
||
import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
||
@Test // Must be here for TestNG to find and run this, do not remove | ||
public class SyncTriggeredDemandSubscriberTest extends SubscriberBlackboxVerification<Integer> { | ||
|
||
private ExecutorService e; | ||
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); } | ||
@AfterClass void after() { if (e != null) e.shutdown(); } | ||
|
||
public SyncTriggeredDemandSubscriberTest() { | ||
super(new TestEnvironment()); | ||
} | ||
|
||
@Override public void triggerRequest(final Subscriber<? super Integer> subscriber) { | ||
((SyncTriggeredDemandSubscriber<? super Integer>)subscriber).triggerDemand(1); | ||
} | ||
|
||
@Override public Subscriber<Integer> createSubscriber() { | ||
return new SyncTriggeredDemandSubscriber<Integer>() { | ||
private long acc; | ||
@Override protected long foreach(final Integer element) { | ||
acc += element; | ||
return 1; | ||
} | ||
|
||
@Override public void onComplete() { | ||
} | ||
}; | ||
} | ||
|
||
@Override public Integer createElement(int element) { | ||
return element; | ||
} | ||
} |
77 changes: 77 additions & 0 deletions
77
tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package org.reactivestreams.tck; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import org.reactivestreams.tck.SubscriberBlackboxVerification; | ||
import org.reactivestreams.tck.SubscriberWhiteboxVerification; | ||
import org.reactivestreams.tck.TestEnvironment; | ||
import org.testng.annotations.AfterClass; | ||
import org.testng.annotations.BeforeClass; | ||
import org.testng.annotations.Test; | ||
|
||
import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
||
@Test // Must be here for TestNG to find and run this, do not remove | ||
public class SyncTriggeredDemandSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> { | ||
|
||
private ExecutorService e; | ||
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); } | ||
@AfterClass void after() { if (e != null) e.shutdown(); } | ||
|
||
public SyncTriggeredDemandSubscriberWhiteboxTest() { | ||
super(new TestEnvironment()); | ||
} | ||
|
||
@Override | ||
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) { | ||
return new SyncTriggeredDemandSubscriber<Integer>() { | ||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
super.onSubscribe(s); | ||
|
||
probe.registerOnSubscribe(new SubscriberPuppet() { | ||
@Override | ||
public void triggerRequest(long elements) { | ||
s.request(elements); | ||
} | ||
|
||
@Override | ||
public void signalCancel() { | ||
s.cancel(); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void onNext(Integer element) { | ||
super.onNext(element); | ||
probe.registerOnNext(element); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable cause) { | ||
super.onError(cause); | ||
probe.registerOnError(cause); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
super.onComplete(); | ||
probe.registerOnComplete(); | ||
} | ||
|
||
@Override | ||
protected long foreach(Integer element) { | ||
return 1; | ||
} | ||
}; | ||
} | ||
|
||
@Override public Integer createElement(int element) { | ||
return element; | ||
} | ||
|
||
} |
123 changes: 123 additions & 0 deletions
123
tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package org.reactivestreams.tck.support; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
/** | ||
* SyncTriggeredDemandSubscriber is an implementation of Reactive Streams `Subscriber`, | ||
* it runs synchronously (on the Publisher's thread) and requests demand triggered from | ||
* "the outside" using its `triggerDemand` method and from "the inside" using the return | ||
* value of its user-defined `foreach` method which is invoked to process each element. | ||
* | ||
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. | ||
*/ | ||
public abstract class SyncTriggeredDemandSubscriber<T> implements Subscriber<T> { | ||
private Subscription subscription; // Obeying rule 3.1, we make this private! | ||
private boolean done = false; | ||
|
||
@Override public void onSubscribe(final Subscription s) { | ||
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` | ||
if (s == null) throw null; | ||
|
||
if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully | ||
try { | ||
s.cancel(); // Cancel the additional subscription | ||
} catch(final Throwable t) { | ||
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15 | ||
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); | ||
} | ||
} else { | ||
// We have to assign it locally before we use it, if we want to be a synchronous `Subscriber` | ||
// Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request` | ||
subscription = s; | ||
} | ||
} | ||
|
||
/** | ||
* Requests the provided number of elements from the `Subscription` of this `Subscriber`. | ||
* NOTE: This makes no attempt at thread safety so only invoke it once from the outside to initiate the demand. | ||
* @return `true` if successful and `false` if not (either due to no `Subscription` or due to exceptions thrown) | ||
*/ | ||
public boolean triggerDemand(final long n) { | ||
final Subscription s = subscription; | ||
if (s == null) return false; | ||
else { | ||
try { | ||
s.request(n); | ||
} catch(final Throwable t) { | ||
// Subscription.request is not allowed to throw according to rule 3.16 | ||
(new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); | ||
return false; | ||
} | ||
return true; | ||
} | ||
} | ||
|
||
@Override public void onNext(final T element) { | ||
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec | ||
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); | ||
} else { | ||
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` | ||
if (element == null) throw null; | ||
|
||
if (!done) { // If we aren't already done | ||
try { | ||
final long need = foreach(element); | ||
if (need > 0) triggerDemand(need); | ||
else if (need == 0) {} | ||
else { | ||
done(); | ||
} | ||
} catch (final Throwable t) { | ||
done(); | ||
try { | ||
onError(t); | ||
} catch (final Throwable t2) { | ||
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13 | ||
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements | ||
// herefor we also need to cancel our `Subscription`. | ||
private void done() { | ||
//On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to. | ||
done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements) | ||
try { | ||
subscription.cancel(); // Cancel the subscription | ||
} catch(final Throwable t) { | ||
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15 | ||
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); | ||
} | ||
} | ||
|
||
// This method is left as an exercise to the reader/extension point | ||
// Don't forget to call `triggerDemand` at the end if you are interested in more data, | ||
// a return value of < 0 indicates that the subscription should be cancelled, | ||
// a value of 0 indicates that there is no current need, | ||
// a value of > 0 indicates the current need. | ||
protected abstract long foreach(final T element); | ||
|
||
@Override public void onError(final Throwable t) { | ||
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec | ||
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); | ||
} else { | ||
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` | ||
if (t == null) throw null; | ||
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 | ||
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 | ||
} | ||
} | ||
|
||
@Override public void onComplete() { | ||
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec | ||
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); | ||
} else { | ||
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 | ||
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 | ||
} | ||
} | ||
} |