diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java new file mode 100644 index 0000000000..27d3c18bd4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -0,0 +1,94 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; + +public class TestScheduler extends AbstractScheduler { + private final Queue queue = new PriorityQueue(11, new CompareActionsByTime()); + + private static class TimedAction { + private final long time; + private final Func0 action; + + private TimedAction(long time, Func0 action) { + this.time = time; + this.action = action; + } + + @Override + public String toString() { + return String.format("TimedAction(time = %d, action = %s)", time, action.toString()); + } + } + + private static class CompareActionsByTime implements Comparator { + @Override + public int compare(TimedAction action1, TimedAction action2) { + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + } + } + + private long time; + + @Override + public Subscription schedule(Func0 action) { + return schedule(action, 0L, TimeUnit.NANOSECONDS); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + queue.add(new TimedAction(now() + unit.toNanos(dueTime), action)); + return Subscriptions.empty(); + } + + @Override + public long now() { + return time; + } + + public void advanceTimeBy(long dueTime, TimeUnit unit) { + advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS); + } + + public void advanceTimeTo(long dueTime, TimeUnit unit) { + long targetTime = unit.toNanos(dueTime); + triggerActions(targetTime); + } + + public void triggerActions() { + triggerActions(time); + } + + private void triggerActions(long targetTimeInNanos) { + while (! queue.isEmpty()) { + TimedAction current = queue.peek(); + if (current.time > targetTimeInNanos) { + break; + } + time = current.time; + queue.remove(); + current.action.call(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java new file mode 100644 index 0000000000..19a3736826 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -0,0 +1,140 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Returns an observable sequence that produces a value after each period. + * The value starts at 0 and counts up each period. + */ +public final class OperationInterval { + + /** + * Creates an event each time interval. + */ + public static Func1, Subscription> interval(long interval, TimeUnit unit) { + return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * Creates an event each time interval. + */ + public static Func1, Subscription> interval(long interval, TimeUnit unit, Scheduler scheduler) { + return new Interval(interval, unit, scheduler); + } + + private static class Interval implements Func1, Subscription> { + private final long interval; + private final TimeUnit unit; + private final Scheduler scheduler; + + private long currentValue; + private final AtomicBoolean complete = new AtomicBoolean(); + + private Interval(long interval, TimeUnit unit, Scheduler scheduler) { + this.interval = interval; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + scheduler.schedule(new IntervalAction(observer), interval, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + } + }); + } + + private class IntervalAction implements Action0 { + private final Observer observer; + + private IntervalAction(Observer observer) { + this.observer = observer; + } + + @Override + public void call() { + if (complete.get()) { + observer.onCompleted(); + } else { + observer.onNext(currentValue); + currentValue++; + scheduler.schedule(this, interval, unit); + } + } + } + } + + public static class UnitTest { + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") // due to mocking + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testInterval() { + Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Subscription sub = w.subscribe(observer); + + verify(observer, never()).onNext(0L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + sub.unsubscribe(); + scheduler.advanceTimeTo(4, TimeUnit.SECONDS); + verify(observer, never()).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + } +}