From 66515393fafb1fdde78c87ad8c772a12ee62c3fc Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 6 May 2013 23:43:00 -0700 Subject: [PATCH] Publish operator on Observable https://github.com/Netflix/RxJava/issues/15 --- rxjava-core/src/main/java/rx/Observable.java | 111 ++++++++++++++++--- 1 file changed, 95 insertions(+), 16 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 73a11e421c..ba54a1c7fb 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -77,6 +77,7 @@ import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -1665,6 +1666,17 @@ public static Observable onErrorReturn(final Observable that, Func1 ConnectableObservable publish(final Observable that) { + return OperationMulticast.multicast(that, PublishSubject. create()); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2043,7 +2055,7 @@ public static Observable takeLast(final Observable items, final int co * @param items * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhile(final Observable items, Func1 predicate) { return create(OperationTakeWhile.takeWhile(items, predicate)); @@ -2055,7 +2067,7 @@ public static Observable takeWhile(final Observable items, Func1 Observable takeWhile(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") @@ -2075,7 +2087,7 @@ public Boolean call(T t) { * @param items * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); @@ -2097,12 +2109,13 @@ public Boolean call(T t, Integer integer) /** * Adds a timestamp to each item emitted by this observable. + * * @return An observable sequence of timestamped items. */ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } - + /** * Return a Future representing a single value of the Observable. *

@@ -2424,7 +2437,7 @@ public static Observable toObservable(T... items) { * @param sequence * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence) { return create(OperationToObservableSortedList.toSortedList(sequence)); @@ -2437,7 +2450,7 @@ public static Observable> toSortedList(Observable sequence) { * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); @@ -2450,7 +2463,7 @@ public static Observable> toSortedList(Observable sequence, Func2 * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") @@ -3226,6 +3239,15 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable publish() { + return OperationMulticast.multicast(this, PublishSubject. create()); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3341,7 +3363,7 @@ public Observable scan(Func2 accumulator) { public Observable sample(long period, TimeUnit unit) { return create(OperationSample.sample(this, period, unit)); } - + /** * Samples the observable sequence at each interval. * @@ -3356,7 +3378,7 @@ public Observable sample(long period, TimeUnit unit) { public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3490,7 +3512,7 @@ public Observable take(final int num) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Func1 predicate) { return takeWhile(this, predicate); @@ -3501,7 +3523,7 @@ public Observable takeWhile(final Func1 predicate) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Object predicate) { return takeWhile(this, predicate); @@ -3512,7 +3534,7 @@ public Observable takeWhile(final Object predicate) { * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Func2 predicate) { return takeWhileWithIndex(this, predicate); @@ -3523,7 +3545,7 @@ public Observable takeWhileWithIndex(final Func2 predica * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Object predicate) { return takeWhileWithIndex(this, predicate); @@ -3594,7 +3616,7 @@ public Observable> toList() { * * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList() { return toSortedList(this); @@ -3606,7 +3628,7 @@ public Observable> toSortedList() { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(Func2 sortFunction) { return toSortedList(this, sortFunction); @@ -3618,7 +3640,7 @@ public Observable> toSortedList(Func2 sortFunction) { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); @@ -4211,6 +4233,63 @@ public void call(String t1) { } } + @Test + public void testPublish() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }).publish(); + + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + Subscription s = o.connect(); + try { + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; }