Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Operator Rewrite #1379

Merged
merged 1 commit into from
Jun 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 77 additions & 23 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorParallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;

/**
* Identifies unit of work that can be executed in parallel on a given Scheduler.
Expand All @@ -38,34 +40,86 @@ public OperatorParallel(Func1<Observable<T>, Observable<R>> f, Scheduler schedul
}

@Override
public Subscriber<? super T> call(Subscriber<? super R> op) {
public Subscriber<? super T> call(final Subscriber<? super R> child) {

@SuppressWarnings("unchecked")
final UnicastPassThruSubject<T>[] subjects = new UnicastPassThruSubject[degreeOfParallelism];
@SuppressWarnings("unchecked")
final Observable<R>[] os = new Observable[degreeOfParallelism];
for (int i = 0; i < subjects.length; i++) {
subjects[i] = UnicastPassThruSubject.<T> create();
os[i] = f.call(subjects[i].observeOn(scheduler));
}

// subscribe BEFORE receiving data so everything is hooked up
Observable.merge(os).unsafeSubscribe(child);

return new Subscriber<T>(child) {

int index = 0; // trust that we receive data synchronously

@Override
public void onCompleted() {
for (UnicastPassThruSubject<T> s : subjects) {
s.onCompleted();
}
}

@Override
public void onError(Throwable e) {
// bypass the subjects and immediately terminate
child.onError(e);
}

@Override
public void onNext(T t) {
// round-robin subjects
subjects[index++].onNext(t);
if (index >= degreeOfParallelism) {
index = 0;
}
}

};

}

private static class UnicastPassThruSubject<T> extends Subject<T, T> {

private static <T> UnicastPassThruSubject<T> create() {
final AtomicReference<Subscriber<? super T>> subscriber = new AtomicReference<Subscriber<? super T>>();
return new UnicastPassThruSubject<T>(subscriber, new OnSubscribe<T>() {

@Override
public void call(Subscriber<? super T> s) {
subscriber.set(s);
}

});

Func1<Subscriber<? super GroupedObservable<Long, T>>, Subscriber<? super T>> groupBy =
new OperatorGroupBy<Long, T>(new Func1<T, Long>() {
}

long i = 0;
private final AtomicReference<Subscriber<? super T>> subscriber;

@Override
public Long call(T t) {
return i++ % degreeOfParallelism;
}
protected UnicastPassThruSubject(AtomicReference<Subscriber<? super T>> subscriber, OnSubscribe<T> onSubscribe) {
super(onSubscribe);
this.subscriber = subscriber;
}

});
@Override
public void onCompleted() {
subscriber.get().onCompleted();
}

Func1<Subscriber<? super Observable<R>>, Subscriber<? super GroupedObservable<Long, T>>> map =
new OperatorMap<GroupedObservable<Long, T>, Observable<R>>(
new Func1<GroupedObservable<Long, T>, Observable<R>>() {
@Override
public void onError(Throwable e) {
subscriber.get().onError(e);
}

@Override
public Observable<R> call(GroupedObservable<Long, T> g) {
// Must use observeOn not subscribeOn because we have a single source behind groupBy.
// The origin is already subscribed to, we are moving each group on to a new thread
// but the origin itself can only be on a single thread.
return f.call(g.observeOn(scheduler));
}
});
@Override
public void onNext(T t) {
subscriber.get().onNext(t);
}

// bind together Observers
return groupBy.call(map.call(new OperatorMerge<R>().call(op)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class OperatorParallelTest {

@Test
@Test(timeout = 20000)
public void testParallel() {
int NUM = 1000;
final AtomicInteger count = new AtomicInteger();
Expand All @@ -52,7 +52,7 @@ public Integer[] call(Integer t) {
// TODO why is this exception not being thrown?
throw new RuntimeException(e);
}
// System.out.println("V: " + t + " Thread: " + Thread.currentThread());
// System.out.println("V: " + t + " Thread: " + Thread.currentThread());
innerCount.incrementAndGet();
return new Integer[] { t, t * 99 };
}
Expand All @@ -76,7 +76,7 @@ public void call(Integer[] v) {
assertEquals("finalCount", NUM, count.get());
}

@Test
@Test(timeout = 1000)
public void testParallelWithNestedAsyncWork() {
int NUM = 20;
final AtomicInteger count = new AtomicInteger();
Expand Down Expand Up @@ -106,4 +106,5 @@ public void call(String v) {
// just making sure we finish and get the number we expect
assertEquals(NUM, count.get());
}

}