Skip to content

Commit

Permalink
Cache operator
Browse files Browse the repository at this point in the history
Cache operator as discussed in ReactiveX#209

Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
  • Loading branch information
benjchristensen committed May 7, 2013
1 parent 5de471a commit 5f6fe9f
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
86 changes: 86 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationCache;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -1678,6 +1679,22 @@ public static <T> ConnectableObservable<T> replay(final Observable<T> that) {
return OperationMulticast.multicast(that, ReplaySubject.<T> create());
}

/**
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
* <p>
* This is useful when returning an Observable that you wish to cache responses but can't control the
* subscribe/unsubscribe behavior of all the Observers.
* <p>
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
* use this on infinite or very large sequences that will use up memory. This is similar to
* the {@link Observable#toList()} operator in this caution.
*
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
*/
public static <T> Observable<T> cache(final Observable<T> that) {
return create(OperationCache.cache(that));
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
Expand Down Expand Up @@ -3220,6 +3237,22 @@ public ConnectableObservable<T> replay() {
return replay(this);
}

/**
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
* <p>
* This is useful when returning an Observable that you wish to cache responses but can't control the
* subscribe/unsubscribe behavior of all the Observers.
* <p>
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
* use this on infinite or very large sequences that will use up memory. This is similar to
* the {@link Observable#toList()} operator in this caution.
*
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
*/
public Observable<T> cache() {
return cache(this);
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
Expand Down Expand Up @@ -4300,6 +4333,59 @@ public void call(String v) {
}
}

@Test
public void testCache() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

@Override
public void run() {
System.out.println("published observable being executed");
observer.onNext("one");
observer.onCompleted();
counter.incrementAndGet();
}
}).start();
return subscription;
}
}).cache();

// we then expect the following 2 subscriptions to get that same value
final CountDownLatch latch = new CountDownLatch(2);

// subscribe once
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

// subscribe again
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down
128 changes: 128 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

/**
* Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence.
* <p>
* This is useful when returning an Observable that you wish to cache responses but can't control the
* subscribe/unsubscribe behavior of all the Observers.
* <p>
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
* use this on infinite or very large sequences that will use up memory. This is similar to
* the {@link Observable#toList()} operator in this caution.
*
*/
public class OperationCache {

public static <T> Func1<Observer<T>, Subscription> cache(final Observable<T> source) {
return new Func1<Observer<T>, Subscription>() {

final AtomicBoolean subscribed = new AtomicBoolean(false);
private final ReplaySubject<T> cache = ReplaySubject.create();

@Override
public Subscription call(Observer<T> observer) {
if (subscribed.compareAndSet(false, true)) {
// subscribe to the source once
source.subscribe(cache);
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
*
* This means this should never be used on an infinite or very large sequence, similar to toList().
*/
}

return cache.subscribe(observer);
}

};
}

public static class UnitTest {

@Test
public void testCache() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Observable<String> o = Observable.create(cache(Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

@Override
public void run() {
System.out.println("published observable being executed");
observer.onNext("one");
observer.onCompleted();
counter.incrementAndGet();
}
}).start();
return subscription;
}
})));

// we then expect the following 2 subscriptions to get that same value
final CountDownLatch latch = new CountDownLatch(2);

// subscribe once
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

// subscribe again
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());
}
}

}

0 comments on commit 5f6fe9f

Please sign in to comment.