Skip to content

Commit

Permalink
Scheduler with Schedulable/EventLoop
Browse files Browse the repository at this point in the history
API changes as per ReactiveX#997
  • Loading branch information
benjchristensen committed Apr 11, 2014
1 parent a9e7975 commit 3b9b59e
Show file tree
Hide file tree
Showing 49 changed files with 666 additions and 663 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,43 +40,15 @@ public HandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

/**
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
*
* See {@link #schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public Subscription schedule(Action1<Inner> action) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action);
return inner;
}

/**
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param unit
* Time unit of the delay time.
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action, delayTime, unit);
return inner;
public Inner createInner() {
return new InnerHandlerThreadScheduler(handler);
}

private static class InnerHandlerThreadScheduler extends Inner {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();
private Inner _inner = this;

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
Expand All @@ -93,28 +65,28 @@ public boolean isUnsubscribed() {
}

@Override
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
public void schedule(final Action1<Recurse> action, long delayTime, TimeUnit unit) {
handler.postDelayed(new Runnable() {
@Override
public void run() {
if (_inner.isUnsubscribed()) {
if (isUnsubscribed()) {
return;
}
action.call(_inner);
action.call(Recurse.create(InnerHandlerThreadScheduler.this, action));
}
}, unit.toMillis(delayTime));
}

@Override
public void schedule(final Action1<Inner> action) {
public void schedule(final Action1<Recurse> action) {
handler.postDelayed(new Runnable() {

@Override
public void run() {
if (_inner.isUnsubscribed()) {
if (isUnsubscribed()) {
return;
}
action.call(_inner);
action.call(Recurse.create(InnerHandlerThreadScheduler.this, action));
}

}, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.android.subscriptions;

import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
Expand All @@ -42,9 +42,9 @@ public void call() {
if (Looper.getMainLooper() == Looper.myLooper()) {
unsubscribe.call();
} else {
AndroidSchedulers.mainThread().schedule(new Action1<Inner>() {
AndroidSchedulers.mainThread().schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
unsubscribe.call();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action1;
import android.os.Handler;

Expand All @@ -41,7 +41,7 @@ public class HandlerThreadSchedulerTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action1<Recurse> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action);
Expand All @@ -52,14 +52,14 @@ public void shouldScheduleImmediateActionOnHandlerThread() {

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call(any(Recurse.class));
}

@Test
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action1<Recurse> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action, 1L, TimeUnit.SECONDS);
Expand All @@ -70,6 +70,6 @@ public void shouldScheduleDelayedActionOnHandlerThread() {

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call(any(Recurse.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
Expand Down Expand Up @@ -599,9 +599,9 @@ public static <R> Func0<Observable<R>> toAsync(final Func0<? extends R> func, fi
@Override
public Observable<R> call() {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call();
Expand Down Expand Up @@ -656,9 +656,9 @@ public static <T1, R> Func1<T1, Observable<R>> toAsync(final Func1<? super T1, ?
@Override
public Observable<R> call(final T1 t1) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1);
Expand Down Expand Up @@ -715,9 +715,9 @@ public static <T1, T2, R> Func2<T1, T2, Observable<R>> toAsync(final Func2<? sup
@Override
public Observable<R> call(final T1 t1, final T2 t2) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2);
Expand Down Expand Up @@ -776,9 +776,9 @@ public static <T1, T2, T3, R> Func3<T1, T2, T3, Observable<R>> toAsync(final Fun
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3);
Expand Down Expand Up @@ -839,9 +839,9 @@ public static <T1, T2, T3, T4, R> Func4<T1, T2, T3, T4, Observable<R>> toAsync(f
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4);
Expand Down Expand Up @@ -904,9 +904,9 @@ public static <T1, T2, T3, T4, T5, R> Func5<T1, T2, T3, T4, T5, Observable<R>> t
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5);
Expand Down Expand Up @@ -971,9 +971,9 @@ public static <T1, T2, T3, T4, T5, T6, R> Func6<T1, T2, T3, T4, T5, T6, Observab
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6);
Expand Down Expand Up @@ -1040,9 +1040,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Func7<T1, T2, T3, T4, T5, T6, T7,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7);
Expand Down Expand Up @@ -1111,9 +1111,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Func8<T1, T2, T3, T4, T5, T6,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8);
Expand Down Expand Up @@ -1184,9 +1184,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Func9<T1, T2, T3, T4, T5,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9);
Expand Down Expand Up @@ -1237,9 +1237,9 @@ public static <R> FuncN<Observable<R>> toAsync(final FuncN<? extends R> func, fi
@Override
public Observable<R> call(final Object... args) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(args);
Expand Down Expand Up @@ -1764,9 +1764,9 @@ public void call(Subscriber<? super U> t1) {
}
}, csub);

csub.set(scheduler.schedule(new Action1<Inner>() {
csub.set(scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner t1) {
public void call(Recurse t1) {
if (!csub.isUnsubscribed()) {
action.call(subject, csub);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package rx.util.async.operators;

import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action0;
import rx.functions.Action1;

Expand Down Expand Up @@ -66,22 +66,22 @@ public void call() {
* @param run the Runnable to run when the Action0 is called
* @return the Action0 wrapping the Runnable
*/
public static Action1<Inner> fromRunnable(Runnable run) {
public static Action1<Recurse> fromRunnable(Runnable run) {
if (run == null) {
throw new NullPointerException("run");
}
return new ActionWrappingRunnable(run);
}
/** An Action1 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action1<Inner> {
private static final class ActionWrappingRunnable implements Action1<Recurse> {
final Runnable run;

public ActionWrappingRunnable(Runnable run) {
this.run = run;
}

@Override
public void call(Inner inner) {
public void call(Recurse inner) {
run.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,18 @@ private NewFiberScheduler() {
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action) {
EventLoopScheduler innerScheduler = new EventLoopScheduler();
innerScheduler.schedule(action);
return innerScheduler.innerSubscription;
public EventLoop createEventLoop() {
return new EventLoopScheduler();
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit) {
EventLoopScheduler innerScheduler = new EventLoopScheduler();
innerScheduler.schedule(action, delayTime, unit);
return innerScheduler.innerSubscription;
}

private class EventLoopScheduler extends Scheduler.Inner implements Subscription {
private class EventLoopScheduler extends Scheduler.EventLoop implements Subscription {
private final CompositeSubscription innerSubscription = new CompositeSubscription();

private EventLoopScheduler() {
}

@Override
public void schedule(final Action1<Scheduler.Inner> action) {
public void schedule(final Action1<Schedulable> action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return;
Expand All @@ -88,7 +79,7 @@ public void run() throws SuspendExecution {
if (innerSubscription.isUnsubscribed()) {
return;
}
action.call(EventLoopScheduler.this);
action.call(Schedulable.create(EventLoopScheduler.this, action));
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
Expand All @@ -104,7 +95,7 @@ public void run() throws SuspendExecution {
}

@Override
public void schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit) {
public void schedule(final Action1<Schedulable> action, final long delayTime, final TimeUnit unit) {
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();

Subscription s = Subscriptions.from(new Fiber(fiberScheduler, new SuspendableRunnable() {
Expand All @@ -117,7 +108,7 @@ public void run() throws InterruptedException, SuspendExecution {
return;
}
// now that the delay is past schedule the work to be done for real on the UI thread
action.call(EventLoopScheduler.this);
action.call(Schedulable.create(EventLoopScheduler.this, action));
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
Expand Down
Loading

0 comments on commit 3b9b59e

Please sign in to comment.