Skip to content

Commit

Permalink
2.x: cleanup for text and javadoc 04/15 (#5286)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Apr 15, 2017
1 parent 85e0ea5 commit 434d1f4
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3566,10 +3566,10 @@ public final Maybe<T> retryUntil(final BooleanSupplier stop) {
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre><code>
* Publisher.create((Subscriber<? super String> s) -> {
* Flowable.create((FlowableEmitter<? super String> s) -> {
* System.out.println("subscribing");
* s.onError(new RuntimeException("always fails"));
* }).retryWhen(attempts -> {
* }, BackpressureStrategy.BUFFER).retryWhen(attempts -> {
* return attempts.zipWith(Publisher.range(1, 3), (n, i) -> i).flatMap(i -> {
* System.out.println("delay retry by " + i + " second(s)");
* return Publisher.timer(i, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.reactivex.annotations.Nullable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.subscriptions.*;
Expand Down Expand Up @@ -298,7 +299,7 @@ void drain() {
R v;

try {
v = it.next();
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
current = null;
Expand Down Expand Up @@ -437,7 +438,7 @@ public R poll() throws Exception {
current = it;
}

R r = it.next();
R r = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");

if (!it.hasNext()) {
current = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private ObservableBlockingSubscribe() {
/**
* Subscribes to the source and calls the Observer methods on the current thread.
* <p>
* @param o the source publisher
* @param o the source ObservableSource
* The call to dispose() is composed through.
* @param observer the subscriber to forward events and calls to in the current thread
* @param <T> the value type
Expand Down Expand Up @@ -70,7 +70,7 @@ public static <T> void subscribe(ObservableSource<? extends T> o, Observer<? sup

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* @param o the source publisher
* @param o the source ObservableSource
* @param <T> the value type
*/
public static <T> void subscribe(ObservableSource<? extends T> o) {
Expand All @@ -89,7 +89,7 @@ public static <T> void subscribe(ObservableSource<? extends T> o) {

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param o the source publisher
* @param o the source ObservableSource
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onSubscribe(Disposable s) {
ObservableSource<B> boundary;

try {
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null");
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
Expand Down Expand Up @@ -179,7 +179,7 @@ void next() {
ObservableSource<B> boundary;

try {
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null");
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void onNext(T t) {
ObservableSource<U> p;

try {
p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null");
p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The ObservableSource supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplie
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void onNext(T t) {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext publisher returned is null");
p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand All @@ -103,7 +103,7 @@ public void onError(Throwable t) {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null");
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand All @@ -119,7 +119,7 @@ public void onComplete() {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete publisher returned is null");
p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.reactivex.observables;

import io.reactivex.annotations.NonNull;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
Expand All @@ -25,9 +24,9 @@
import io.reactivex.plugins.RxJavaPlugins;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
* emitting items when it is subscribed to, but only when its {@link #connect} method is called. In this way you
* can wait for all intended {@link Subscriber}s to {@link Flowable#subscribe} to the {@code Observable}
* can wait for all intended {@link Observer}s to {@link Observable#subscribe} to the {@code Observable}
* before the {@code Observable} begins emitting items.
* <p>
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
Expand All @@ -41,7 +40,7 @@ public abstract class ConnectableObservable<T> extends Observable<T> {

/**
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
* {@link Observable} to its {@link Observer}s.
*
* @param connection
* the action that receives the connection subscription before the subscription to source happens
Expand All @@ -52,7 +51,7 @@ public abstract class ConnectableObservable<T> extends Observable<T> {

/**
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
* {@link Observable} to its {@link Observer}s.
* <p>
* To disconnect from a synchronous source, use the {@link #connect(Consumer)} method.
*
Expand All @@ -79,18 +78,18 @@ public Observable<T> refCount() {

/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes.
* when the first Observer subscribes.
*
* @return an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes
* when the first Observer subscribes
*/
@NonNull
public Observable<T> autoConnect() {
return autoConnect(1);
}
/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it.
* when the specified number of Observers subscribe to it.
*
* @param numberOfSubscribers the number of subscribers to await before calling connect
* on the ConnectableObservable. A non-positive value indicates
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.reactivex.plugins.RxJavaPlugins;

/**
* Serializes calls to the Subscriber methods.
* <p>All other Publisher and Subject methods are thread-safe by design.
* Serializes calls to the Observer methods.
* <p>All other Observable and Subject methods are thread-safe by design.
*
* @param <T> the item value type
*/
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/reactivex/single/SingleSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void errorIsDisposed() {
@Test
public void biConsumerIsDisposedOnSuccess() {
final Object[] result = { null, null };

Disposable d = Single.just(1)
.subscribe(new BiConsumer<Integer, Throwable>() {
@Override
Expand All @@ -241,7 +241,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
result[1] = t2;
}
});

assertTrue("Not disposed?!", d.isDisposed());
assertEquals(1, result[0]);
assertNull(result[1]);
Expand All @@ -250,7 +250,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
@Test
public void biConsumerIsDisposedOnError() {
final Object[] result = { null, null };

Disposable d = Single.<Integer>error(new IOException())
.subscribe(new BiConsumer<Integer, Throwable>() {
@Override
Expand All @@ -259,7 +259,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
result[1] = t2;
}
});

assertTrue("Not disposed?!", d.isDisposed());
assertNull(result[0]);
assertTrue("" + result[1], result[1] instanceof IOException);
Expand Down

0 comments on commit 434d1f4

Please sign in to comment.