Skip to content

Commit

Permalink
GroupBy and SubscribeOn Tests Passing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 13, 2014
1 parent cb5b5fb commit a394a7d
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 142 deletions.
118 changes: 55 additions & 63 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,44 @@
import rx.Subscriber;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
* Subscribes and unsubscribes Observers on the specified Scheduler.
* <p>
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
* <p>
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
* subscribe is solving.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
/**
/**
* Indicate that events fired between the original subscription time and
* the actual subscription time should not get lost.
*/
private final boolean dontLoseEvents;
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
private final int bufferSize;

public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
this(scheduler, dontLoseEvents, -1);
}

/**
* Construct a SubscribeOn operator.
* @param scheduler the target scheduler
* @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
* @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*
* @param scheduler
* the target scheduler
* @param dontLoseEvents
* indicate that events should be buffered until the actual subscription happens
* @param bufferSize
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*/
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
this.scheduler = scheduler;
Expand All @@ -71,78 +80,61 @@ public void onCompleted() {
public void onError(Throwable e) {
subscriber.onError(e);
}

boolean checkNeedBuffer(Observable<?> o) {
return dontLoseEvents || ((o instanceof GroupedObservable<?, ?>)
/*
* Included are some Observable types known to be "hot" and thus needing
* buffering when subscribing across thread boundaries otherwise
* we can lose data.
*
* See https://github.com/Netflix/RxJava/issues/844 for more information.
*/
return dontLoseEvents
|| ((o instanceof GroupedObservable<?, ?>)
|| (o instanceof PublishSubject<?>)
// || (o instanceof BehaviorSubject<?, ?>)
);
}

@Override
public void onNext(final Observable<T> o) {
if (checkNeedBuffer(o)) {
final CompositeSubscription cs = new CompositeSubscription();
subscriber.add(cs);
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber, new CompositeSubscription());
// use buffering (possibly blocking) for a possibly synchronous subscribe
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
o.subscribe(bus);
scheduler.schedule(new Action1<Inner>() {
subscriber.add(scheduler.schedule(new Action1<Inner>() {
@Override
public void call(final Inner inner) {
cs.add(Subscriptions.create(new Action0() {
@Override
public void call() {
inner.schedule(new Action1<Inner>() {
@Override
public void call(final Inner inner) {
bus.unsubscribe();
}
});
}
}));
bus.enterPassthroughMode();
}
});
}));
return;
}
scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
final CompositeSubscription cs = new CompositeSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
inner.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
cs.unsubscribe();
}

});
}
} else {
// no buffering (async subscribe)
scheduler.schedule(new Action1<Inner>() {

}));
cs.add(subscriber);
o.subscribe(new Subscriber<T>(cs) {
@Override
public void call(final Inner inner) {
o.subscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
});
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
});
}
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws Interrupte
/*
* We will only take 1 group with 20 events from it and then unsubscribe.
*/
@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException {
final AtomicInteger subscribeCounter = new AtomicInteger();
Expand Down Expand Up @@ -648,7 +647,6 @@ public void call(String s) {
assertEquals(6, results.size());
}

@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
Expand Down Expand Up @@ -702,7 +700,7 @@ public void call() {

});
} else {
return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
return group.subscribeOn(Schedulers.newThread(), 1).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {

@Override
public String call(Integer t1) {
Expand Down Expand Up @@ -803,7 +801,6 @@ public void call(String s) {
assertEquals(6, results.size());
}

@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
final ArrayList<String> results = new ArrayList<String>();
Expand All @@ -829,7 +826,7 @@ public Integer call(Integer t) {

@Override
public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
return group.subscribeOn(Schedulers.newThread(), 0).map(new Func1<Integer, String>() {

@Override
public String call(Integer t1) {
Expand Down
Loading

0 comments on commit a394a7d

Please sign in to comment.