{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
*
*
History: 2.1.17 - beta
- * @param the value type of the {@link MaybeSource} element
- * @param maybe the {@code Maybe} instance to subscribe to, not {@code null}
+ * @param the value type of the {@code MaybeSource} element
+ * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code maybe} is {@code null}
* @since 2.2
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index d1b17e8b84..3d83d477c5 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -19,7 +19,6 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
@@ -28,8 +27,10 @@
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
+import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable;
import io.reactivex.rxjava3.internal.operators.mixed.*;
-import io.reactivex.rxjava3.internal.operators.observable.*;
+import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher;
+import io.reactivex.rxjava3.internal.operators.single.SingleToFlowable;
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
@@ -1959,6 +1960,39 @@ public static Flowable error(@NonNull Throwable throwable) {
return error(Functions.justSupplier(throwable));
}
+ /**
+ * Returns a {@code Flowable} instance that runs the given {@link Action} for each subscriber and
+ * emits either its exception or simply completes.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Action} throws an exception, the respective {@link Throwable} is
+ * delivered to the downstream via {@link Subscriber#onError(Throwable)},
+ * except when the downstream has canceled the resulting {@code Flowable} source.
+ * In this latter case, the {@code Throwable} is delivered to the global error handler via
+ * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
+ *
+ *
+ * @param the target type
+ * @param action the {@code Action} to run for each subscriber
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code action} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.PASS_THROUGH)
+ public static Flowable fromAction(@NonNull Action action) {
+ Objects.requireNonNull(action, "action is null");
+ return RxJavaPlugins.onAssembly(new FlowableFromAction<>(action));
+ }
+
/**
* Converts an array into a {@link Publisher} that emits the items in the array.
*
@@ -2037,6 +2071,30 @@ public static Flowable error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable));
}
+ /**
+ * Wraps a {@link CompletableSource} into a {@code Flowable}.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type
+ * @param completableSource the {@code CompletableSource} to convert from
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code completableSource} is {@code null}
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.PASS_THROUGH)
+ public static Flowable fromCompletable(@NonNull CompletableSource completableSource) {
+ Objects.requireNonNull(completableSource, "completableSource is null");
+ return RxJavaPlugins.onAssembly(new FlowableFromCompletable<>(completableSource));
+ }
+
/**
* Converts a {@link Future} into a {@link Publisher}.
*
@@ -2157,6 +2215,94 @@ public static Flowable error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromIterable<>(source));
}
+ /**
+ * Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
+ * emits {@code onSuccess} as a single item or forwards any {@code onComplete} or
+ * {@code onError} signal.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element
+ * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code maybe} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.FULL)
+ public static Flowable fromMaybe(@NonNull MaybeSource maybe) {
+ Objects.requireNonNull(maybe, "maybe is null");
+ return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(maybe));
+ }
+
+ /**
+ * Converts the given {@link ObservableSource} into a {@code Flowable} by applying the specified backpressure strategy.
+ *
+ * Marble diagrams for the various backpressure strategies are as follows:
+ *
+ *
{@link BackpressureStrategy#BUFFER}
+ *
+ *
+ *
+ *
{@link BackpressureStrategy#DROP}
+ *
+ *
+ *
+ *
{@link BackpressureStrategy#LATEST}
+ *
+ *
+ *
+ *
{@link BackpressureStrategy#ERROR}
+ *
+ *
+ *
+ *
{@link BackpressureStrategy#MISSING}
+ *
+ *
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator applies the chosen backpressure strategy of {@link BackpressureStrategy} enum.
+ *
Scheduler:
+ *
{@code fromObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the element type of the source and resulting sequence
+ * @param source the {@code ObservableSource} to convert
+ * @param strategy the backpressure strategy to apply
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code source} or {@code strategy} is {@code null}
+ */
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable fromObservable(@NonNull ObservableSource source, @NonNull BackpressureStrategy strategy) {
+ Objects.requireNonNull(source, "source is null");
+ Objects.requireNonNull(strategy, "strategy is null");
+ Flowable f = new FlowableFromObservable<>(source);
+ switch (strategy) {
+ case DROP:
+ return f.onBackpressureDrop();
+ case LATEST:
+ return f.onBackpressureLatest();
+ case MISSING:
+ return f;
+ case ERROR:
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f));
+ default:
+ return f.onBackpressureBuffer();
+ }
+ }
+
/**
* Converts an arbitrary Reactive Streams {@link Publisher} into a {@code Flowable} if not already a
* {@code Flowable}.
@@ -2198,6 +2344,65 @@ public static Flowable fromPublisher(@NonNull Publisher<@NonNull ? extend
return RxJavaPlugins.onAssembly(new FlowableFromPublisher<>(publisher));
}
+ /**
+ * Returns a {@code Flowable} instance that runs the given {@link Runnable} for each subscriber and
+ * emits either its exception or simply completes.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Runnable} throws an exception, the respective {@link Throwable} is
+ * delivered to the downstream via {@link Subscriber#onError(Throwable)},
+ * except when the downstream has canceled the resulting {@code Flowable} source.
+ * In this latter case, the {@code Throwable} is delivered to the global error handler via
+ * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
+ *
+ *
+ * @param the target type
+ * @param run the {@code Runnable} to run for each subscriber
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code run} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.PASS_THROUGH)
+ public static Flowable fromRunnable(@NonNull Runnable run) {
+ Objects.requireNonNull(run, "run is null");
+ return RxJavaPlugins.onAssembly(new FlowableFromRunnable<>(run));
+ }
+
+ /**
+ * Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and
+ * emits {@code onSuccess} as a single item or forwards the {@code onError} signal.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code SingleSource} element
+ * @param source the {@code SingleSource} instance to subscribe to, not {@code null}
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code source} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.FULL)
+ public static Flowable fromSingle(@NonNull SingleSource source) {
+ Objects.requireNonNull(source, "source is null");
+ return RxJavaPlugins.onAssembly(new SingleToFlowable<>(source));
+ }
+
/**
* Returns a {@code Flowable} that, when a {@link Subscriber} subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 8699d60e16..fd25aef0af 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -30,6 +30,7 @@
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
+import io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -710,7 +711,7 @@ public static Maybe error(@NonNull Supplier extends Throwable> supplier
}
/**
- * Returns a {@code Maybe} instance that runs the given {@link Action} for each subscriber and
+ * Returns a {@code Maybe} instance that runs the given {@link Action} for each observer and
* emits either its exception or simply completes.
*
If the {@code Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link MaybeObserver#onError(Throwable)},
- * except when the downstream has disposed this {@code Maybe} source.
+ * except when the downstream has disposed the resulting {@code Maybe} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
*
*
* @param the target type
- * @param action the {@code Action} to run for each subscriber
+ * @param action the {@code Action} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code action} is {@code null}
*/
@@ -901,7 +902,57 @@ public static Maybe fromSingle(@NonNull SingleSource single) {
}
/**
- * Returns a {@code Maybe} instance that runs the given {@link Runnable} for each subscriber and
+ * Wraps an {@link ObservableSource} into a {@code Maybe} and emits the very first item
+ * or completes if the source is empty.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type
+ * @param source the {@code ObservableSource} to convert from
+ * @return the new {@code Maybe} instance
+ * @throws NullPointerException if {@code source} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Maybe fromObservable(@NonNull ObservableSource source) {
+ Objects.requireNonNull(source, "source is null");
+ return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<>(source, 0L));
+ }
+
+ /**
+ * Wraps a {@link Publisher} into a {@code Maybe} and emits the very first item
+ * or completes if the source is empty.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator consumes the given {@code Publisher} in an unbounded manner
+ * (requesting {@link Long#MAX_VALUE}) but cancels it after one item received.
+ *
Scheduler:
+ *
{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type
+ * @param source the {@code Publisher} to convert from
+ * @return the new {@code Maybe} instance
+ * @throws NullPointerException if {@code source} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
+ public static Maybe fromPublisher(@NonNull Publisher source) {
+ Objects.requireNonNull(source, "source is null");
+ return RxJavaPlugins.onAssembly(new FlowableElementAtMaybePublisher<>(source, 0L));
+ }
+
+ /**
+ * Returns a {@code Maybe} instance that runs the given {@link Runnable} for each observer and
* emits either its exception or simply completes.
*
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
*
* @param the target type
- * @param run the {@code Runnable} to run for each subscriber
+ * @param run the {@code Runnable} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code run} is {@code null}
*/
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index f599ce0eeb..89de9dd63e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -28,8 +28,10 @@
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
+import io.reactivex.rxjava3.internal.operators.maybe.MaybeToObservable;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.*;
+import io.reactivex.rxjava3.internal.operators.single.SingleToObservable;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.observables.*;
import io.reactivex.rxjava3.observers.*;
@@ -1734,6 +1736,36 @@ public static Observable error(@NonNull Throwable throwable) {
return error(Functions.justSupplier(throwable));
}
+ /**
+ * Returns an {@code Observable} instance that runs the given {@link Action} for each subscriber and
+ * emits either its exception or simply completes.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Action} throws an exception, the respective {@link Throwable} is
+ * delivered to the downstream via {@link Observer#onError(Throwable)},
+ * except when the downstream has canceled the resulting {@code Observable} source.
+ * In this latter case, the {@code Throwable} is delivered to the global error handler via
+ * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
+ *
+ *
+ * @param the target type
+ * @param action the {@code Action} to run for each subscriber
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code action} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Observable fromAction(@NonNull Action action) {
+ Objects.requireNonNull(action, "action is null");
+ return RxJavaPlugins.onAssembly(new ObservableFromAction<>(action));
+ }
+
/**
* Converts an array into an {@link ObservableSource} that emits the items in the array.
*
@@ -1804,6 +1836,27 @@ public static Observable fromCallable(@NonNull Callable extends T> call
return RxJavaPlugins.onAssembly(new ObservableFromCallable<>(callable));
}
+ /**
+ * Wraps a {@link CompletableSource} into an {@code Observable}.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type
+ * @param completableSource the {@code CompletableSource} to convert from
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code completableSource} is {@code null}
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Observable fromCompletable(@NonNull CompletableSource completableSource) {
+ Objects.requireNonNull(completableSource, "completableSource is null");
+ return RxJavaPlugins.onAssembly(new ObservableFromCompletable<>(completableSource));
+ }
+
/**
* Converts a {@link Future} into an {@code Observable}.
*
@@ -1912,6 +1965,30 @@ public static Observable fromIterable(@NonNull Iterable<@NonNull ? extend
return RxJavaPlugins.onAssembly(new ObservableFromIterable<>(source));
}
+ /**
+ * Returns an {@code Observable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
+ * emits {@code onSuccess} as a single item or forwards any {@code onComplete} or
+ * {@code onError} signal.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element
+ * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code maybe} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Observable fromMaybe(@NonNull MaybeSource maybe) {
+ Objects.requireNonNull(maybe, "maybe is null");
+ return RxJavaPlugins.onAssembly(new MaybeToObservable<>(maybe));
+ }
+
/**
* Converts an arbitrary Reactive Streams {@link Publisher} into an {@code Observable}.
*
@@ -1949,6 +2026,59 @@ public static Observable fromPublisher(@NonNull Publisher<@NonNull ? exte
return RxJavaPlugins.onAssembly(new ObservableFromPublisher<>(publisher));
}
+ /**
+ * Returns an {@code Observable} instance that runs the given {@link Runnable} for each observer and
+ * emits either its exception or simply completes.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Runnable} throws an exception, the respective {@link Throwable} is
+ * delivered to the downstream via {@link Observer#onError(Throwable)},
+ * except when the downstream has canceled the resulting {@code Observable} source.
+ * In this latter case, the {@code Throwable} is delivered to the global error handler via
+ * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
+ *
+ *
+ * @param the target type
+ * @param run the {@code Runnable} to run for each observer
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code run} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Observable fromRunnable(@NonNull Runnable run) {
+ Objects.requireNonNull(run, "run is null");
+ return RxJavaPlugins.onAssembly(new ObservableFromRunnable<>(run));
+ }
+
+ /**
+ * Returns an {@code Observable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and
+ * emits {@code onSuccess} as a single item or forwards the {@code onError} signal.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code SingleSource} element
+ * @param source the {@code SingleSource} instance to subscribe to, not {@code null}
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code source} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Observable fromSingle(@NonNull SingleSource source) {
+ Objects.requireNonNull(source, "source is null");
+ return RxJavaPlugins.onAssembly(new SingleToObservable<>(source));
+ }
+
/**
* Returns an {@code Observable} that, when an observer subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index 719719520f..80459445c7 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -731,6 +731,56 @@ public static Single error(@NonNull Throwable throwable) {
return toSingle(Flowable.fromFuture(future, timeout, unit));
}
+ /**
+ * Returns a {@code Single} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
+ * emits {@code onSuccess} as a single item, turns an {@code onComplete} into {@link NoSuchElementException} error signal or
+ * forwards the {@code onError} signal.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element
+ * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code maybe} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Single fromMaybe(@NonNull MaybeSource maybe) {
+ Objects.requireNonNull(maybe, "maybe is null");
+ return RxJavaPlugins.onAssembly(new MaybeToSingle<>(maybe, null));
+ }
+
+ /**
+ * Returns a {@code Single} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
+ * emits {@code onSuccess} as a single item, emits the {@code defaultItem} for an {@code onComplete} signal or
+ * forwards the {@code onError} signal.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element
+ * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
+ * @param defaultItem the item to signal if the current {@code MaybeSource} is empty
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code maybe} or {@code defaultItem} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Single fromMaybe(@NonNull MaybeSource maybe, @NonNull T defaultItem) {
+ Objects.requireNonNull(maybe, "maybe is null");
+ Objects.requireNonNull(defaultItem, "defaultItem is null");
+ return RxJavaPlugins.onAssembly(new MaybeToSingle<>(maybe, defaultItem));
+ }
+
/**
* Wraps a specific {@link Publisher} into a {@code Single} and signals its single element or error.
*
diff --git a/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java b/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java
new file mode 100644
index 0000000000..071db9fff8
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.fuseable;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+
+/**
+ * Represents an empty, async-only {@link QueueFuseable} instance.
+ *
+ * @param the output value type
+ * @since 3.0.0
+ */
+public abstract class AbstractEmptyQueueFuseable
+implements QueueSubscription, QueueDisposable {
+
+ @Override
+ public final int requestFusion(int mode) {
+ return mode & ASYNC;
+ }
+
+ @Override
+ public final boolean offer(@NonNull T value) {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+
+ @Override
+ public final boolean offer(@NonNull T v1, @NonNull T v2) {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+
+ @Override
+ public final T poll() throws Throwable {
+ return null; // always empty
+ }
+
+ @Override
+ public final boolean isEmpty() {
+ return true; // always empty
+ }
+
+ @Override
+ public final void clear() {
+ // always empty
+ }
+
+ @Override
+ public final void request(long n) {
+ // no items to request
+ }
+
+ @Override
+ public void cancel() {
+ // default No-op
+ }
+
+ @Override
+ public void dispose() {
+ // default No-op
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java b/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java
new file mode 100644
index 0000000000..8f9e12d3cb
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.fuseable;
+
+/**
+ * Represents an empty, async-only {@link QueueFuseable} instance that tracks and exposes a
+ * canceled/disposed state.
+ *
+ * @param the output value type
+ * @since 3.0.0
+ */
+public final class CancellableQueueFuseable
+extends AbstractEmptyQueueFuseable {
+
+ volatile boolean disposed;
+
+ @Override
+ public void cancel() {
+ disposed = true;
+ }
+
+ @Override
+ public void dispose() {
+ disposed = true;
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return disposed;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java
deleted file mode 100644
index 2bbc5128d6..0000000000
--- a/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright (c) 2016-present, RxJava Contributors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is
- * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
- */
-
-package io.reactivex.rxjava3.internal.observers;
-
-import org.reactivestreams.*;
-
-import io.reactivex.rxjava3.core.CompletableObserver;
-import io.reactivex.rxjava3.disposables.Disposable;
-import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
-
-public final class SubscriberCompletableObserver implements CompletableObserver, Subscription {
- final Subscriber super T> subscriber;
-
- Disposable upstream;
-
- public SubscriberCompletableObserver(Subscriber super T> subscriber) {
- this.subscriber = subscriber;
- }
-
- @Override
- public void onComplete() {
- subscriber.onComplete();
- }
-
- @Override
- public void onError(Throwable e) {
- subscriber.onError(e);
- }
-
- @Override
- public void onSubscribe(Disposable d) {
- if (DisposableHelper.validate(this.upstream, d)) {
- this.upstream = d;
-
- subscriber.onSubscribe(this);
- }
- }
-
- @Override
- public void request(long n) {
- // ignored, no values emitted anyway
- }
-
- @Override
- public void cancel() {
- upstream.dispose();
- }
-}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
index 4cd7fb479c..82bec67c92 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
@@ -31,19 +31,21 @@ public CompletableFromAction(Action run) {
protected void subscribeActual(CompletableObserver observer) {
Disposable d = Disposable.empty();
observer.onSubscribe(d);
- try {
- run.run();
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
+ if (!d.isDisposed()) {
+ try {
+ run.run();
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ if (!d.isDisposed()) {
+ observer.onError(e);
+ } else {
+ RxJavaPlugins.onError(e);
+ }
+ return;
+ }
if (!d.isDisposed()) {
- observer.onError(e);
- } else {
- RxJavaPlugins.onError(e);
+ observer.onComplete();
}
- return;
- }
- if (!d.isDisposed()) {
- observer.onComplete();
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
index e94d650310..60332d8db2 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
@@ -30,19 +30,21 @@ public CompletableFromRunnable(Runnable runnable) {
protected void subscribeActual(CompletableObserver observer) {
Disposable d = Disposable.empty();
observer.onSubscribe(d);
- try {
- runnable.run();
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
+ if (!d.isDisposed()) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ if (!d.isDisposed()) {
+ observer.onError(e);
+ } else {
+ RxJavaPlugins.onError(e);
+ }
+ return;
+ }
if (!d.isDisposed()) {
- observer.onError(e);
- } else {
- RxJavaPlugins.onError(e);
+ observer.onComplete();
}
- return;
- }
- if (!d.isDisposed()) {
- observer.onComplete();
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java
index 9ad0d0a8d8..9a6c0326a1 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java
@@ -16,7 +16,7 @@
import org.reactivestreams.Subscriber;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.internal.observers.SubscriberCompletableObserver;
+import io.reactivex.rxjava3.internal.operators.flowable.FlowableFromCompletable;
public final class CompletableToFlowable extends Flowable {
@@ -28,7 +28,6 @@ public CompletableToFlowable(CompletableSource source) {
@Override
protected void subscribeActual(Subscriber super T> s) {
- SubscriberCompletableObserver os = new SubscriberCompletableObserver<>(s);
- source.subscribe(os);
+ source.subscribe(new FlowableFromCompletable.FromCompletableObserver<>(s));
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java
index 095a975dbf..84eea97c8d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java
@@ -14,9 +14,7 @@
package io.reactivex.rxjava3.internal.operators.completable;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.disposables.Disposable;
-import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
-import io.reactivex.rxjava3.internal.observers.BasicQueueDisposable;
+import io.reactivex.rxjava3.internal.operators.observable.ObservableFromCompletable;
/**
* Wraps a Completable and exposes it as an Observable.
@@ -33,66 +31,6 @@ public CompletableToObservable(CompletableSource source) {
@Override
protected void subscribeActual(Observer super T> observer) {
- source.subscribe(new ObserverCompletableObserver(observer));
- }
-
- static final class ObserverCompletableObserver extends BasicQueueDisposable
- implements CompletableObserver {
-
- final Observer> observer;
-
- Disposable upstream;
-
- ObserverCompletableObserver(Observer> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onComplete() {
- observer.onComplete();
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public void onSubscribe(Disposable d) {
- if (DisposableHelper.validate(upstream, d)) {
- this.upstream = d;
- observer.onSubscribe(this);
- }
- }
-
- @Override
- public int requestFusion(int mode) {
- return mode & ASYNC;
- }
-
- @Override
- public Void poll() {
- return null; // always empty
- }
-
- @Override
- public boolean isEmpty() {
- return true;
- }
-
- @Override
- public void clear() {
- // always empty
- }
-
- @Override
- public void dispose() {
- upstream.dispose();
- }
-
- @Override
- public boolean isDisposed() {
- return upstream.isDisposed();
- }
+ source.subscribe(new ObservableFromCompletable.FromCompletableObserver<>(observer));
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java
new file mode 100644
index 0000000000..a474b03280
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.flowable;
+
+import org.reactivestreams.Publisher;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.internal.operators.flowable.FlowableElementAtMaybe.ElementAtSubscriber;
+
+/**
+ * Emits the indexth element from a Publisher as a Maybe.
+ *
+ * @param the element type of the source
+ * @since 3.0.0
+ */
+public final class FlowableElementAtMaybePublisher extends Maybe {
+
+ final Publisher source;
+
+ final long index;
+
+ public FlowableElementAtMaybePublisher(Publisher source, long index) {
+ this.source = source;
+ this.index = index;
+ }
+
+ @Override
+ protected void subscribeActual(MaybeObserver super T> observer) {
+ source.subscribe(new ElementAtSubscriber<>(observer, index));
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java
new file mode 100644
index 0000000000..2d683e7601
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.flowable;
+
+import org.reactivestreams.Subscriber;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Executes an {@link Action} and signals its exception or completes normally.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class FlowableFromAction extends Flowable implements Supplier {
+
+ final Action action;
+
+ public FlowableFromAction(Action action) {
+ this.action = action;
+ }
+
+ @Override
+ protected void subscribeActual(Subscriber super T> subscriber) {
+ CancellableQueueFuseable qs = new CancellableQueueFuseable<>();
+ subscriber.onSubscribe(qs);
+
+ if (!qs.isDisposed()) {
+
+ try {
+ action.run();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ if (!qs.isDisposed()) {
+ subscriber.onError(ex);
+ } else {
+ RxJavaPlugins.onError(ex);
+ }
+ return;
+ }
+
+ if (!qs.isDisposed()) {
+ subscriber.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public T get() throws Throwable {
+ action.run();
+ return null; // considered as onComplete()
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java
new file mode 100644
index 0000000000..78b37325ca
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.flowable;
+
+import org.reactivestreams.Subscriber;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.internal.fuseable.*;
+
+/**
+ * Wrap a Completable into a Flowable.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class FlowableFromCompletable extends Flowable implements HasUpstreamCompletableSource {
+
+ final CompletableSource source;
+
+ public FlowableFromCompletable(CompletableSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public CompletableSource source() {
+ return source;
+ }
+
+ @Override
+ protected void subscribeActual(Subscriber super T> observer) {
+ source.subscribe(new FromCompletableObserver(observer));
+ }
+
+ public static final class FromCompletableObserver
+ extends AbstractEmptyQueueFuseable
+ implements CompletableObserver {
+
+ final Subscriber super T> downstream;
+
+ Disposable upstream;
+
+ public FromCompletableObserver(Subscriber super T> downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void cancel() {
+ upstream.dispose();
+ upstream = DisposableHelper.DISPOSED;
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (DisposableHelper.validate(this.upstream, d)) {
+ this.upstream = d;
+
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ upstream = DisposableHelper.DISPOSED;
+ downstream.onComplete();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ upstream = DisposableHelper.DISPOSED;
+ downstream.onError(e);
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java
index 105c6b6ee2..459228f549 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java
@@ -18,9 +18,10 @@
import io.reactivex.rxjava3.disposables.Disposable;
public final class FlowableFromObservable extends Flowable {
- private final Observable upstream;
- public FlowableFromObservable(Observable upstream) {
+ private final ObservableSource upstream;
+
+ public FlowableFromObservable(ObservableSource upstream) {
this.upstream = upstream;
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java
new file mode 100644
index 0000000000..b163f1b7ee
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.flowable;
+
+import org.reactivestreams.Subscriber;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Supplier;
+import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Executes an {@link Runnable} and signals its exception or completes normally.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class FlowableFromRunnable extends Flowable implements Supplier {
+
+ final Runnable run;
+
+ public FlowableFromRunnable(Runnable run) {
+ this.run = run;
+ }
+
+ @Override
+ protected void subscribeActual(Subscriber super T> subscriber) {
+ CancellableQueueFuseable qs = new CancellableQueueFuseable<>();
+ subscriber.onSubscribe(qs);
+
+ if (!qs.isDisposed()) {
+
+ try {
+ run.run();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ if (!qs.isDisposed()) {
+ subscriber.onError(ex);
+ } else {
+ RxJavaPlugins.onError(ex);
+ }
+ return;
+ }
+
+ if (!qs.isDisposed()) {
+ subscriber.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public T get() throws Throwable {
+ run.run();
+ return null; // considered as onComplete()
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java
index bdbc2ec03b..5cb73e821c 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java
@@ -19,7 +19,7 @@
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamCompletableSource;
/**
- * Wrap a Single into a Maybe.
+ * Wrap a Completable into a Maybe.
*
* @param the value type
*/
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java
index 3199521639..807b9cac51 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java
@@ -22,7 +22,7 @@
/**
* Wraps a MaybeSource and exposes its onSuccess and onError signals and signals
- * NoSuchElementException for onComplete.
+ * NoSuchElementException for onComplete if {@code defaultValue} is null.
*
* @param the value type
*/
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java
new file mode 100644
index 0000000000..cccb5b5d38
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.observable;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Executes an {@link Action} and signals its exception or completes normally.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class ObservableFromAction extends Observable implements Supplier {
+
+ final Action action;
+
+ public ObservableFromAction(Action action) {
+ this.action = action;
+ }
+
+ @Override
+ protected void subscribeActual(Observer super T> observer) {
+ CancellableQueueFuseable qs = new CancellableQueueFuseable<>();
+ observer.onSubscribe(qs);
+
+ if (!qs.isDisposed()) {
+
+ try {
+ action.run();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ if (!qs.isDisposed()) {
+ observer.onError(ex);
+ } else {
+ RxJavaPlugins.onError(ex);
+ }
+ return;
+ }
+
+ if (!qs.isDisposed()) {
+ observer.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public T get() throws Throwable {
+ action.run();
+ return null; // considered as onComplete()
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java
new file mode 100644
index 0000000000..4748e6a625
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.observable;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.internal.fuseable.*;
+
+/**
+ * Wrap a Completable into an Observable.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class ObservableFromCompletable extends Observable implements HasUpstreamCompletableSource {
+
+ final CompletableSource source;
+
+ public ObservableFromCompletable(CompletableSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public CompletableSource source() {
+ return source;
+ }
+
+ @Override
+ protected void subscribeActual(Observer super T> observer) {
+ source.subscribe(new FromCompletableObserver(observer));
+ }
+
+ public static final class FromCompletableObserver
+ extends AbstractEmptyQueueFuseable
+ implements CompletableObserver {
+
+ final Observer super T> downstream;
+
+ Disposable upstream;
+
+ public FromCompletableObserver(Observer super T> downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ upstream = DisposableHelper.DISPOSED;
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (DisposableHelper.validate(this.upstream, d)) {
+ this.upstream = d;
+
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ upstream = DisposableHelper.DISPOSED;
+ downstream.onComplete();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ upstream = DisposableHelper.DISPOSED;
+ downstream.onError(e);
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java
new file mode 100644
index 0000000000..0dc37a8993
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.observable;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Supplier;
+import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Executes an {@link Runnable} and signals its exception or completes normally.
+ *
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class ObservableFromRunnable extends Observable implements Supplier {
+
+ final Runnable run;
+
+ public ObservableFromRunnable(Runnable run) {
+ this.run = run;
+ }
+
+ @Override
+ protected void subscribeActual(Observer super T> observer) {
+ CancellableQueueFuseable qs = new CancellableQueueFuseable<>();
+ observer.onSubscribe(qs);
+
+ if (!qs.isDisposed()) {
+
+ try {
+ run.run();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ if (!qs.isDisposed()) {
+ observer.onError(ex);
+ } else {
+ RxJavaPlugins.onError(ex);
+ }
+ return;
+ }
+
+ if (!qs.isDisposed()) {
+ observer.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public T get() throws Throwable {
+ run.run();
+ return null; // considered as onComplete()
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java
new file mode 100644
index 0000000000..ed4718fd1c
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.fuseable;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class CancellableQueueFuseableTest {
+
+ @Test
+ public void offer() {
+ TestHelper.assertNoOffer(new CancellableQueueFuseable<>());
+ }
+
+ @Test
+ public void cancel() {
+ CancellableQueueFuseable