From 434d1f420d98e90f19e313db569eb4bb972a5ab8 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Sat, 15 Apr 2017 22:29:02 +0200 Subject: [PATCH] 2.x: cleanup for text and javadoc 04/15 (#5286) --- src/main/java/io/reactivex/Maybe.java | 4 ++-- .../flowable/FlowableFlattenIterable.java | 5 +++-- .../observable/ObservableBlockingSubscribe.java | 6 +++--- .../ObservableBufferBoundarySupplier.java | 4 ++-- .../operators/observable/ObservableDebounce.java | 2 +- .../operators/observable/ObservableDefer.java | 2 +- .../observable/ObservableMapNotification.java | 6 +++--- .../observables/ConnectableObservable.java | 15 +++++++-------- .../io/reactivex/subjects/SerializedSubject.java | 4 ++-- .../io/reactivex/single/SingleSubscribeTest.java | 8 ++++---- 10 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 4a7faf2c08..e5dfbd89ac 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -3566,10 +3566,10 @@ public final Maybe retryUntil(final BooleanSupplier stop) { * This retries 3 times, each time incrementing the number of seconds it waits. * *

-     *  Publisher.create((Subscriber s) -> {
+     *  Flowable.create((FlowableEmitter s) -> {
      *      System.out.println("subscribing");
      *      s.onError(new RuntimeException("always fails"));
-     *  }).retryWhen(attempts -> {
+     *  }, BackpressureStrategy.BUFFER).retryWhen(attempts -> {
      *      return attempts.zipWith(Publisher.range(1, 3), (n, i) -> i).flatMap(i -> {
      *          System.out.println("delay retry by " + i + " second(s)");
      *          return Publisher.timer(i, TimeUnit.SECONDS);
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java
index 460279af7f..48a9d75333 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java
@@ -23,6 +23,7 @@
 import io.reactivex.annotations.Nullable;
 import io.reactivex.exceptions.*;
 import io.reactivex.functions.Function;
+import io.reactivex.internal.functions.ObjectHelper;
 import io.reactivex.internal.fuseable.*;
 import io.reactivex.internal.queue.SpscArrayQueue;
 import io.reactivex.internal.subscriptions.*;
@@ -298,7 +299,7 @@ void drain() {
                         R v;
 
                         try {
-                            v = it.next();
+                            v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
                         } catch (Throwable ex) {
                             Exceptions.throwIfFatal(ex);
                             current = null;
@@ -437,7 +438,7 @@ public R poll() throws Exception {
                     current = it;
                 }
 
-                R r = it.next();
+                R r = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
 
                 if (!it.hasNext()) {
                     current = null;
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java
index 1151c280fc..4373a321aa 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java
@@ -34,7 +34,7 @@ private ObservableBlockingSubscribe() {
     /**
      * Subscribes to the source and calls the Observer methods on the current thread.
      * 

- * @param o the source publisher + * @param o the source ObservableSource * The call to dispose() is composed through. * @param observer the subscriber to forward events and calls to in the current thread * @param the value type @@ -70,7 +70,7 @@ public static void subscribe(ObservableSource o, Observer the value type */ public static void subscribe(ObservableSource o) { @@ -89,7 +89,7 @@ public static void subscribe(ObservableSource o) { /** * Subscribes to the source and calls the given actions on the current thread. - * @param o the source publisher + * @param o the source ObservableSource * @param onNext the callback action for each source value * @param onError the callback action for an error event * @param onComplete the callback action for the completion event. diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java index 479e2b5a6c..ce97d264fc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java @@ -87,7 +87,7 @@ public void onSubscribe(Disposable s) { ObservableSource boundary; try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -179,7 +179,7 @@ void next() { ObservableSource boundary; try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java index 85ad81b7ec..2a70f8ba4e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java @@ -81,7 +81,7 @@ public void onNext(T t) { ObservableSource p; try { - p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null"); + p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The ObservableSource supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java index c8013d1c57..37996e1332 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java @@ -29,7 +29,7 @@ public ObservableDefer(Callable> supplie public void subscribeActual(Observer s) { ObservableSource pub; try { - pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied"); + pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied"); } catch (Throwable t) { Exceptions.throwIfFatal(t); EmptyDisposable.error(t, s); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java index 6b4b95f4d1..dd45da1e52 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java @@ -88,7 +88,7 @@ public void onNext(T t) { ObservableSource p; try { - p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext publisher returned is null"); + p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); @@ -103,7 +103,7 @@ public void onError(Throwable t) { ObservableSource p; try { - p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null"); + p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); @@ -119,7 +119,7 @@ public void onComplete() { ObservableSource p; try { - p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete publisher returned is null"); + p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index 2fed7634eb..fdc4334ef1 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -14,7 +14,6 @@ package io.reactivex.observables; import io.reactivex.annotations.NonNull; -import org.reactivestreams.Subscriber; import io.reactivex.*; import io.reactivex.disposables.Disposable; @@ -25,9 +24,9 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin + * A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin * emitting items when it is subscribed to, but only when its {@link #connect} method is called. In this way you - * can wait for all intended {@link Subscriber}s to {@link Flowable#subscribe} to the {@code Observable} + * can wait for all intended {@link Observer}s to {@link Observable#subscribe} to the {@code Observable} * before the {@code Observable} begins emitting items. *

* @@ -41,7 +40,7 @@ public abstract class ConnectableObservable extends Observable { /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying - * {@link Flowable} to its {@link Subscriber}s. + * {@link Observable} to its {@link Observer}s. * * @param connection * the action that receives the connection subscription before the subscription to source happens @@ -52,7 +51,7 @@ public abstract class ConnectableObservable extends Observable { /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying - * {@link Flowable} to its {@link Subscriber}s. + * {@link Observable} to its {@link Observer}s. *

* To disconnect from a synchronous source, use the {@link #connect(Consumer)} method. * @@ -79,10 +78,10 @@ public Observable refCount() { /** * Returns an Observable that automatically connects to this ConnectableObservable - * when the first Subscriber subscribes. + * when the first Observer subscribes. * * @return an Observable that automatically connects to this ConnectableObservable - * when the first Subscriber subscribes + * when the first Observer subscribes */ @NonNull public Observable autoConnect() { @@ -90,7 +89,7 @@ public Observable autoConnect() { } /** * Returns an Observable that automatically connects to this ConnectableObservable - * when the specified number of Subscribers subscribe to it. + * when the specified number of Observers subscribe to it. * * @param numberOfSubscribers the number of subscribers to await before calling connect * on the ConnectableObservable. A non-positive value indicates diff --git a/src/main/java/io/reactivex/subjects/SerializedSubject.java b/src/main/java/io/reactivex/subjects/SerializedSubject.java index 1d47d09a6c..53f3381bce 100644 --- a/src/main/java/io/reactivex/subjects/SerializedSubject.java +++ b/src/main/java/io/reactivex/subjects/SerializedSubject.java @@ -20,8 +20,8 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Serializes calls to the Subscriber methods. - *

All other Publisher and Subject methods are thread-safe by design. + * Serializes calls to the Observer methods. + *

All other Observable and Subject methods are thread-safe by design. * * @param the item value type */ diff --git a/src/test/java/io/reactivex/single/SingleSubscribeTest.java b/src/test/java/io/reactivex/single/SingleSubscribeTest.java index 9e095db8d0..d1ace79dc9 100644 --- a/src/test/java/io/reactivex/single/SingleSubscribeTest.java +++ b/src/test/java/io/reactivex/single/SingleSubscribeTest.java @@ -232,7 +232,7 @@ public void errorIsDisposed() { @Test public void biConsumerIsDisposedOnSuccess() { final Object[] result = { null, null }; - + Disposable d = Single.just(1) .subscribe(new BiConsumer() { @Override @@ -241,7 +241,7 @@ public void accept(Integer t1, Throwable t2) throws Exception { result[1] = t2; } }); - + assertTrue("Not disposed?!", d.isDisposed()); assertEquals(1, result[0]); assertNull(result[1]); @@ -250,7 +250,7 @@ public void accept(Integer t1, Throwable t2) throws Exception { @Test public void biConsumerIsDisposedOnError() { final Object[] result = { null, null }; - + Disposable d = Single.error(new IOException()) .subscribe(new BiConsumer() { @Override @@ -259,7 +259,7 @@ public void accept(Integer t1, Throwable t2) throws Exception { result[1] = t2; } }); - + assertTrue("Not disposed?!", d.isDisposed()); assertNull(result[0]); assertTrue("" + result[1], result[1] instanceof IOException);