Skip to content

Commit

Permalink
Publish operator on Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed May 7, 2013
1 parent 707d9cb commit 6651539
Showing 1 changed file with 95 additions and 16 deletions.
111 changes: 95 additions & 16 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -1665,6 +1666,17 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
* @param that
* the source Observable
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static <T> ConnectableObservable<T> publish(final Observable<T> that) {
return OperationMulticast.multicast(that, PublishSubject.<T> create());
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -2043,7 +2055,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
* @param items
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTakeWhile.takeWhile(items, predicate));
Expand All @@ -2055,7 +2067,7 @@ public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Bo
* @param items
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
Expand All @@ -2075,7 +2087,7 @@ public Boolean call(T t) {
* @param items
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
Expand All @@ -2097,12 +2109,13 @@ public Boolean call(T t, Integer integer)

/**
* Adds a timestamp to each item emitted by this observable.
*
* @return An observable sequence of timestamped items.
*/
public Observable<Timestamped<T>> timestamp() {
return create(OperationTimestamp.timestamp(this));
}

/**
* Return a Future representing a single value of the Observable.
* <p>
Expand Down Expand Up @@ -2424,7 +2437,7 @@ public static <T> Observable<T> toObservable(T... items) {
* @param sequence
* @throws ClassCastException
* if T objects do not implement Comparable
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
return create(OperationToObservableSortedList.toSortedList(sequence));
Expand All @@ -2437,7 +2450,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
*
* @param sequence
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2<T, T, Integer> sortFunction) {
return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
Expand All @@ -2450,7 +2463,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
*
* @param sequence
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -3226,6 +3239,15 @@ public Observable<T> reduce(Func2<T, T, T> accumulator) {
return reduce(this, accumulator);
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -3341,7 +3363,7 @@ public Observable<T> scan(Func2<T, T, T> accumulator) {
public Observable<T> sample(long period, TimeUnit unit) {
return create(OperationSample.sample(this, period, unit));
}

/**
* Samples the observable sequence at each interval.
*
Expand All @@ -3356,7 +3378,7 @@ public Observable<T> sample(long period, TimeUnit unit) {
public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
return create(OperationSample.sample(this, period, unit, scheduler));
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -3490,7 +3512,7 @@ public Observable<T> take(final int num) {
*
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
return takeWhile(this, predicate);
Expand All @@ -3501,7 +3523,7 @@ public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
*
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhile(final Object predicate) {
return takeWhile(this, predicate);
Expand All @@ -3512,7 +3534,7 @@ public Observable<T> takeWhile(final Object predicate) {
*
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predicate) {
return takeWhileWithIndex(this, predicate);
Expand All @@ -3523,7 +3545,7 @@ public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predica
*
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhileWithIndex(final Object predicate) {
return takeWhileWithIndex(this, predicate);
Expand Down Expand Up @@ -3594,7 +3616,7 @@ public Observable<List<T>> toList() {
*
* @throws ClassCastException
* if T objects do not implement Comparable
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList() {
return toSortedList(this);
Expand All @@ -3606,7 +3628,7 @@ public Observable<List<T>> toSortedList() {
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toSortedList.png">
*
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList(Func2<T, T, Integer> sortFunction) {
return toSortedList(this, sortFunction);
Expand All @@ -3618,7 +3640,7 @@ public Observable<List<T>> toSortedList(Func2<T, T, Integer> sortFunction) {
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toSortedList.png">
*
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList(final Object sortFunction) {
return toSortedList(this, sortFunction);
Expand Down Expand Up @@ -4211,6 +4233,63 @@ public void call(String t1) {
}
}

@Test
public void testPublish() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
ConnectableObservable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

@Override
public void run() {
System.out.println("published observable being executed");
observer.onNext("one");
observer.onCompleted();
counter.incrementAndGet();
}
}).start();
return subscription;
}
}).publish();

final CountDownLatch latch = new CountDownLatch(2);

// subscribe once
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

// subscribe again
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

Subscription s = o.connect();
try {
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());
} finally {
s.unsubscribe();
}
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down

0 comments on commit 6651539

Please sign in to comment.