diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java new file mode 100644 index 00000000..a4812f8c --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java @@ -0,0 +1,75 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SyncSubscriberWhiteboxTest extends SubscriberWhiteboxVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SyncSubscriberWhiteboxTest() { + super(new TestEnvironment()); + } + + @Override + public Subscriber createSubscriber(final WhiteboxSubscriberProbe probe) { + return new SyncSubscriber() { + @Override + public void onSubscribe(final Subscription s) { + super.onSubscribe(s); + + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(long elements) { + s.request(elements); + } + + @Override + public void signalCancel() { + s.cancel(); + } + }); + } + + @Override + public void onNext(Integer element) { + super.onNext(element); + probe.registerOnNext(element); + } + + @Override + public void onError(Throwable cause) { + super.onError(cause); + probe.registerOnError(cause); + } + + @Override + public void onComplete() { + super.onComplete(); + probe.registerOnComplete(); + } + + @Override + protected boolean foreach(Integer element) { + return true; + } + }; + } + + @Override public Integer createElement(int element) { + return element; + } + +} diff --git a/tck/README.md b/tck/README.md index 6180e1f8..96872654 100644 --- a/tck/README.md +++ b/tck/README.md @@ -288,7 +288,8 @@ Based on experiences so far implementing the `SubscriberPuppet` is non-trivial a We keep the whitebox verification, as it is tremendously useful in the `ProcessorVerification`, where the Puppet is implemented within the TCK and injected to the tests. We do not expect all implementations to make use of the plain `SubscriberWhiteboxVerification`, using the `SubscriberBlackboxVerification` instead. -A simple synchronous `Subscriber` implementation would look similar to following example: +For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to +exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example: ```java package com.example.streams; @@ -305,21 +306,24 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri super(new TestEnvironment()); } + // The implementation under test is "SyncSubscriber": + // class SyncSubscriber extends Subscriber { /* ... */ } + @Override public Subscriber createSubscriber(final WhiteboxSubscriberProbe probe) { - - // return YOUR subscriber under-test, with additional WhiteboxSubscriberProbe instrumentation - return new Subscriber() { - + // in order to test the SyncSubscriber we must instrument it by extending it, + // and calling the WhiteboxSubscriberProbe in all of the Subscribers methods: + return new SyncSubscriber() { @Override public void onSubscribe(final Subscription s) { - // in addition to normal Subscriber work that you're testing, - // register a SubscriberPuppet, to give the TCK control over demand generation and cancelling - probe.registerOnSubscribe(new SubscriberPuppet() { + super.onSubscribe(s); + // register a successful subscription, and create a Puppet, + // for the WhiteboxVerification to be able to drive its tests: + probe.registerOnSubscribe(new SubscriberPuppet() { @Override - public void triggerRequest(long n) { - s.request(n); + public void triggerRequest(long elements) { + s.request(elements); } @Override @@ -330,20 +334,20 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri } @Override - public void onNext(Integer value) { - // in addition to normal Subscriber work that you're testing, register onNext with the probe - probe.registerOnNext(value); + public void onNext(Integer element) { + super.onNext(element); + probe.registerOnNext(element); } @Override public void onError(Throwable cause) { - // in addition to normal Subscriber work that you're testing, register onError with the probe + super.onError(cause); probe.registerOnError(cause); } @Override public void onComplete() { - // in addition to normal Subscriber work that you're testing, register onComplete with the probe + super.onComplete(); probe.registerOnComplete(); } }; diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index ca1e2297..c152350a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -514,7 +514,7 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev } @Override @Test - public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception { + public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index 906bc267..3a0bf98e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -203,31 +203,35 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev // Verifies rule: https://github.com/reactive-streams/reactive-streams#2.5 @Override @Test - public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception { - new WhiteboxTestStage(env) {{ - // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail - final Latch secondSubscriptionCancelled = new Latch(env); - sub().onSubscribe( - new Subscription() { - @Override - public void request(long elements) { - env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`", sub(), elements)); - } + public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { + subscriberTest(new TestStageTestRun() { + @Override + public void run(WhiteboxTestStage stage) throws Throwable { + // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail + final Latch secondSubscriptionCancelled = new Latch(env); + final Subscriber sub = stage.sub(); + final Subscription subscription = new Subscription() { + @Override + public void request(long elements) { + // ignore... + } - @Override - public void cancel() { - secondSubscriptionCancelled.close(); - } + @Override + public void cancel() { + secondSubscriptionCancelled.close(); + } - @Override - public String toString() { - return "SecondSubscription(should get cancelled)"; - } - }); + @Override + public String toString() { + return "SecondSubscription(should get cancelled)"; + } + }; + sub.onSubscribe(subscription); - secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called."); - env.verifyNoAsyncErrors(); - }}; + secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called"); + env.verifyNoAsyncErrors(); + } + }); } // Verifies rule: https://github.com/reactive-streams/reactive-streams#2.6 @@ -348,50 +352,38 @@ public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParame @Override public void run(WhiteboxTestStage stage) throws Throwable { - { - final Subscriber sub = stage.sub(); - boolean gotNPE = false; - try { - sub.onSubscribe(null); - } catch (final NullPointerException expected) { - gotNPE = true; - } - assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); + final Subscriber sub = stage.sub(); + boolean gotNPE = false; + try { + sub.onSubscribe(null); + } catch (final NullPointerException expected) { + gotNPE = true; } + assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); - env.verifyNoAsyncErrors(); + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); } }); - }// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13 + } + + // Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13 @Override @Test public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { subscriberTest(new TestStageTestRun() { @Override public void run(WhiteboxTestStage stage) throws Throwable { - final Subscription subscription = new Subscription() { - @Override - public void request(final long elements) { - } - - @Override - public void cancel() { - } - }; - - { - final Subscriber sub = stage.sub(); - boolean gotNPE = false; - sub.onSubscribe(subscription); - try { - sub.onNext(null); - } catch (final NullPointerException expected) { - gotNPE = true; - } + final Subscriber sub = stage.sub(); + boolean gotNPE = false; + try { + sub.onNext(null); + } catch (final NullPointerException expected) { + gotNPE = true; + } finally { assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); } - env.verifyNoAsyncErrors(); + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); } }); } @@ -403,29 +395,17 @@ public void required_spec213_onError_mustThrowNullPointerExceptionWhenParameters @Override public void run(WhiteboxTestStage stage) throws Throwable { - final Subscription subscription = new Subscription() { - @Override - public void request(final long elements) { - } - - @Override - public void cancel() { - } - }; - - { final Subscriber sub = stage.sub(); boolean gotNPE = false; - sub.onSubscribe(subscription); try { sub.onError(null); } catch (final NullPointerException expected) { gotNPE = true; + } finally { + assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); } - assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); - } - env.verifyNoAsyncErrors(); + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); } }); } @@ -495,11 +475,24 @@ abstract class TestStageTestRun { public abstract void run(WhiteboxTestStage stage) throws Throwable; } + /** + * Prepares subscriber and publisher pair (by subscribing the first to the latter), + * and then hands over the tests {@link WhiteboxTestStage} over to the test. + * + * The test stage is, like in a puppet show, used to orchestrate what each participant should do. + * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals. + */ public void subscriberTest(TestStageTestRun body) throws Throwable { WhiteboxTestStage stage = new WhiteboxTestStage(env, true); body.run(stage); } + /** + * Provides a {@link WhiteboxTestStage} without performing any additional setup, + * like the {@link org.reactivestreams.tck.SubscriberWhiteboxVerification#subscriberTest(TestStageTestRun)} would. + * + * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled. + */ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { WhiteboxTestStage stage = new WhiteboxTestStage(env, false); body.run(stage); @@ -559,7 +552,10 @@ public WhiteboxSubscriberProbe createWhiteboxSubscriberProbe(TestEnvironment } public T signalNext() throws InterruptedException { - T element = nextT(); + return signalNext(nextT()); + } + + private T signalNext(T element) throws InterruptedException { sendNext(element); return element; } @@ -577,7 +573,7 @@ public void verifyNoAsyncErrors() { /** * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls, * in order to allow intercepting calls on the underlying {@code Subscriber}. - * This delegation allows the proxy to implement {@link org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxProbe} assertions. + * This delegation allows the proxy to implement {@link BlackboxProbe} assertions. */ public static class BlackboxSubscriberProxy extends BlackboxProbe implements Subscriber { @@ -741,8 +737,6 @@ private SubscriberPuppet puppet() { public void registerOnSubscribe(SubscriberPuppet p) { if (!puppet.isCompleted()) { puppet.complete(p); - } else { - env.flop(String.format("Subscriber %s illegally accepted a second Subscription", sub())); } } diff --git a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java index 45835539..60d9df2a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java @@ -11,7 +11,7 @@ public interface SubscriberWhiteboxVerificationRules { void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable; void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable; void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception; - void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception; + void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable; void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception; void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception; void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable; diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index fb7023f2..ed4b7e9e 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -162,7 +162,7 @@ public Subscriber apply(WhiteboxSubscriberProbe probe) throws } }).required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); } - }, "illegally accepted a second Subscription"); + }, "Expected 2nd Subscription given to subscriber to be cancelled"); } @Test