Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add many fromX operators + marbles #6873

Merged
merged 1 commit into from
Jan 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 26 additions & 41 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public static Completable fromFuture(@NonNull Future<?> future) {
}

/**
* Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link Maybe} instance and
* Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
* emits an {@code onComplete} event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any
* {@code onError} events.
* <p>
Expand All @@ -506,8 +506,8 @@ public static Completable fromFuture(@NonNull Future<?> future) {
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.17 - beta
* @param <T> the value type of the {@link MaybeSource} element
* @param maybe the {@code Maybe} instance to subscribe to, not {@code null}
* @param <T> the value type of the {@code MaybeSource} element
* @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code maybe} is {@code null}
* @since 2.2
Expand Down
209 changes: 207 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
Expand All @@ -28,8 +27,10 @@
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.rxjava3.internal.operators.single.SingleToFlowable;
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -1959,6 +1960,39 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return error(Functions.justSupplier(throwable));
}

/**
* Returns a {@code Flowable} instance that runs the given {@link Action} for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromAction.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromAction} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
* except when the downstream has canceled the resulting {@code Flowable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param action the {@code Action} to run for each subscriber
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code action} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromAction(@NonNull Action action) {
Objects.requireNonNull(action, "action is null");
return RxJavaPlugins.onAssembly(new FlowableFromAction<>(action));
}

/**
* Converts an array into a {@link Publisher} that emits the items in the array.
* <p>
Expand Down Expand Up @@ -2037,6 +2071,30 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable));
}

/**
* Wraps a {@link CompletableSource} into a {@code Flowable}.
* <p>
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromCompletable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param completableSource the {@code CompletableSource} to convert from
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code completableSource} is {@code null}
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromCompletable(@NonNull CompletableSource completableSource) {
Objects.requireNonNull(completableSource, "completableSource is null");
return RxJavaPlugins.onAssembly(new FlowableFromCompletable<>(completableSource));
}

/**
* Converts a {@link Future} into a {@link Publisher}.
* <p>
Expand Down Expand Up @@ -2157,6 +2215,94 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromIterable<>(source));
}

/**
* Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
* emits {@code onSuccess} as a single item or forwards any {@code onComplete} or
* {@code onError} signal.
* <p>
* <img width="640" height="226" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromMaybe.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the {@code MaybeSource} element
* @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code maybe} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> fromMaybe(@NonNull MaybeSource<T> maybe) {
Objects.requireNonNull(maybe, "maybe is null");
return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(maybe));
}

/**
* Converts the given {@link ObservableSource} into a {@code Flowable} by applying the specified backpressure strategy.
* <p>
* Marble diagrams for the various backpressure strategies are as follows:
* <ul>
* <li>{@link BackpressureStrategy#BUFFER}
* <p>
* <img width="640" height="264" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.buffer.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#DROP}
* <p>
* <img width="640" height="374" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.drop.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#LATEST}
* <p>
* <img width="640" height="284" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.latest.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#ERROR}
* <p>
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.error.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#MISSING}
* <p>
* <img width="640" height="397" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.missing.png" alt="">
* </li>
* </ul>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator applies the chosen backpressure strategy of {@link BackpressureStrategy} enum.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the element type of the source and resulting sequence
* @param source the {@code ObservableSource} to convert
* @param strategy the backpressure strategy to apply
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code source} or {@code strategy} is {@code null}
*/
@BackpressureSupport(BackpressureKind.SPECIAL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> fromObservable(@NonNull ObservableSource<T> source, @NonNull BackpressureStrategy strategy) {
Objects.requireNonNull(source, "source is null");
Objects.requireNonNull(strategy, "strategy is null");
Flowable<T> f = new FlowableFromObservable<>(source);
switch (strategy) {
case DROP:
return f.onBackpressureDrop();
case LATEST:
return f.onBackpressureLatest();
case MISSING:
return f;
case ERROR:
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f));
default:
return f.onBackpressureBuffer();
}
}

/**
* Converts an arbitrary <em>Reactive Streams</em> {@link Publisher} into a {@code Flowable} if not already a
* {@code Flowable}.
Expand Down Expand Up @@ -2198,6 +2344,65 @@ public static <T> Flowable<T> fromPublisher(@NonNull Publisher<@NonNull ? extend
return RxJavaPlugins.onAssembly(new FlowableFromPublisher<>(publisher));
}

/**
* Returns a {@code Flowable} instance that runs the given {@link Runnable} for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromRunnable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Runnable} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
* except when the downstream has canceled the resulting {@code Flowable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param run the {@code Runnable} to run for each subscriber
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code run} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromRunnable(@NonNull Runnable run) {
Objects.requireNonNull(run, "run is null");
return RxJavaPlugins.onAssembly(new FlowableFromRunnable<>(run));
}

/**
* Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and
* emits {@code onSuccess} as a single item or forwards the {@code onError} signal.
* <p>
* <img width="640" height="341" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromSingle.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the {@code SingleSource} element
* @param source the {@code SingleSource} instance to subscribe to, not {@code null}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> fromSingle(@NonNull SingleSource<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleToFlowable<>(source));
}

/**
* Returns a {@code Flowable} that, when a {@link Subscriber} subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
Expand Down
61 changes: 56 additions & 5 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -710,7 +711,7 @@ public static <T> Maybe<T> error(@NonNull Supplier<? extends Throwable> supplier
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Action} for each subscriber and
* Returns a {@code Maybe} instance that runs the given {@link Action} for each observer and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromAction.png" alt="">
Expand All @@ -720,13 +721,13 @@ public static <T> Maybe<T> error(@NonNull Supplier<? extends Throwable> supplier
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link MaybeObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Maybe} source.
* except when the downstream has disposed the resulting {@code Maybe} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param action the {@code Action} to run for each subscriber
* @param action the {@code Action} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code action} is {@code null}
*/
Expand Down Expand Up @@ -901,7 +902,57 @@ public static <T> Maybe<T> fromSingle(@NonNull SingleSource<T> single) {
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Runnable} for each subscriber and
* Wraps an {@link ObservableSource} into a {@code Maybe} and emits the very first item
* or completes if the source is empty.
* <p>
* <img width="640" height="276" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param source the {@code ObservableSource} to convert from
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> fromObservable(@NonNull ObservableSource<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<>(source, 0L));
}

/**
* Wraps a {@link Publisher} into a {@code Maybe} and emits the very first item
* or completes if the source is empty.
* <p>
* <img width="640" height="309" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromPublisher.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the given {@code Publisher} in an unbounded manner
* (requesting {@link Long#MAX_VALUE}) but cancels it after one item received.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param source the {@code Publisher} to convert from
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static <T> Maybe<T> fromPublisher(@NonNull Publisher<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new FlowableElementAtMaybePublisher<>(source, 0L));
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Runnable} for each observer and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromRunnable.png" alt="">
Expand All @@ -910,7 +961,7 @@ public static <T> Maybe<T> fromSingle(@NonNull SingleSource<T> single) {
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param run the {@code Runnable} to run for each subscriber
* @param run the {@code Runnable} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code run} is {@code null}
*/
Expand Down
Loading