diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 55d20cf1544..d7a7616bb66 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -66,6 +66,7 @@ import rx.operators.OperatorToIterator; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; import rx.util.Range; @@ -529,20 +530,13 @@ public NeverObservable() { @Override public Subscription call(Observer t1) { - return new NoOpObservableSubscription(); + return Subscriptions.empty(); } }, true); } } - /** - * A {@link Subscription} that does nothing when its unsubscribe method is called. - */ - private static class NoOpObservableSubscription implements Subscription { - public void unsubscribe() { - } - } /** * an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes. @@ -565,7 +559,7 @@ public ThrowObservable(final Exception exception) { @Override public Subscription call(Observer observer) { observer.onError(exception); - return new NoOpObservableSubscription(); + return Subscriptions.empty(); } }, true); @@ -1226,54 +1220,6 @@ public static Observable never() { return new NeverObservable(); } - /** - * A {@link Subscription} that does nothing. - * - * //TODO should this be moved to a Subscriptions utility class? - * - * @return - */ - public static Subscription noOpSubscription() { - return new NoOpObservableSubscription(); - } - - /** - * A {@link Subscription} implemented via a Func - * - * //TODO should this be moved to a Subscriptions utility class? - * - * @return - */ - public static Subscription createSubscription(final Action0 unsubscribe) { - return new Subscription() { - - @Override - public void unsubscribe() { - unsubscribe.call(); - } - - }; - } - - /** - * A {@link Subscription} implemented via an anonymous function (such as closures from other languages). - * - * //TODO should this be moved to a Subscriptions utility class? - * - * @return - */ - public static Subscription createSubscription(final Object unsubscribe) { - final FuncN f = Functions.from(unsubscribe); - return new Subscription() { - - @Override - public void unsubscribe() { - f.call(); - } - - }; - } - /** * Instruct an Observable to pass control to another Observable (the return value of a function) * rather than calling onError if it encounters an error. @@ -3172,7 +3118,7 @@ public Subscription call(Observer Observer) { Observer.onNext("two"); Observer.onNext("three"); Observer.onCompleted(); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } }); @@ -3257,7 +3203,7 @@ public void testToIterableWithException() { public Subscription call(Observer observer) { observer.onNext("one"); observer.onError(new TestException()); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } }); diff --git a/rxjava-core/src/main/java/rx/Subscription.java b/rxjava-core/src/main/java/rx/Subscription.java index 89bef356cb4..e2347257ca6 100644 --- a/rxjava-core/src/main/java/rx/Subscription.java +++ b/rxjava-core/src/main/java/rx/Subscription.java @@ -15,6 +15,13 @@ */ package rx; +import rx.subscriptions.Subscriptions; + +/** + * Subscription returns from {@link Observable#subscribe(Observer)} to allow unsubscribing. + *

+ * See utilities in {@link Subscriptions} and implementations in the {@link rx.subscriptions} package. + */ public interface Subscription { /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 208091874b0..725d7781aeb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -32,6 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; @@ -798,7 +799,7 @@ private static class TestObservable extends Observable { public Subscription subscribe(Observer Observer) { // just store the variable where it can be accessed so we can manually trigger it this.Observer = Observer; - return Observable.noOpSubscription(); + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 0c16de107c7..610a5ef0441 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -36,6 +36,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.Exceptions; import rx.util.functions.Func1; @@ -333,7 +334,7 @@ public void run() { } } }).start(); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } }); diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index 8ffb03a0bce..8dc80d0ab13 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -28,6 +28,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.CompositeException; import rx.util.functions.Func1; @@ -114,7 +115,7 @@ public void testResumeNextWithSynchronousExecution() { public Subscription call(Observer observer) { observer.onNext("one"); observer.onError(new Exception("injected failure")); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } }); diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java index 0c19619e4ee..a7b933036cb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java @@ -7,9 +7,9 @@ import org.junit.Test; -import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; public class OperationToObservableFuture { @@ -45,7 +45,7 @@ public Subscription call(Observer observer) { // the get() has already completed so there is no point in // giving the user a way to cancel. - return Observable.noOpSubscription(); + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java index 810491fb18d..79142f70a6b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; /** @@ -53,7 +54,7 @@ public Subscription call(Observer observer) { } observer.onCompleted(); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index a2822003444..d1c1ca2ff5a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -32,6 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; import rx.util.functions.Func1; @@ -805,7 +806,7 @@ private static class TestObservable extends Observable { public Subscription subscribe(Observer Observer) { // just store the variable where it can be accessed so we can manually trigger it this.Observer = Observer; - return Observable.noOpSubscription(); + return Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorToIterator.java b/rxjava-core/src/main/java/rx/operators/OperatorToIterator.java index 6c71afb6306..db067fb8581 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorToIterator.java @@ -12,6 +12,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.Exceptions; import rx.util.functions.Func1; @@ -116,7 +117,7 @@ public void testToIteratorWithException() { public Subscription call(Observer observer) { observer.onNext("one"); observer.onError(new TestException()); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } }); diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java new file mode 100644 index 00000000000..358df28bdb0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -0,0 +1,26 @@ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.Observable; +import rx.Subscription; + +/** + * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. + * + * @see Rx.Net equivalent BooleanDisposable at http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx + */ +public class BooleanSubscription implements Subscription { + + private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + + public boolean isUnsubscribed() { + return unsubscribed.get(); + } + + @Override + public void unsubscribe() { + unsubscribed.set(false); + } + +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java new file mode 100644 index 00000000000..0d0d0aa0a8a --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -0,0 +1,58 @@ +package rx.subscriptions; + +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.FuncN; +import rx.util.functions.Functions; + +public class Subscriptions { + /** + * A {@link Subscription} that does nothing. + * + * @return {@link Subscription} + */ + public static Subscription empty() { + return new EmptySubscription(); + } + + /** + * A {@link Subscription} implemented via a Func + * + * @return {@link Subscription} + */ + public static Subscription createSubscription(final Action0 unsubscribe) { + return new Subscription() { + + @Override + public void unsubscribe() { + unsubscribe.call(); + } + + }; + } + + /** + * A {@link Subscription} implemented via an anonymous function (such as closures from other languages). + * + * @return {@link Subscription} + */ + public static Subscription createSubscription(final Object unsubscribe) { + final FuncN f = Functions.from(unsubscribe); + return new Subscription() { + + @Override + public void unsubscribe() { + f.call(); + } + + }; + } + + /** + * A {@link Subscription} that does nothing when its unsubscribe method is called. + */ + private static class EmptySubscription implements Subscription { + public void unsubscribe() { + } + } +}