Skip to content

Commit

Permalink
+tck reactive-streams#236 example subscriber whitebox tested, and whi…
Browse files Browse the repository at this point in the history
…tebox fixed
  • Loading branch information
ktoso committed Mar 18, 2015
1 parent b33420a commit 99c0cad
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public SyncSubscriberWhiteboxTest() {
super(new TestEnvironment());
}

@Override
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
return new SyncSubscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
super.onSubscribe(s);

probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long elements) {
s.request(elements);
}

@Override
public void signalCancel() {
s.cancel();
}
});
}

@Override
public void onNext(Integer element) {
super.onNext(element);
probe.registerOnNext(element);
}

@Override
public void onError(Throwable cause) {
super.onError(cause);
probe.registerOnError(cause);
}

@Override
public void onComplete() {
super.onComplete();
probe.registerOnComplete();
}

@Override
protected boolean foreach(Integer element) {
return true;
}
};
}

@Override public Integer createElement(int element) {
return element;
}

}
34 changes: 19 additions & 15 deletions tck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ Based on experiences so far implementing the `SubscriberPuppet` is non-trivial a
We keep the whitebox verification, as it is tremendously useful in the `ProcessorVerification`, where the Puppet is implemented within the TCK and injected to the tests.
We do not expect all implementations to make use of the plain `SubscriberWhiteboxVerification`, using the `SubscriberBlackboxVerification` instead.

A simple synchronous `Subscriber` implementation would look similar to following example:
For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to
exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example:

```java
package com.example.streams;
Expand All @@ -305,21 +306,24 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
super(new TestEnvironment());
}

// The implementation under test is "SyncSubscriber":
// class SyncSubscriber<T> extends Subscriber<T> { /* ... */ }

@Override
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {

// return YOUR subscriber under-test, with additional WhiteboxSubscriberProbe instrumentation
return new Subscriber<Integer>() {

// in order to test the SyncSubscriber we must instrument it by extending it,
// and calling the WhiteboxSubscriberProbe in all of the Subscribers methods:
return new SyncSubscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
// in addition to normal Subscriber work that you're testing,
// register a SubscriberPuppet, to give the TCK control over demand generation and cancelling
probe.registerOnSubscribe(new SubscriberPuppet() {
super.onSubscribe(s);

// register a successful subscription, and create a Puppet,
// for the WhiteboxVerification to be able to drive its tests:
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long n) {
s.request(n);
public void triggerRequest(long elements) {
s.request(elements);
}

@Override
Expand All @@ -330,20 +334,20 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
}

@Override
public void onNext(Integer value) {
// in addition to normal Subscriber work that you're testing, register onNext with the probe
probe.registerOnNext(value);
public void onNext(Integer element) {
super.onNext(element);
probe.registerOnNext(element);
}

@Override
public void onError(Throwable cause) {
// in addition to normal Subscriber work that you're testing, register onError with the probe
super.onError(cause);
probe.registerOnError(cause);
}

@Override
public void onComplete() {
// in addition to normal Subscriber work that you're testing, register onComplete with the probe
super.onComplete();
probe.registerOnComplete();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
}

@Override @Test
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,31 +203,35 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.5
@Override @Test
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
new WhiteboxTestStage(env) {{
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
final Latch secondSubscriptionCancelled = new Latch(env);
sub().onSubscribe(
new Subscription() {
@Override
public void request(long elements) {
env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`", sub(), elements));
}
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws Throwable {
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
final Latch secondSubscriptionCancelled = new Latch(env);
final Subscriber<? super T> sub = stage.sub();
final Subscription subscription = new Subscription() {
@Override
public void request(long elements) {
// ignore...
}

@Override
public void cancel() {
secondSubscriptionCancelled.close();
}
@Override
public void cancel() {
secondSubscriptionCancelled.close();
}

@Override
public String toString() {
return "SecondSubscription(should get cancelled)";
}
});
@Override
public String toString() {
return "SecondSubscription(should get cancelled)";
}
};
sub.onSubscribe(subscription);

secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
env.verifyNoAsyncErrors();
}};
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
env.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.6
Expand Down Expand Up @@ -348,50 +352,38 @@ public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParame
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
@Override @Test
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

final Subscription subscription = new Subscription() {
@Override
public void request(final long elements) {
}

@Override
public void cancel() {
}
};

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onNext(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onNext(null);
} catch (final NullPointerException expected) {
gotNPE = true;
} finally {
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}
Expand All @@ -403,29 +395,17 @@ public void required_spec213_onError_mustThrowNullPointerExceptionWhenParameters
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

final Subscription subscription = new Subscription() {
@Override
public void request(final long elements) {
}

@Override
public void cancel() {
}
};

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onError(null);
} catch (final NullPointerException expected) {
gotNPE = true;
} finally {
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}
Expand Down Expand Up @@ -495,11 +475,24 @@ abstract class TestStageTestRun {
public abstract void run(WhiteboxTestStage stage) throws Throwable;
}

/**
* Prepares subscriber and publisher pair (by subscribing the first to the latter),
* and then hands over the tests {@link WhiteboxTestStage} over to the test.
*
* The test stage is, like in a puppet show, used to orchestrate what each participant should do.
* Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
*/
public void subscriberTest(TestStageTestRun body) throws Throwable {
WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
body.run(stage);
}

/**
* Provides a {@link WhiteboxTestStage} without performing any additional setup,
* like the {@link org.reactivestreams.tck.SubscriberWhiteboxVerification#subscriberTest(TestStageTestRun)} would.
*
* Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
*/
public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
body.run(stage);
Expand Down Expand Up @@ -559,7 +552,10 @@ public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment
}

public T signalNext() throws InterruptedException {
T element = nextT();
return signalNext(nextT());
}

private T signalNext(T element) throws InterruptedException {
sendNext(element);
return element;
}
Expand All @@ -577,7 +573,7 @@ public void verifyNoAsyncErrors() {
/**
* This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
* in order to allow intercepting calls on the underlying {@code Subscriber}.
* This delegation allows the proxy to implement {@link org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxProbe} assertions.
* This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
*/
public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {

Expand Down Expand Up @@ -741,8 +737,6 @@ private SubscriberPuppet puppet() {
public void registerOnSubscribe(SubscriberPuppet p) {
if (!puppet.isCompleted()) {
puppet.complete(p);
} else {
env.flop(String.format("Subscriber %s illegally accepted a second Subscription", sub()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface SubscriberWhiteboxVerificationRules {
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable;
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable;
void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception;
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception;
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable;
void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception;
void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception;
void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
}
}).required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
}
}, "illegally accepted a second Subscription");
}, "Expected 2nd Subscription given to subscriber to be cancelled");
}

@Test
Expand Down

0 comments on commit 99c0cad

Please sign in to comment.