diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java index 9bfa548880..994175c080 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -40,43 +40,15 @@ public HandlerThreadScheduler(Handler handler) { this.handler = handler; } - /** - * Calls {@link HandlerThreadScheduler#schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds. - * - * See {@link #schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} - */ - @Override - public Subscription schedule(Action1 action) { - InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler); - inner.schedule(action); - return inner; - } - - /** - * Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action. - * - * @param state - * State to pass into the action. - * @param action - * Action to schedule. - * @param delayTime - * Time the action is to be delayed before executing. - * @param unit - * Time unit of the delay time. - * @return A Subscription from which one can unsubscribe from. - */ @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler); - inner.schedule(action, delayTime, unit); - return inner; + public Inner createInner() { + return new InnerHandlerThreadScheduler(handler); } - + private static class InnerHandlerThreadScheduler extends Inner { private final Handler handler; private BooleanSubscription innerSubscription = new BooleanSubscription(); - private Inner _inner = this; public InnerHandlerThreadScheduler(Handler handler) { this.handler = handler; @@ -93,28 +65,28 @@ public boolean isUnsubscribed() { } @Override - public void schedule(final Action1 action, long delayTime, TimeUnit unit) { + public void schedule(final Action1 action, long delayTime, TimeUnit unit) { handler.postDelayed(new Runnable() { @Override public void run() { - if (_inner.isUnsubscribed()) { + if (isUnsubscribed()) { return; } - action.call(_inner); + action.call(Recurse.create(InnerHandlerThreadScheduler.this, action)); } }, unit.toMillis(delayTime)); } @Override - public void schedule(final Action1 action) { + public void schedule(final Action1 action) { handler.postDelayed(new Runnable() { @Override public void run() { - if (_inner.isUnsubscribed()) { + if (isUnsubscribed()) { return; } - action.call(_inner); + action.call(Recurse.create(InnerHandlerThreadScheduler.this, action)); } }, 0L); diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java index 8e2f54ab3e..01763b9c2c 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java @@ -15,7 +15,7 @@ */ package rx.android.subscriptions; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action0; @@ -42,9 +42,9 @@ public void call() { if (Looper.getMainLooper() == Looper.myLooper()) { unsubscribe.call(); } else { - AndroidSchedulers.mainThread().schedule(new Action1() { + AndroidSchedulers.mainThread().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { unsubscribe.call(); } }); diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java index d68bd258f0..f16e0df978 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java @@ -29,7 +29,7 @@ import org.robolectric.annotation.Config; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action1; import android.os.Handler; @@ -41,7 +41,7 @@ public class HandlerThreadSchedulerTest { public void shouldScheduleImmediateActionOnHandlerThread() { final Handler handler = mock(Handler.class); @SuppressWarnings("unchecked") - final Action1 action = mock(Action1.class); + final Action1 action = mock(Action1.class); Scheduler scheduler = new HandlerThreadScheduler(handler); scheduler.schedule(action); @@ -52,14 +52,14 @@ public void shouldScheduleImmediateActionOnHandlerThread() { // verify that the given handler delegates to our action runnable.getValue().run(); - verify(action).call(any(Inner.class)); + verify(action).call(any(Recurse.class)); } @Test public void shouldScheduleDelayedActionOnHandlerThread() { final Handler handler = mock(Handler.class); @SuppressWarnings("unchecked") - final Action1 action = mock(Action1.class); + final Action1 action = mock(Action1.class); Scheduler scheduler = new HandlerThreadScheduler(handler); scheduler.schedule(action, 1L, TimeUnit.SECONDS); @@ -70,6 +70,6 @@ public void shouldScheduleDelayedActionOnHandlerThread() { // verify that the given handler delegates to our action runnable.getValue().run(); - verify(action).call(any(Inner.class)); + verify(action).call(any(Recurse.class)); } } diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java index d4ca377a04..346ed65b05 100644 --- a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java @@ -22,7 +22,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -599,9 +599,9 @@ public static Func0> toAsync(final Func0 func, fi @Override public Observable call() { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(); @@ -656,9 +656,9 @@ public static Func1> toAsync(final Func1 call(final T1 t1) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1); @@ -715,9 +715,9 @@ public static Func2> toAsync(final Func2 call(final T1 t1, final T2 t2) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2); @@ -776,9 +776,9 @@ public static Func3> toAsync(final Fun @Override public Observable call(final T1 t1, final T2 t2, final T3 t3) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3); @@ -839,9 +839,9 @@ public static Func4> toAsync(f @Override public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4); @@ -904,9 +904,9 @@ public static Func5> t @Override public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4, t5); @@ -971,9 +971,9 @@ public static Func6 call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4, t5, t6); @@ -1040,9 +1040,9 @@ public static Func7 call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4, t5, t6, t7); @@ -1111,9 +1111,9 @@ public static Func8 call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4, t5, t6, t7, t8); @@ -1184,9 +1184,9 @@ public static Func9 call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); @@ -1237,9 +1237,9 @@ public static FuncN> toAsync(final FuncN func, fi @Override public Observable call(final Object... args) { final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { R result; try { result = func.call(args); @@ -1764,9 +1764,9 @@ public void call(Subscriber t1) { } }, csub); - csub.set(scheduler.schedule(new Action1() { + csub.set(scheduler.schedule(new Action1() { @Override - public void call(Inner t1) { + public void call(Recurse t1) { if (!csub.isUnsubscribed()) { action.call(subject, csub); } diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java index 037c7b8573..9ef89c4722 100644 --- a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java @@ -16,7 +16,7 @@ package rx.util.async.operators; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action0; import rx.functions.Action1; @@ -66,14 +66,14 @@ public void call() { * @param run the Runnable to run when the Action0 is called * @return the Action0 wrapping the Runnable */ - public static Action1 fromRunnable(Runnable run) { + public static Action1 fromRunnable(Runnable run) { if (run == null) { throw new NullPointerException("run"); } return new ActionWrappingRunnable(run); } /** An Action1 which wraps and calls a Runnable. */ - private static final class ActionWrappingRunnable implements Action1 { + private static final class ActionWrappingRunnable implements Action1 { final Runnable run; public ActionWrappingRunnable(Runnable run) { @@ -81,7 +81,7 @@ public ActionWrappingRunnable(Runnable run) { } @Override - public void call(Inner inner) { + public void call(Recurse inner) { run.run(); } diff --git a/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/NewFiberScheduler.java b/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/NewFiberScheduler.java index f5edc254f6..1b52736ee7 100644 --- a/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/NewFiberScheduler.java +++ b/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/NewFiberScheduler.java @@ -53,19 +53,10 @@ private NewFiberScheduler() { } @Override - public Subscription schedule(Action1 action) { - EventLoopScheduler innerScheduler = new EventLoopScheduler(); - innerScheduler.schedule(action); - return innerScheduler.innerSubscription; + public Inner createInner() { + return new EventLoopScheduler(); } - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - EventLoopScheduler innerScheduler = new EventLoopScheduler(); - innerScheduler.schedule(action, delayTime, unit); - return innerScheduler.innerSubscription; - } - private class EventLoopScheduler extends Scheduler.Inner implements Subscription { private final CompositeSubscription innerSubscription = new CompositeSubscription(); @@ -73,7 +64,7 @@ private EventLoopScheduler() { } @Override - public void schedule(final Action1 action) { + public void schedule(final Action1 action) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return; @@ -88,7 +79,7 @@ public void run() throws SuspendExecution { if (innerSubscription.isUnsubscribed()) { return; } - action.call(EventLoopScheduler.this); + action.call(Recurse.create(EventLoopScheduler.this, action)); } finally { // remove the subscription now that we're completed Subscription s = sf.get(); @@ -104,7 +95,7 @@ public void run() throws SuspendExecution { } @Override - public void schedule(final Action1 action, final long delayTime, final TimeUnit unit) { + public void schedule(final Action1 action, final long delayTime, final TimeUnit unit) { final AtomicReference sf = new AtomicReference(); Subscription s = Subscriptions.from(new Fiber(fiberScheduler, new SuspendableRunnable() { @@ -117,7 +108,7 @@ public void run() throws InterruptedException, SuspendExecution { return; } // now that the delay is past schedule the work to be done for real on the UI thread - action.call(EventLoopScheduler.this); + action.call(Recurse.create(EventLoopScheduler.this, action)); } finally { // remove the subscription now that we're completed Subscription s = sf.get(); diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java index 1854db762b..923a8cfb35 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java @@ -45,24 +45,12 @@ public static SwingScheduler getInstance() { } @Override - public Subscription schedule(Action1 action) { - InnerSwingScheduler inner = new InnerSwingScheduler(); - inner.schedule(action); - return inner; + public Inner createInner() { + return new InnerSwingScheduler(); } - - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - long delay = unit.toMillis(delayTime); - assertThatTheDelayIsValidForTheSwingTimer(delay); - InnerSwingScheduler inner = new InnerSwingScheduler(); - inner.schedule(action, delayTime, unit); - return inner; - } - + private static class InnerSwingScheduler extends Inner { - private final Inner _inner = this; private final CompositeSubscription innerSubscription = new CompositeSubscription(); @Override @@ -76,7 +64,7 @@ public boolean isUnsubscribed() { } @Override - public void schedule(final Action1 action, long delayTime, TimeUnit unit) { + public void schedule(final Action1 action, long delayTime, TimeUnit unit) { final AtomicReference sub = new AtomicReference(); long delay = unit.toMillis(delayTime); assertThatTheDelayIsValidForTheSwingTimer(delay); @@ -95,7 +83,7 @@ public void actionPerformed(ActionEvent e) { if (innerSubscription.isUnsubscribed()) { return; } - action.call(_inner); + action.call(Recurse.create(InnerSwingScheduler.this, action)); Subscription s = sf.get(); if (s != null) { innerSubscription.remove(s); @@ -125,7 +113,7 @@ public void call() { } @Override - public void schedule(final Action1 action) { + public void schedule(final Action1 action) { final AtomicReference sub = new AtomicReference(); final AtomicReference sf = new AtomicReference(); @@ -135,7 +123,7 @@ public void run() { if (innerSubscription.isUnsubscribed()) { return; } - action.call(_inner); + action.call(Recurse.create(InnerSwingScheduler.this, action)); Subscription s = sf.get(); if (s != null) { innerSubscription.remove(s); diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/subscriptions/SwingSubscriptions.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/subscriptions/SwingSubscriptions.java index ce3c18e2d8..34682ee317 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/subscriptions/SwingSubscriptions.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/subscriptions/SwingSubscriptions.java @@ -17,11 +17,11 @@ import javax.swing.SwingUtilities; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; -import rx.schedulers.SwingScheduler; import rx.functions.Action0; import rx.functions.Action1; +import rx.schedulers.SwingScheduler; public final class SwingSubscriptions { @@ -42,9 +42,9 @@ public void call() { if (SwingUtilities.isEventDispatchThread()) { unsubscribe.call(); } else { - SwingScheduler.getInstance().schedule(new Action1() { + SwingScheduler.getInstance().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { unsubscribe.call(); } }); diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/SwingTestHelper.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/SwingTestHelper.java index 1b1053ec75..1457fb0e1d 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/SwingTestHelper.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/SwingTestHelper.java @@ -18,10 +18,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import rx.Scheduler.Inner; -import rx.schedulers.SwingScheduler; +import rx.Scheduler.Recurse; import rx.functions.Action0; import rx.functions.Action1; +import rx.schedulers.SwingScheduler; /* package-private */ final class SwingTestHelper { // only for test @@ -36,10 +36,10 @@ public static SwingTestHelper create() { } public SwingTestHelper runInEventDispatchThread(final Action0 action) { - SwingScheduler.getInstance().schedule(new Action1() { + SwingScheduler.getInstance().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { try { action.call(); } catch (Throwable e) { diff --git a/rxjava-contrib/rxjava-swing/src/test/java/rx/schedulers/SwingSchedulerTest.java b/rxjava-contrib/rxjava-swing/src/test/java/rx/schedulers/SwingSchedulerTest.java index afe6951e5b..71a00eb4f4 100644 --- a/rxjava-contrib/rxjava-swing/src/test/java/rx/schedulers/SwingSchedulerTest.java +++ b/rxjava-contrib/rxjava-swing/src/test/java/rx/schedulers/SwingSchedulerTest.java @@ -15,9 +15,13 @@ */ package rx.schedulers; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.awt.EventQueue; import java.util.concurrent.CountDownLatch; @@ -30,7 +34,7 @@ import org.junit.rules.ExpectedException; import org.mockito.InOrder; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; @@ -46,7 +50,7 @@ public final class SwingSchedulerTest { @Test public void testInvalidDelayValues() { final SwingScheduler scheduler = new SwingScheduler(); - final Action1 action = mock(Action1.class); + final Action1 action = mock(Action1.class); exception.expect(IllegalArgumentException.class); scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS); @@ -67,10 +71,10 @@ public void testPeriodicScheduling() throws Exception { final CountDownLatch latch = new CountDownLatch(4); - final Action1 innerAction = mock(Action1.class); - final Action1 action = new Action1() { + final Action1 innerAction = mock(Action1.class); + final Action1 action = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { try { innerAction.call(inner); assertTrue(SwingUtilities.isEventDispatchThread()); @@ -88,42 +92,42 @@ public void call(Inner inner) { sub.unsubscribe(); waitForEmptyEventQueue(); - verify(innerAction, times(4)).call(any(Inner.class)); + verify(innerAction, times(4)).call(any(Recurse.class)); } @Test public void testNestedActions() throws Exception { final SwingScheduler scheduler = new SwingScheduler(); - final Action1 firstStepStart = mock(Action1.class); - final Action1 firstStepEnd = mock(Action1.class); + final Action1 firstStepStart = mock(Action1.class); + final Action1 firstStepEnd = mock(Action1.class); - final Action1 secondStepStart = mock(Action1.class); - final Action1 secondStepEnd = mock(Action1.class); + final Action1 secondStepStart = mock(Action1.class); + final Action1 secondStepEnd = mock(Action1.class); - final Action1 thirdStepStart = mock(Action1.class); - final Action1 thirdStepEnd = mock(Action1.class); + final Action1 thirdStepStart = mock(Action1.class); + final Action1 thirdStepEnd = mock(Action1.class); - final Action1 firstAction = new Action1() { + final Action1 firstAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { assertTrue(SwingUtilities.isEventDispatchThread()); firstStepStart.call(inner); firstStepEnd.call(inner); } }; - final Action1 secondAction = new Action1() { + final Action1 secondAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { assertTrue(SwingUtilities.isEventDispatchThread()); secondStepStart.call(inner); scheduler.schedule(firstAction); secondStepEnd.call(inner); } }; - final Action1 thirdAction = new Action1() { + final Action1 thirdAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { assertTrue(SwingUtilities.isEventDispatchThread()); thirdStepStart.call(inner); scheduler.schedule(secondAction); @@ -136,12 +140,12 @@ public void call(Inner inner) { scheduler.schedule(thirdAction); waitForEmptyEventQueue(); - inOrder.verify(thirdStepStart, times(1)).call(any(Inner.class)); - inOrder.verify(thirdStepEnd, times(1)).call(any(Inner.class)); - inOrder.verify(secondStepStart, times(1)).call(any(Inner.class)); - inOrder.verify(secondStepEnd, times(1)).call(any(Inner.class)); - inOrder.verify(firstStepStart, times(1)).call(any(Inner.class)); - inOrder.verify(firstStepEnd, times(1)).call(any(Inner.class)); + inOrder.verify(thirdStepStart, times(1)).call(any(Recurse.class)); + inOrder.verify(thirdStepEnd, times(1)).call(any(Recurse.class)); + inOrder.verify(secondStepStart, times(1)).call(any(Recurse.class)); + inOrder.verify(secondStepEnd, times(1)).call(any(Recurse.class)); + inOrder.verify(firstStepStart, times(1)).call(any(Recurse.class)); + inOrder.verify(firstStepEnd, times(1)).call(any(Recurse.class)); } private static void waitForEmptyEventQueue() throws Exception { diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 8203ec567b..4f9e905f65 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -32,18 +32,20 @@ *

*

    *
  1. Java doesn't support extension methods and there are many overload methods needing default - * implementations.
  2. + * implementations. *
  3. Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for - * a long time.
  4. + * a long time. *
  5. If only an interface were used Scheduler implementations would then need to extend from an - * AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the - * functionality.
  6. + * AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the + * functionality. *
  7. Without virtual extension methods even additive changes are breaking and thus severely impede library - * maintenance.
  8. + * maintenance. *
*/ public abstract class Scheduler { + public abstract Inner createInner(); + /** * Schedules an Action on a new Scheduler instance (typically another thread) for execution. * @@ -52,7 +54,11 @@ public abstract class Scheduler { * @return a subscription to be able to unsubscribe from action */ - public abstract Subscription schedule(Action1 action); + public final Subscription schedule(Action1 action) { + Inner inner = createInner(); + inner.schedule(action); + return inner; + } /** * Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point @@ -66,7 +72,11 @@ public abstract class Scheduler { * the time unit the delay time is given in * @return a subscription to be able to unsubscribe from action */ - public abstract Subscription schedule(final Action1 action, final long delayTime, final TimeUnit unit); + public final Subscription schedule(final Action1 action, final long delay, final TimeUnit unit) { + Inner inner = createInner(); + inner.schedule(action, delay, unit); + return inner; + } /** * Schedules a cancelable action to be executed periodically. This default implementation schedules @@ -83,35 +93,43 @@ public abstract class Scheduler { * the time unit the interval above is given in * @return a subscription to be able to unsubscribe from action */ - public Subscription schedulePeriodically(final Action1 action, long initialDelay, long period, TimeUnit unit) { + public Subscription schedulePeriodically(final Action1 action, long initialDelay, long period, TimeUnit unit) { final long periodInNanos = unit.toNanos(period); - final Action1 recursiveAction = new Action1() { + final Action1 recursiveAction = new Action1() { @Override - public void call(Inner inner) { - if (!inner.isUnsubscribed()) { + public void call(Recurse re) { + if (!re.isUnsubscribed()) { long startedAt = now(); - action.call(inner); + action.call(re); long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); - inner.schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); + re.schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); } } }; return schedule(recursiveAction, initialDelay, unit); } - public final Subscription scheduleRecursive(final Action1 action) { - return schedule(new Action1() { - - @Override - public void call(Inner inner) { - action.call(new Recurse(inner, action)); - } + /** + * Parallelism available to a Scheduler. + *

+ * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases + * such as scheduling work on a computer cluster. + * + * @return the scheduler's available degree of parallelism + */ + public int degreeOfParallelism() { + return Runtime.getRuntime().availableProcessors(); + } - }); + /** + * @return the scheduler's notion of current absolute time in milliseconds. + */ + public long now() { + return System.currentTimeMillis(); } - public static final class Recurse { + public static final class Recurse implements Subscription { private final Action1 action; private final Inner inner; @@ -120,15 +138,26 @@ private Recurse(Inner inner, Action1 action) { this.action = action; } + /** + * @param inner + * The Inner this should schedule on. + * @param action + * The action to invoke recursively with {@lnk #schedule()} and {@link #schedule(long, TimeUnit)}. + * @return new instance of Recurse + */ + public static Recurse create(Inner inner, Action1 action) { + return new Recurse(inner, action); + } + /** * Schedule the current function for execution immediately. */ public final void schedule() { final Recurse self = this; - inner.schedule(new Action1() { + inner.schedule(new Action1() { @Override - public void call(Inner _inner) { + public void call(Recurse _re) { action.call(self); } @@ -140,15 +169,49 @@ public void call(Inner _inner) { */ public final void schedule(long delay, TimeUnit unit) { final Recurse self = this; - inner.schedule(new Action1() { + inner.schedule(new Action1() { @Override - public void call(Inner _inner) { + public void call(Recurse _re) { action.call(self); } }, delay, unit); } + + public final void schedule(final Action1 action) { + final Recurse self = this; + inner.schedule(new Action1() { + + @Override + public void call(Recurse _re) { + action.call(self); + } + + }); + } + + public final void schedule(final Action1 action, final long delay, final TimeUnit unit) { + final Recurse self = this; + inner.schedule(new Action1() { + + @Override + public void call(Recurse _re) { + action.call(self); + } + + }, delay, unit); + } + + @Override + public final void unsubscribe() { + inner.unsubscribe(); + } + + @Override + public final boolean isUnsubscribed() { + return inner.isUnsubscribed(); + } } public abstract static class Inner implements Subscription { @@ -161,13 +224,13 @@ public abstract static class Inner implements Subscription { * @param unit * time unit of the delay time */ - public abstract void schedule(Action1 action, long delayTime, TimeUnit unit); + public abstract void schedule(Action1 action, long delayTime, TimeUnit unit); /** * Schedules a cancelable action to be executed in delayTime. * */ - public abstract void schedule(Action1 action); + public abstract void schedule(Action1 action); /** * @return the scheduler's notion of current absolute time in milliseconds. @@ -177,23 +240,4 @@ public long now() { } } - /** - * Parallelism available to a Scheduler. - *

- * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases - * such as scheduling work on a computer cluster. - * - * @return the scheduler's available degree of parallelism - */ - public int degreeOfParallelism() { - return Runtime.getRuntime().availableProcessors(); - } - - /** - * @return the scheduler's notion of current absolute time in milliseconds. - */ - public long now() { - return System.currentTimeMillis(); - } - } diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index 46e1ea0846..e7c9cdda70 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -28,6 +28,7 @@ import rx.Observer; import rx.Scheduler; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; @@ -171,9 +172,9 @@ public TimeAndSizeBasedChunks(Observer observer, Func0 createChunk() { final Chunk chunk = super.createChunk(); - subscriptions.put(chunk, scheduler.schedule(new Action1() { + subscriptions.put(chunk, scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { emitChunk(chunk); } }, maxTime, unit)); @@ -253,9 +254,9 @@ public TimeBasedChunks(Observer observer, Func0 @Override public Chunk createChunk() { final Chunk chunk = super.createChunk(); - subscriptions.put(chunk, scheduler.schedule(new Action1() { + subscriptions.put(chunk, scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { emitChunk(chunk); } }, time, unit)); @@ -605,18 +606,18 @@ protected static class TimeBasedChunkCreator implements ChunkCreator { private final SafeObservableSubscription subscription = new SafeObservableSubscription(); public TimeBasedChunkCreator(final NonOverlappingChunks chunks, long time, TimeUnit unit, Scheduler scheduler) { - this.subscription.wrap(scheduler.schedulePeriodically(new Action1() { + this.subscription.wrap(scheduler.schedulePeriodically(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { chunks.emitAndReplaceChunk(); } }, 0, time, unit)); } public TimeBasedChunkCreator(final OverlappingChunks chunks, long time, TimeUnit unit, Scheduler scheduler) { - this.subscription.wrap(scheduler.schedulePeriodically(new Action1() { + this.subscription.wrap(scheduler.schedulePeriodically(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { chunks.createChunk(); } }, 0, time, unit)); diff --git a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java index 48c502d0b5..43045701f4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java @@ -22,7 +22,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; @@ -142,10 +142,11 @@ public void onError(Throwable e) { @Override public void onNext(final T v) { - Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action1() { + // TODO fix ... this is creating a new Scheduler.Inner each time, it needs to get a single Inner and reuse it + Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(v); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index b996197a54..5fec21cd09 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -21,7 +21,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; @@ -77,9 +77,9 @@ public DelaySubscribeFunc(Observable source, long time, TimeUnit un public Subscription onSubscribe(final Observer t1) { final SerialSubscription ssub = new SerialSubscription(); - ssub.set(scheduler.schedule(new Action1() { + ssub.set(scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { if (!ssub.isUnsubscribed()) { ssub.set(source.unsafeSubscribe(Subscribers.from(t1))); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index a0cd28537c..89d4d3ce11 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -20,7 +20,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; @@ -68,9 +68,9 @@ private Interval(long period, TimeUnit unit, Scheduler scheduler) { @Override public Subscription onSubscribe(final Observer observer) { - final Subscription wrapped = scheduler.schedulePeriodically(new Action1() { + final Subscription wrapped = scheduler.schedulePeriodically(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(currentValue); currentValue++; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java index 0696c86792..f623d8e2a0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkip.java @@ -22,7 +22,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; @@ -81,7 +81,7 @@ public Subscription onSubscribe(Observer t1) { * @param * the observed value type */ - private static final class SourceObserver extends Subscriber implements Action1 { + private static final class SourceObserver extends Subscriber implements Action1 { final AtomicBoolean gate; final Observer observer; final Subscription cancel; @@ -119,7 +119,7 @@ public void onCompleted() { } @Override - public void call(Inner inner) { + public void call(Recurse inner) { gate.set(true); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java b/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java index cd52a04762..0e2f97661f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java @@ -22,7 +22,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; @@ -212,7 +212,7 @@ public Subscription onSubscribe(Observer t1) { * @param * the observed value type */ - private static final class SourceObserver extends Subscriber implements Action1 { + private static final class SourceObserver extends Subscriber implements Action1 { final Observer observer; final Subscription cancel; final AtomicInteger state = new AtomicInteger(); @@ -283,7 +283,7 @@ public void onCompleted() { } @Override - public void call(Inner inner) { + public void call(Recurse inner) { onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimer.java b/rxjava-core/src/main/java/rx/operators/OperationTimer.java index 16f745129f..8f499c4fdd 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimer.java @@ -20,7 +20,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; @@ -50,9 +50,9 @@ public TimerOnce(long dueTime, TimeUnit unit, Scheduler scheduler) { @Override public Subscription onSubscribe(final Observer t1) { - return scheduler.schedule(new Action1() { + return scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { t1.onNext(0L); t1.onCompleted(); } @@ -79,11 +79,11 @@ public TimerPeriodically(long initialDelay, long period, TimeUnit unit, Schedule @Override public Subscription onSubscribe(final Observer t1) { - return scheduler.schedulePeriodically(new Action1() { + return scheduler.schedulePeriodically(new Action1() { long count; @Override - public void call(Inner inner) { + public void call(Recurse inner) { t1.onNext(count++); } }, initialDelay, period, unit); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index 9a3b6937f5..e13d41bd81 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -20,7 +20,7 @@ import rx.Observable.Operator; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; import rx.schedulers.ImmediateScheduler; @@ -91,29 +91,20 @@ public void onError(final Throwable e) { protected void schedule() { if (counter.getAndIncrement() == 0) { if (recursiveScheduler == null) { - add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - recursiveScheduler = inner; - pollQueue(); - } - - })); - } else { - recursiveScheduler.schedule(new Action1() { + recursiveScheduler = scheduler.createInner(); + add(recursiveScheduler); + } + recursiveScheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - pollQueue(); - } + @Override + public void call(Recurse inner) { + pollQueue(); + } - }); - } + }); } } - @SuppressWarnings("unchecked") private void pollQueue() { do { Object v = queue.poll(); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java index 7901a86f4f..05515a04ce 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java @@ -20,7 +20,7 @@ import rx.Observable.Operator; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; @@ -167,25 +167,18 @@ public void call() { } })); - add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - recursiveScheduler = inner; - pollQueue(); - } + recursiveScheduler = scheduler.createInner(); + add(recursiveScheduler); + } - })); - } else { - recursiveScheduler.schedule(new Action1() { + recursiveScheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - pollQueue(); - } + @Override + public void call(Recurse inner) { + pollQueue(); + } - }); - } + }); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java b/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java index 3c0b08b5f3..44b2b89ca4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java @@ -20,6 +20,7 @@ import rx.Observable.Operator; import rx.Scheduler; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; import rx.observers.Subscribers; @@ -53,6 +54,7 @@ public Subscriber> call(final Subscriber child) child.onCompleted(); return Subscribers.empty(); } + final Inner innerScheduler = scheduler.createInner(); return new Subscriber>(child) { int executionCount = 0; @@ -70,19 +72,17 @@ public void onError(Throwable e) { @Override public void onNext(final Observable t) { - scheduler.schedule(new Action1() { - - final Action1 self = this; + innerScheduler.schedule(new Action1() { @Override - public void call(final Inner inner) { + public void call(final Recurse re) { executionCount++; t.unsafeSubscribe(new Subscriber(child) { @Override public void onCompleted() { if (count == -1 || executionCount < count) { - inner.schedule(self); + re.schedule(); } else { child.onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java index 98c9a82f65..94f6ab2b24 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java @@ -36,6 +36,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; import rx.schedulers.Schedulers; @@ -56,9 +57,10 @@ public OperatorRetry() { @Override public Subscriber> call(final Subscriber s) { + final Inner innerScheduler = Schedulers.trampoline().createInner(); return new Subscriber>(s) { final AtomicInteger attempts = new AtomicInteger(0); - + @Override public void onCompleted() { // ignore as we expect a single nested Observable @@ -71,11 +73,10 @@ public void onError(Throwable e) { @Override public void onNext(final Observable o) { - Schedulers.trampoline().schedule(new Action1() { + innerScheduler.schedule(new Action1() { @Override - public void call(final Inner inner) { - final Action1 _self = this; + public void call(final Recurse re) { attempts.incrementAndGet(); o.unsafeSubscribe(new Subscriber(s) { @@ -86,9 +87,9 @@ public void onCompleted() { @Override public void onError(Throwable e) { - if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !inner.isUnsubscribed()) { + if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !re.isUnsubscribed()) { // retry again - inner.schedule(_self); + re.schedule(); } else { // give up and pass the failure s.onError(e); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index 29e01fd369..1ba1f3d060 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -18,7 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; @@ -51,10 +51,10 @@ public void onError(Throwable e) { @Override public void onNext(final Observable o) { - subscriber.add(scheduler.schedule(new Action1() { + subscriber.add(scheduler.schedule(new Action1() { @Override - public void call(final Inner inner) { + public void call(final Recurse inner) { o.unsafeSubscribe(subscriber); } })); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTimeout.java b/rxjava-core/src/main/java/rx/operators/OperatorTimeout.java index eb5debfcd5..e3a6b3b10c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTimeout.java @@ -19,7 +19,7 @@ import rx.Observable; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; @@ -40,9 +40,9 @@ public OperatorTimeout(final long timeout, final TimeUnit timeUnit, public Subscription call( final TimeoutSubscriber timeoutSubscriber, final Long seqId) { - return scheduler.schedule(new Action1() { + return scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { timeoutSubscriber.onTimeout(seqId); } }, timeout, timeUnit); @@ -53,9 +53,9 @@ public void call(Inner inner) { public Subscription call( final TimeoutSubscriber timeoutSubscriber, final Long seqId, T value) { - return scheduler.schedule(new Action1() { + return scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { timeoutSubscriber.onTimeout(seqId); } }, timeout, timeUnit); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java index c48e0bc69f..98563336e0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java @@ -17,7 +17,7 @@ import rx.Observable.Operator; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; @@ -45,10 +45,10 @@ public Subscriber call(final Subscriber subscriber) { @Override public void call() { final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - mas.set(scheduler.schedule(new Action1() { + mas.set(scheduler.schedule(new Action1() { @Override - public void call(final Inner inner) { + public void call(final Recurse inner) { parentSubscription.unsubscribe(); mas.unsubscribe(); } diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index ff994f0759..50e6ee14da 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -29,8 +29,7 @@ import rx.subscriptions.Subscriptions; /** - * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} - * implementation. + * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. *

* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a * system-wide Timer will be used to handle delayed events. @@ -55,21 +54,12 @@ public ExecutorScheduler(ScheduledExecutorService executor) { } @Override - public Subscription schedule(Action1 action) { - InnerExecutorScheduler inner = new InnerExecutorScheduler(); - inner.schedule(action); - return inner.innerSubscription; + public Inner createInner() { + return new InnerExecutorScheduler(); } @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - InnerExecutorScheduler inner = new InnerExecutorScheduler(); - inner.schedule(action, delayTime, unit); - return inner.innerSubscription; - } - - @Override - public Subscription schedulePeriodically(final Action1 action, long initialDelay, long period, TimeUnit unit) { + public Subscription schedulePeriodically(final Action1 action, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { final InnerExecutorScheduler inner = new InnerExecutorScheduler(); ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { @@ -79,7 +69,7 @@ public void run() { // don't execute if unsubscribed return; } - action.call(inner); + action.call(Recurse.create(inner, action)); } }, initialDelay, period, unit); @@ -95,7 +85,7 @@ private class InnerExecutorScheduler extends Scheduler.Inner { private final MultipleAssignmentSubscription innerSubscription = new MultipleAssignmentSubscription(); @Override - public void schedule(final Action1 action, long delayTime, TimeUnit unit) { + public void schedule(final Action1 action, long delayTime, TimeUnit unit) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return; @@ -112,7 +102,7 @@ public void run() { return; } // when the delay has passed we now do the work on the actual scheduler - action.call(_inner); + action.call(Recurse.create(_inner, action)); } }, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens @@ -144,7 +134,7 @@ public void run() { } @Override - public void schedule(final Action1 action) { + public void schedule(final Action1 action) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return; @@ -159,7 +149,7 @@ public void run() { // don't execute if unsubscribed return; } - action.call(_inner); + action.call(Recurse.create(_inner, action)); } }; diff --git a/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java index e5f5ca6e8a..ac650bea28 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java @@ -45,17 +45,8 @@ public static ImmediateScheduler getInstance() { } @Override - public Subscription schedule(Action1 action) { - InnerImmediateScheduler inner = new InnerImmediateScheduler(); - inner.schedule(action); - return inner.innerSubscription; - } - - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - InnerImmediateScheduler inner = new InnerImmediateScheduler(); - inner.schedule(action, delayTime, unit); - return inner.innerSubscription; + public Inner createInner() { + return new InnerImmediateScheduler(); } private class InnerImmediateScheduler extends Scheduler.Inner implements Subscription { @@ -63,7 +54,7 @@ private class InnerImmediateScheduler extends Scheduler.Inner implements Subscri final BooleanSubscription innerSubscription = new BooleanSubscription(); @Override - public void schedule(Action1 action, long delayTime, TimeUnit unit) { + public void schedule(Action1 action, long delayTime, TimeUnit unit) { // since we are executing immediately on this thread we must cause this thread to sleep long execTime = now() + unit.toMillis(delayTime); @@ -71,8 +62,8 @@ public void schedule(Action1 action, long delayTime, TimeUnit u } @Override - public void schedule(Action1 action) { - action.call(this); + public void schedule(Action1 action) { + action.call(Recurse.create(this, action)); } @Override diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 975918f292..6887b1ee1f 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -64,30 +64,20 @@ private NewThreadScheduler() { } @Override - public Subscription schedule(Action1 action) { - EventLoopScheduler innerScheduler = new EventLoopScheduler(); - innerScheduler.schedule(action); - return innerScheduler.innerSubscription; - } - - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - EventLoopScheduler innerScheduler = new EventLoopScheduler(); - innerScheduler.schedule(action, delayTime, unit); - return innerScheduler.innerSubscription; + public Inner createInner() { + return new EventLoopScheduler(); } private class EventLoopScheduler extends Scheduler.Inner implements Subscription { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final ExecutorService executor; - private final Inner _inner = this; private EventLoopScheduler() { executor = Executors.newSingleThreadExecutor(THREAD_FACTORY); } @Override - public void schedule(final Action1 action) { + public void schedule(final Action1 action) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return; @@ -102,7 +92,7 @@ public void run() { if (innerSubscription.isUnsubscribed()) { return; } - action.call(_inner); + action.call(Recurse.create(EventLoopScheduler.this, action)); } finally { // remove the subscription now that we're completed Subscription s = sf.get(); @@ -118,7 +108,7 @@ public void run() { } @Override - public void schedule(final Action1 action, long delayTime, TimeUnit unit) { + public void schedule(final Action1 action, long delayTime, TimeUnit unit) { final AtomicReference sf = new AtomicReference(); // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep // we will instead schedule the event then launch the thread after the delay has passed diff --git a/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java b/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java index cd4a39e046..4a2524cd8c 100644 --- a/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java @@ -16,22 +16,22 @@ package rx.schedulers; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action1; -/* package */class SleepingAction implements Action1 { - private final Action1 underlying; +/* package */class SleepingAction implements Action1 { + private final Action1 underlying; private final Scheduler scheduler; private final long execTime; - public SleepingAction(Action1 underlying, Scheduler scheduler, long execTime) { + public SleepingAction(Action1 underlying, Scheduler scheduler, long execTime) { this.underlying = underlying; this.scheduler = scheduler; this.execTime = execTime; } @Override - public void call(Inner s) { + public void call(Recurse s) { if (s.isUnsubscribed()) { return; } diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java index 1231c71fa4..815b5c2988 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit; import rx.Scheduler; -import rx.Subscription; import rx.functions.Action1; import rx.subscriptions.BooleanSubscription; @@ -32,11 +31,11 @@ public class TestScheduler extends Scheduler { private static class TimedAction { private final long time; - private final Action1 action; + private final Action1 action; private final Inner scheduler; private final long count = counter++; // for differentiating tasks at same time - private TimedAction(Inner scheduler, long time, Action1 action) { + private TimedAction(Inner scheduler, long time, Action1 action) { this.time = time; this.action = action; this.scheduler = scheduler; @@ -91,30 +90,15 @@ private void triggerActions(long targetTimeInNanos) { // Only execute if not unsubscribed if (!current.scheduler.isUnsubscribed()) { - current.action.call(current.scheduler); + current.action.call(Recurse.create(current.scheduler, current.action)); } } time = targetTimeInNanos; } - public Inner createInnerScheduler() { - return new InnerTestScheduler(); - } - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - Inner inner = createInnerScheduler(); - final TimedAction timedAction = new TimedAction(inner, time + unit.toNanos(delayTime), action); - queue.add(timedAction); - return inner; - } - - @Override - public Subscription schedule(Action1 action) { - Inner inner = createInnerScheduler(); - final TimedAction timedAction = new TimedAction(inner, 0, action); - queue.add(timedAction); - return inner; + public Inner createInner() { + return new InnerTestScheduler(); } private final class InnerTestScheduler extends Inner { @@ -132,13 +116,13 @@ public boolean isUnsubscribed() { } @Override - public void schedule(Action1 action, long delayTime, TimeUnit unit) { + public void schedule(Action1 action, long delayTime, TimeUnit unit) { final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action); queue.add(timedAction); } @Override - public void schedule(Action1 action) { + public void schedule(Action1 action) { final TimedAction timedAction = new TimedAction(this, 0, action); queue.add(timedAction); } diff --git a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java index 314a43dc7d..7da8bac29e 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -38,23 +38,14 @@ public class TrampolineScheduler extends Scheduler { public static TrampolineScheduler getInstance() { return INSTANCE; } - - /* package */ static TrampolineScheduler instance() { - return INSTANCE; - } - @Override - public Subscription schedule(Action1 action) { - InnerCurrentThreadScheduler inner = new InnerCurrentThreadScheduler(); - inner.schedule(action); - return inner.innerSubscription; + /* package */static TrampolineScheduler instance() { + return INSTANCE; } @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - InnerCurrentThreadScheduler inner = new InnerCurrentThreadScheduler(); - inner.schedule(action, delayTime, unit); - return inner.innerSubscription; + public Inner createInner() { + return new InnerCurrentThreadScheduler(); } /* package accessible for unit tests */TrampolineScheduler() { @@ -69,18 +60,18 @@ private class InnerCurrentThreadScheduler extends Scheduler.Inner implements Sub private final BooleanSubscription innerSubscription = new BooleanSubscription(); @Override - public void schedule(Action1 action) { + public void schedule(Action1 action) { enqueue(action, now()); } @Override - public void schedule(Action1 action, long delayTime, TimeUnit unit) { + public void schedule(Action1 action, long delayTime, TimeUnit unit) { long execTime = now() + unit.toMillis(delayTime); enqueue(new SleepingAction(action, TrampolineScheduler.this, execTime), execTime); } - private void enqueue(Action1 action, long execTime) { + private void enqueue(Action1 action, long execTime) { if (innerSubscription.isUnsubscribed()) { return; } @@ -99,7 +90,7 @@ private void enqueue(Action1 action, long execTime) { if (innerSubscription.isUnsubscribed()) { return; } - queue.poll().action.call(this); + queue.poll().action.call(Recurse.create(InnerCurrentThreadScheduler.this, action)); } QUEUE.set(null); @@ -120,11 +111,11 @@ public boolean isUnsubscribed() { } private static class TimedAction implements Comparable { - final Action1 action; + final Action1 action; final Long execTime; final Integer count; // In case if time between enqueueing took less than 1ms - private TimedAction(Action1 action, Long execTime, Integer count) { + private TimedAction(Action1 action, Long execTime, Integer count) { this.action = action; this.execTime = execTime; this.count = count; diff --git a/rxjava-core/src/main/java/rx/subjects/TestSubject.java b/rxjava-core/src/main/java/rx/subjects/TestSubject.java index 7904886c75..c99f12f10c 100644 --- a/rxjava-core/src/main/java/rx/subjects/TestSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/TestSubject.java @@ -23,6 +23,7 @@ import rx.Observer; import rx.Scheduler; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action1; import rx.schedulers.TestScheduler; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -96,7 +97,7 @@ protected TestSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager super(onSubscribe); this.subscriptionManager = subscriptionManager; this.lastNotification = lastNotification; - this.innerScheduler = scheduler.createInnerScheduler(); + this.innerScheduler = scheduler.createInner(); } @Override @@ -118,10 +119,10 @@ public void call(Collection> observers) { } public void onCompleted(long timeInMilliseconds) { - innerScheduler.schedule(new Action1() { + innerScheduler.schedule(new Action1() { @Override - public void call(Inner t1) { + public void call(Recurse t1) { _onCompleted(); } @@ -148,10 +149,10 @@ public void call(Collection> observers) { } public void onError(final Throwable e, long timeInMilliseconds) { - innerScheduler.schedule(new Action1() { + innerScheduler.schedule(new Action1() { @Override - public void call(Inner t1) { + public void call(Recurse t1) { _onError(e); } @@ -170,10 +171,10 @@ private void _onNext(T v) { } public void onNext(final T v, long timeInMilliseconds) { - innerScheduler.schedule(new Action1() { + innerScheduler.schedule(new Action1() { @Override - public void call(Inner t1) { + public void call(Recurse t1) { _onNext(v); } diff --git a/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java index 2e120596ee..a9d8f75cbd 100644 --- a/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java +++ b/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java @@ -18,7 +18,7 @@ import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; import rx.schedulers.Schedulers; @@ -29,78 +29,78 @@ */ public class TestRecursionMemoryUsage { - public static void main(String args[]) { - usingFunc2(Schedulers.newThread()); - usingAction0(Schedulers.newThread()); + public static void main(String args[]) { + usingFunc2(Schedulers.newThread()); + usingAction0(Schedulers.newThread()); - usingFunc2(Schedulers.currentThread()); - usingAction0(Schedulers.currentThread()); + usingFunc2(Schedulers.currentThread()); + usingAction0(Schedulers.currentThread()); - usingFunc2(Schedulers.computation()); - usingAction0(Schedulers.computation()); + usingFunc2(Schedulers.computation()); + usingAction0(Schedulers.computation()); - System.exit(0); - } + System.exit(0); + } - protected static void usingFunc2(final Scheduler scheduler) { - System.out.println("************ usingFunc2: " + scheduler); - Observable.create(new OnSubscribe() { + protected static void usingFunc2(final Scheduler scheduler) { + System.out.println("************ usingFunc2: " + scheduler); + Observable.create(new OnSubscribe() { - @Override - public void call(final Subscriber o) { - o.add(scheduler.schedule(new Action1() { - long i = 0; + @Override + public void call(final Subscriber o) { + o.add(scheduler.schedule(new Action1() { + long i = 0; - @Override - public void call(Inner inner) { - i++; - if (i % 500000 == 0) { - System.out.println(i + " Total Memory: " - + Runtime.getRuntime().totalMemory() - + " Free: " - + Runtime.getRuntime().freeMemory()); - o.onNext(i); - } - if (i == 100000000L) { - o.onCompleted(); - return; - } + @Override + public void call(Recurse inner) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + + Runtime.getRuntime().totalMemory() + + " Free: " + + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return; + } - inner.schedule(this); - } - })); - } - }).toBlockingObservable().last(); - } + inner.schedule(); + } + })); + } + }).toBlockingObservable().last(); + } - protected static void usingAction0(final Scheduler scheduler) { - System.out.println("************ usingAction0: " + scheduler); - Observable.create(new OnSubscribe() { + protected static void usingAction0(final Scheduler scheduler) { + System.out.println("************ usingAction0: " + scheduler); + Observable.create(new OnSubscribe() { - @Override - public void call(final Subscriber o) { - o.add(scheduler.schedule(new Action1() { + @Override + public void call(final Subscriber o) { + o.add(scheduler.schedule(new Action1() { - private long i = 0; + private long i = 0; - @Override - public void call(Inner inner) { - i++; - if (i % 500000 == 0) { - System.out.println(i + " Total Memory: " - + Runtime.getRuntime().totalMemory() - + " Free: " - + Runtime.getRuntime().freeMemory()); - o.onNext(i); - } - if (i == 100000000L) { - o.onCompleted(); - return; - } - inner.schedule(this); - } - })); - } - }).toBlockingObservable().last(); - } + @Override + public void call(Recurse inner) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + + Runtime.getRuntime().totalMemory() + + " Free: " + + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return; + } + inner.schedule(); + } + })); + } + }).toBlockingObservable().last(); + } } diff --git a/rxjava-core/src/test/java/rx/EventStream.java b/rxjava-core/src/test/java/rx/EventStream.java index ad92262695..28c04ca3ce 100644 --- a/rxjava-core/src/test/java/rx/EventStream.java +++ b/rxjava-core/src/test/java/rx/EventStream.java @@ -21,6 +21,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action1; import rx.schedulers.Schedulers; @@ -35,14 +36,14 @@ public static Observable getEventStream(final String type, final int numI @Override public Subscription onSubscribe(final Observer observer) { // run on a background thread inside the OnSubscribeFunc so unsubscribe works - return Schedulers.newThread().schedule(new Action1() { + return Schedulers.newThread().schedule(new Action1() { @Override - public void call(Inner inner) { - inner.schedule(new Action1() { + public void call(Recurse inner) { + inner.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { while (!(inner.isUnsubscribed() || Thread.currentThread().isInterrupted())) { observer.onNext(randomEvent(type, numInstances)); try { diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index 04944498b2..05dd0c123f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -15,10 +15,13 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.OperationBuffer.*; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.operators.OperationBuffer.buffer; import java.util.ArrayList; import java.util.Arrays; @@ -34,6 +37,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.functions.Func0; @@ -347,18 +351,18 @@ private List list(String... args) { } private void push(final Observer observer, final T value, int delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(value); } }, delay, TimeUnit.MILLISECONDS); } private void complete(final Observer observer, int delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onCompleted(); } }, delay, TimeUnit.MILLISECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java b/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java index 1f23264ae8..161f8f4389 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java @@ -15,8 +15,13 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; @@ -27,6 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.functions.Func1; @@ -132,27 +138,27 @@ public Subscription onSubscribe(Observer observer) { } private void publishCompleted(final Observer observer, long delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onCompleted(); } }, delay, TimeUnit.MILLISECONDS); } private void publishError(final Observer observer, long delay, final Exception error) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onError(error); } }, delay, TimeUnit.MILLISECONDS); } private void publishNext(final Observer observer, final long delay, final T value) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(value); } }, delay, TimeUnit.MILLISECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java index f925de54cb..7a23f0eccc 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java @@ -15,8 +15,12 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; @@ -26,7 +30,7 @@ import rx.Observable; import rx.Observer; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.schedulers.TestScheduler; @@ -52,21 +56,21 @@ public void testSample() { Observable source = Observable.create(new Observable.OnSubscribeFunc() { @Override public Subscription onSubscribe(final Observer observer1) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer1.onNext(1L); } }, 1, TimeUnit.SECONDS); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer1.onNext(2L); } }, 2, TimeUnit.SECONDS); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer1.onCompleted(); } }, 3, TimeUnit.SECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java index b09d06abc2..c53996e7de 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java @@ -15,8 +15,13 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; @@ -26,7 +31,7 @@ import rx.Observable; import rx.Observer; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.schedulers.TestScheduler; @@ -45,7 +50,7 @@ public void before() { } @Test - public void testSwitchWhenOuterCompleteBeforeInner() { + public void testSwitchWhenOuterCompleteBeforeRecurse() { Observable> source = Observable.create(new Observable.OnSubscribeFunc>() { @Override public Subscription onSubscribe(Observer> observer) { @@ -75,7 +80,7 @@ public Subscription onSubscribe(Observer observer) { } @Test - public void testSwitchWhenInnerCompleteBeforeOuter() { + public void testSwitchWhenRecurseCompleteBeforeOuter() { Observable> source = Observable.create(new Observable.OnSubscribeFunc>() { @Override public Subscription onSubscribe(Observer> observer) { @@ -352,27 +357,27 @@ public Subscription onSubscribe(Observer observer) { } private void publishCompleted(final Observer observer, long delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onCompleted(); } }, delay, TimeUnit.MILLISECONDS); } private void publishError(final Observer observer, long delay, final Throwable error) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onError(error); } }, delay, TimeUnit.MILLISECONDS); } private void publishNext(final Observer observer, long delay, final T value) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(value); } }, delay, TimeUnit.MILLISECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java b/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java index 6172b7dfb3..59c8024ec9 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java @@ -15,8 +15,10 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import java.util.concurrent.TimeUnit; @@ -26,7 +28,7 @@ import rx.Observable; import rx.Observer; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.schedulers.TestScheduler; @@ -99,27 +101,27 @@ public Subscription onSubscribe(Observer observer) { } private void publishCompleted(final Observer observer, long delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onCompleted(); } }, delay, TimeUnit.MILLISECONDS); } private void publishError(final Observer observer, long delay, final Exception error) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onError(error); } }, delay, TimeUnit.MILLISECONDS); } private void publishNext(final Observer observer, long delay, final T value) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(value); } }, delay, TimeUnit.MILLISECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index 16ac7579d3..953a1b868e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -15,10 +15,13 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.OperationWindow.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static rx.operators.OperationWindow.window; import java.util.ArrayList; import java.util.List; @@ -29,7 +32,7 @@ import rx.Observable; import rx.Observer; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.functions.Func0; @@ -285,18 +288,18 @@ private List list(String... args) { } private void push(final Observer observer, final T value, int delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(value); } }, delay, TimeUnit.MILLISECONDS); } private void complete(final Observer observer, int delay) { - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onCompleted(); } }, delay, TimeUnit.MILLISECONDS); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java index 323eda3ca5..e35b9e1131 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java @@ -30,7 +30,7 @@ import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.functions.Action1; import rx.schedulers.TestScheduler; @@ -55,17 +55,17 @@ public void call(final Subscriber subscriber) { subscriber.add(parentSubscription); long delay = interval; for (final String value : values) { - parentSubscription.add(scheduler.schedule(new Action1() { + parentSubscription.add(scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { subscriber.onNext(value); } }, delay, TimeUnit.MILLISECONDS)); delay += interval; } - parentSubscription.add(scheduler.schedule(new Action1() { + parentSubscription.add(scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { if (e == null) { subscriber.onCompleted(); } else { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java index 934513d0db..d85b3be26e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java @@ -42,7 +42,7 @@ import rx.Observable.OnSubscribe; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -485,10 +485,10 @@ public void testConcurrency() { @Override public void call(final Subscriber s) { - Schedulers.newThread().schedule(new Action1() { + Schedulers.newThread().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { for (int i = 0; i < 10000; i++) { s.onNext(1); } @@ -519,10 +519,10 @@ public void testConcurrencyWithSleeping() { @Override public void call(final Subscriber s) { - Schedulers.newThread().schedule(new Action1() { + Schedulers.newThread().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { for (int i = 0; i < 100; i++) { s.onNext(1); try { @@ -558,10 +558,10 @@ public void testConcurrencyWithBrokenOnCompleteContract() { @Override public void call(final Subscriber s) { - Schedulers.newThread().schedule(new Action1() { + Schedulers.newThread().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { for (int i = 0; i < 10000; i++) { s.onNext(1); } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index ba9d171ccc..08a2a273ca 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -15,7 +15,7 @@ */ package rx.operators; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.concurrent.CountDownLatch; @@ -94,16 +94,42 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { } @Override - public Subscription schedule(final Action1 action) { - return actual.schedule(action, delay, unit); + public Inner createInner() { + return new SlowInnerScheduler(actual.createInner()); } - @Override - public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) { - TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; - long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); - return actual.schedule(action, t, common); + private class SlowInnerScheduler extends Scheduler.Inner { + + private final Inner actualInner; + + public SlowInnerScheduler(Inner actualInner) { + this.actualInner = actualInner; + } + + @Override + public void unsubscribe() { + actualInner.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return actualInner.isUnsubscribed(); + } + + @Override + public void schedule(final Action1 action) { + actualInner.schedule(action, delay, unit); + } + + @Override + public void schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) { + TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; + long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); + actualInner.schedule(action, t, common); + } + } + } @Test(timeout = 5000) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java index b86a015b2c..60b119edab 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java @@ -1,10 +1,11 @@ package rx.operators; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -138,22 +139,20 @@ public Thread getThread() throws InterruptedException { public static class UIEventLoopScheduler extends Scheduler { - private final Scheduler.Inner eventLoop; - private final Subscription s; private volatile Thread t; + private final Inner inner; public UIEventLoopScheduler() { + inner = Schedulers.newThread().createInner(); /* * DON'T DO THIS IN PRODUCTION CODE */ - final AtomicReference innerScheduler = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - s = Schedulers.newThread().schedule(new Action1() { + inner.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { t = Thread.currentThread(); - innerScheduler.set(inner); latch.countDown(); } @@ -161,30 +160,23 @@ public void call(Inner inner) { try { latch.await(); } catch (InterruptedException e) { - throw new RuntimeException("failed to initialize and get inner scheduler"); + throw new RuntimeException("failed to initialize thread"); } - eventLoop = innerScheduler.get(); - } - - @Override - public Subscription schedule(Action1 action) { - eventLoop.schedule(action); - return Subscriptions.empty(); - } - - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - eventLoop.schedule(action); - return Subscriptions.empty(); } public void shutdown() { - s.unsubscribe(); + inner.unsubscribe(); } public Thread getThread() { return t; } + @Override + public Inner createInner() { + // same one every time + return inner; + } + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 39cac1740b..a470d3583f 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -15,7 +15,10 @@ */ package rx.schedulers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,7 +31,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -98,16 +101,16 @@ public void testUnsubscribeRecursiveScheduleFromOutside() throws InterruptedExce final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); - Subscription s = getScheduler().schedule(new Action1() { + Subscription s = getScheduler().schedule(new Action1() { @Override - public void call(final Inner inner) { - inner.schedule(new Action1() { + public void call(final Recurse inner) { + inner.schedule(new Action1() { int i = 0; @Override - public void call(Inner s) { + public void call(Recurse s) { System.out.println("Run: " + i++); if (i == 10) { latch.countDown(); @@ -141,16 +144,16 @@ public void call(Inner s) { public void testUnsubscribeRecursiveScheduleFromInside() throws InterruptedException { final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); - getScheduler().schedule(new Action1() { + getScheduler().schedule(new Action1() { @Override - public void call(Inner inner) { - inner.schedule(new Action1() { + public void call(Recurse inner) { + inner.schedule(new Action1() { int i = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { System.out.println("Run: " + i++); if (i == 10) { inner.unsubscribe(); @@ -174,16 +177,16 @@ public void testUnsubscribeRecursiveScheduleWithDelay() throws InterruptedExcept final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); - Subscription s = getScheduler().schedule(new Action1() { + Subscription s = getScheduler().schedule(new Action1() { @Override - public void call(final Inner innerScheduler) { - innerScheduler.schedule(new Action1() { + public void call(final Recurse innerScheduler) { + innerScheduler.schedule(new Action1() { long i = 1L; @Override - public void call(Inner s) { + public void call(Recurse s) { if (i++ == 10) { latch.countDown(); try { @@ -214,12 +217,12 @@ public void call(Inner s) { @Test public void recursionFromOuterActionAndUnsubscribeInside() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - getScheduler().schedule(new Action1() { + getScheduler().schedule(new Action1() { int i = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { i++; if (i % 100000 == 0) { System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); @@ -238,12 +241,12 @@ public void call(Inner inner) { @Test public void testRecursion() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - getScheduler().schedule(new Action1() { + getScheduler().schedule(new Action1() { private long i = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { i++; if (i % 100000 == 0) { System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); @@ -269,9 +272,9 @@ public void testRecursionAndOuterUnsubscribe() throws InterruptedException { @Override public Subscription onSubscribe(final Observer observer) { - final Subscription s = getScheduler().schedule(new Action1() { + final Subscription s = getScheduler().schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { observer.onNext(42); latch.countDown(); diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index 0947bcd884..8f8a608e3f 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -15,9 +15,15 @@ */ package rx.schedulers; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.List; @@ -35,7 +41,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -67,26 +73,26 @@ public void testNestedActions() throws InterruptedException { final Action0 thirdStepStart = mock(Action0.class); final Action0 thirdStepEnd = mock(Action0.class); - final Action1 firstAction = new Action1() { + final Action1 firstAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { firstStepStart.call(); firstStepEnd.call(); latch.countDown(); } }; - final Action1 secondAction = new Action1() { + final Action1 secondAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { secondStepStart.call(); inner.schedule(firstAction); secondStepEnd.call(); } }; - final Action1 thirdAction = new Action1() { + final Action1 thirdAction = new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { thirdStepStart.call(); inner.schedule(secondAction); thirdStepEnd.call(); @@ -149,8 +155,8 @@ public final void testSequenceOfActions() throws InterruptedException { final Scheduler scheduler = getScheduler(); final CountDownLatch latch = new CountDownLatch(2); - final Action1 first = mock(Action1.class); - final Action1 second = mock(Action1.class); + final Action1 first = mock(Action1.class); + final Action1 second = mock(Action1.class); // make it wait until both the first and second are called doAnswer(new Answer() { @@ -163,7 +169,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { latch.countDown(); } } - }).when(first).call(any(Inner.class)); + }).when(first).call(any(Recurse.class)); doAnswer(new Answer() { @Override @@ -174,15 +180,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable { latch.countDown(); } } - }).when(second).call(any(Inner.class)); + }).when(second).call(any(Recurse.class)); scheduler.schedule(first); scheduler.schedule(second); latch.await(); - verify(first, times(1)).call(any(Inner.class)); - verify(second, times(1)).call(any(Inner.class)); + verify(first, times(1)).call(any(Recurse.class)); + verify(second, times(1)).call(any(Recurse.class)); } @@ -191,18 +197,18 @@ public void testSequenceOfDelayedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); final CountDownLatch latch = new CountDownLatch(1); - final Action1 first = mock(Action1.class); - final Action1 second = mock(Action1.class); + final Action1 first = mock(Action1.class); + final Action1 second = mock(Action1.class); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { inner.schedule(first, 30, TimeUnit.MILLISECONDS); inner.schedule(second, 10, TimeUnit.MILLISECONDS); - inner.schedule(new Action1() { + inner.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { latch.countDown(); } }, 40, TimeUnit.MILLISECONDS); @@ -212,8 +218,8 @@ public void call(Inner inner) { latch.await(); InOrder inOrder = inOrder(first, second); - inOrder.verify(second, times(1)).call(any(Inner.class)); - inOrder.verify(first, times(1)).call(any(Inner.class)); + inOrder.verify(second, times(1)).call(any(Recurse.class)); + inOrder.verify(first, times(1)).call(any(Recurse.class)); } @@ -222,22 +228,22 @@ public void testMixOfDelayedAndNonDelayedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); final CountDownLatch latch = new CountDownLatch(1); - final Action1 first = mock(Action1.class); - final Action1 second = mock(Action1.class); - final Action1 third = mock(Action1.class); - final Action1 fourth = mock(Action1.class); + final Action1 first = mock(Action1.class); + final Action1 second = mock(Action1.class); + final Action1 third = mock(Action1.class); + final Action1 fourth = mock(Action1.class); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { inner.schedule(first); inner.schedule(second, 300, TimeUnit.MILLISECONDS); inner.schedule(third, 100, TimeUnit.MILLISECONDS); inner.schedule(fourth); - inner.schedule(new Action1() { + inner.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { latch.countDown(); } }, 400, TimeUnit.MILLISECONDS); @@ -247,10 +253,10 @@ public void call(Inner inner) { latch.await(); InOrder inOrder = inOrder(first, second, third, fourth); - inOrder.verify(first, times(1)).call(any(Inner.class)); - inOrder.verify(fourth, times(1)).call(any(Inner.class)); - inOrder.verify(third, times(1)).call(any(Inner.class)); - inOrder.verify(second, times(1)).call(any(Inner.class)); + inOrder.verify(first, times(1)).call(any(Recurse.class)); + inOrder.verify(fourth, times(1)).call(any(Recurse.class)); + inOrder.verify(third, times(1)).call(any(Recurse.class)); + inOrder.verify(second, times(1)).call(any(Recurse.class)); } @Test @@ -258,10 +264,10 @@ public final void testRecursiveExecution() throws InterruptedException { final Scheduler scheduler = getScheduler(); final AtomicInteger i = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(1); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { if (i.incrementAndGet() < 100) { inner.schedule(this); } else { @@ -280,12 +286,12 @@ public final void testRecursiveExecutionWithDelayTime() throws InterruptedExcept final AtomicInteger i = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(1); - scheduler.schedule(new Action1() { + scheduler.schedule(new Action1() { int state = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { i.set(state); if (state++ < 100) { inner.schedule(this, 1, TimeUnit.MILLISECONDS); @@ -305,11 +311,11 @@ public final void testRecursiveSchedulerInObservable() { Observable obs = Observable.create(new OnSubscribeFunc() { @Override public Subscription onSubscribe(final Observer observer) { - return getScheduler().schedule(new Action1() { + return getScheduler().schedule(new Action1() { int i = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { if (i > 42) { observer.onCompleted(); return; diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java index bb217ad2ca..f2c2582ae9 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java @@ -15,7 +15,9 @@ */ package rx.schedulers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.HashMap; import java.util.concurrent.CountDownLatch; @@ -24,7 +26,7 @@ import rx.Observable; import rx.Scheduler; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.functions.Action1; import rx.functions.Func1; @@ -43,13 +45,13 @@ public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() { final CountDownLatch latch = new CountDownLatch(1); final HashMap map = new HashMap(); - Schedulers.computation().schedule(new Action1() { + Schedulers.computation().schedule(new Action1() { private HashMap statefulMap = map; int nonThreadSafeCounter = 0; @Override - public void call(Inner inner) { + public void call(Recurse inner) { Integer i = statefulMap.get("a"); if (i == null) { i = 1; diff --git a/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java index 09adaa7e28..de2485c2f6 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java @@ -15,9 +15,12 @@ */ package rx.schedulers; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -26,7 +29,7 @@ import org.mockito.InOrder; import org.mockito.Mockito; -import rx.Scheduler.Inner; +import rx.Scheduler.Recurse; import rx.Subscription; import rx.functions.Action1; import rx.functions.Func1; @@ -40,9 +43,9 @@ public final void testPeriodicScheduling() { final Func1 calledOp = mock(Func1.class); final TestScheduler scheduler = new TestScheduler(); - Subscription subscription = scheduler.schedulePeriodically(new Action1() { + Subscription subscription = scheduler.schedulePeriodically(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { System.out.println(scheduler.now()); calledOp.call(scheduler.now()); } @@ -79,10 +82,10 @@ public final void testImmediateUnsubscribes() { final AtomicInteger counter = new AtomicInteger(0); - Subscription subscription = s.schedule(new Action1() { + Subscription subscription = s.schedule(new Action1() { @Override - public void call(Inner inner) { + public void call(Recurse inner) { counter.incrementAndGet(); System.out.println("counter: " + counter.get()); inner.schedule(this); diff --git a/rxjava-core/src/test/java/rx/test/OperatorTester.java b/rxjava-core/src/test/java/rx/test/OperatorTester.java index b6239130e9..ae735180d4 100644 --- a/rxjava-core/src/test/java/rx/test/OperatorTester.java +++ b/rxjava-core/src/test/java/rx/test/OperatorTester.java @@ -18,7 +18,6 @@ import java.util.concurrent.TimeUnit; import rx.Scheduler; -import rx.Subscription; import rx.functions.Action1; /** @@ -56,13 +55,38 @@ public ForwardingScheduler(Scheduler underlying) { } @Override - public Subscription schedule(Action1 action) { - return underlying.schedule(action); + public Inner createInner() { + return new InnerScheduler(underlying.createInner()); } - @Override - public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - return underlying.schedule(action, delayTime, unit); + public static class InnerScheduler extends Scheduler.Inner { + + private final Inner actualInner; + + public InnerScheduler(Inner inner) { + this.actualInner = inner; + } + + @Override + public void schedule(Action1 action) { + actualInner.schedule(action); + } + + @Override + public void schedule(Action1 action, long delayTime, TimeUnit unit) { + actualInner.schedule(action, delayTime, unit); + } + + @Override + public void unsubscribe() { + actualInner.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return actualInner.isUnsubscribed(); + } + } }