Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler with Recurse/Inner #1014

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,43 +40,15 @@ public HandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

/**
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
*
* See {@link #schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public Subscription schedule(Action1<Inner> action) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action);
return inner;
}

/**
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param unit
* Time unit of the delay time.
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action, delayTime, unit);
return inner;
public Inner createInner() {
return new InnerHandlerThreadScheduler(handler);
}

private static class InnerHandlerThreadScheduler extends Inner {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();
private Inner _inner = this;

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
Expand All @@ -93,28 +65,28 @@ public boolean isUnsubscribed() {
}

@Override
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
public void schedule(final Action1<Recurse> action, long delayTime, TimeUnit unit) {
handler.postDelayed(new Runnable() {
@Override
public void run() {
if (_inner.isUnsubscribed()) {
if (isUnsubscribed()) {
return;
}
action.call(_inner);
action.call(Recurse.create(InnerHandlerThreadScheduler.this, action));
}
}, unit.toMillis(delayTime));
}

@Override
public void schedule(final Action1<Inner> action) {
public void schedule(final Action1<Recurse> action) {
handler.postDelayed(new Runnable() {

@Override
public void run() {
if (_inner.isUnsubscribed()) {
if (isUnsubscribed()) {
return;
}
action.call(_inner);
action.call(Recurse.create(InnerHandlerThreadScheduler.this, action));
}

}, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.android.subscriptions;

import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
Expand All @@ -42,9 +42,9 @@ public void call() {
if (Looper.getMainLooper() == Looper.myLooper()) {
unsubscribe.call();
} else {
AndroidSchedulers.mainThread().schedule(new Action1<Inner>() {
AndroidSchedulers.mainThread().schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
unsubscribe.call();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action1;
import android.os.Handler;

Expand All @@ -41,7 +41,7 @@ public class HandlerThreadSchedulerTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action1<Recurse> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action);
Expand All @@ -52,14 +52,14 @@ public void shouldScheduleImmediateActionOnHandlerThread() {

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call(any(Recurse.class));
}

@Test
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action1<Recurse> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action, 1L, TimeUnit.SECONDS);
Expand All @@ -70,6 +70,6 @@ public void shouldScheduleDelayedActionOnHandlerThread() {

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call(any(Recurse.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
Expand Down Expand Up @@ -599,9 +599,9 @@ public static <R> Func0<Observable<R>> toAsync(final Func0<? extends R> func, fi
@Override
public Observable<R> call() {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call();
Expand Down Expand Up @@ -656,9 +656,9 @@ public static <T1, R> Func1<T1, Observable<R>> toAsync(final Func1<? super T1, ?
@Override
public Observable<R> call(final T1 t1) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1);
Expand Down Expand Up @@ -715,9 +715,9 @@ public static <T1, T2, R> Func2<T1, T2, Observable<R>> toAsync(final Func2<? sup
@Override
public Observable<R> call(final T1 t1, final T2 t2) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2);
Expand Down Expand Up @@ -776,9 +776,9 @@ public static <T1, T2, T3, R> Func3<T1, T2, T3, Observable<R>> toAsync(final Fun
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3);
Expand Down Expand Up @@ -839,9 +839,9 @@ public static <T1, T2, T3, T4, R> Func4<T1, T2, T3, T4, Observable<R>> toAsync(f
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4);
Expand Down Expand Up @@ -904,9 +904,9 @@ public static <T1, T2, T3, T4, T5, R> Func5<T1, T2, T3, T4, T5, Observable<R>> t
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5);
Expand Down Expand Up @@ -971,9 +971,9 @@ public static <T1, T2, T3, T4, T5, T6, R> Func6<T1, T2, T3, T4, T5, T6, Observab
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6);
Expand Down Expand Up @@ -1040,9 +1040,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Func7<T1, T2, T3, T4, T5, T6, T7,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7);
Expand Down Expand Up @@ -1111,9 +1111,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Func8<T1, T2, T3, T4, T5, T6,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8);
Expand Down Expand Up @@ -1184,9 +1184,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Func9<T1, T2, T3, T4, T5,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9);
Expand Down Expand Up @@ -1237,9 +1237,9 @@ public static <R> FuncN<Observable<R>> toAsync(final FuncN<? extends R> func, fi
@Override
public Observable<R> call(final Object... args) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action1<Inner>() {
scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner inner) {
public void call(Recurse inner) {
R result;
try {
result = func.call(args);
Expand Down Expand Up @@ -1764,9 +1764,9 @@ public void call(Subscriber<? super U> t1) {
}
}, csub);

csub.set(scheduler.schedule(new Action1<Inner>() {
csub.set(scheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Inner t1) {
public void call(Recurse t1) {
if (!csub.isUnsubscribed()) {
action.call(subject, csub);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package rx.util.async.operators;

import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action0;
import rx.functions.Action1;

Expand Down Expand Up @@ -66,22 +66,22 @@ public void call() {
* @param run the Runnable to run when the Action0 is called
* @return the Action0 wrapping the Runnable
*/
public static Action1<Inner> fromRunnable(Runnable run) {
public static Action1<Recurse> fromRunnable(Runnable run) {
if (run == null) {
throw new NullPointerException("run");
}
return new ActionWrappingRunnable(run);
}
/** An Action1 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action1<Inner> {
private static final class ActionWrappingRunnable implements Action1<Recurse> {
final Runnable run;

public ActionWrappingRunnable(Runnable run) {
this.run = run;
}

@Override
public void call(Inner inner) {
public void call(Recurse inner) {
run.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,18 @@ private NewFiberScheduler() {
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action) {
EventLoopScheduler innerScheduler = new EventLoopScheduler();
innerScheduler.schedule(action);
return innerScheduler.innerSubscription;
public Inner createInner() {
return new EventLoopScheduler();
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit) {
EventLoopScheduler innerScheduler = new EventLoopScheduler();
innerScheduler.schedule(action, delayTime, unit);
return innerScheduler.innerSubscription;
}

private class EventLoopScheduler extends Scheduler.Inner implements Subscription {
private final CompositeSubscription innerSubscription = new CompositeSubscription();

private EventLoopScheduler() {
}

@Override
public void schedule(final Action1<Scheduler.Inner> action) {
public void schedule(final Action1<Recurse> action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return;
Expand All @@ -88,7 +79,7 @@ public void run() throws SuspendExecution {
if (innerSubscription.isUnsubscribed()) {
return;
}
action.call(EventLoopScheduler.this);
action.call(Recurse.create(EventLoopScheduler.this, action));
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
Expand All @@ -104,7 +95,7 @@ public void run() throws SuspendExecution {
}

@Override
public void schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit) {
public void schedule(final Action1<Recurse> action, final long delayTime, final TimeUnit unit) {
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();

Subscription s = Subscriptions.from(new Fiber(fiberScheduler, new SuspendableRunnable() {
Expand All @@ -117,7 +108,7 @@ public void run() throws InterruptedException, SuspendExecution {
return;
}
// now that the delay is past schedule the work to be done for real on the UI thread
action.call(EventLoopScheduler.this);
action.call(Recurse.create(EventLoopScheduler.this, action));
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
Expand Down
Loading