diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 85c0359b3f..0142f9837b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -35,6 +36,7 @@ import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDefer; +import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDistinct; @@ -3551,6 +3553,64 @@ public Observable scan(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)); } + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Only errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit) { + return create(OperationDelay.delay(this, delay, unit)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Only errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @param scheduler + * the {@link Scheduler} to use for delaying + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { + return create(OperationDelay.delay(this, delay, unit, scheduler)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a delay specified by the due time at which to begin emitting. + * Only errors emitted by the source Observable are not delayed. + * @param dueTime + * the due time at which to start emitting + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(Date dueTime) { + return create(OperationDelay.delay(this, dueTime)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a delay specified by the due time at which to begin emitting. + * Only errors emitted by the source Observable are not delayed. + * @param dueTime + * the due time at which to start emitting + * @param scheduler + * the {@link Scheduler} to use for delaying + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(Date dueTime, Scheduler scheduler) { + return create(OperationDelay.delay(this, dueTime, scheduler)); + } + /** * Returns an Observable that emits the results of sampling the items emitted by the source * Observable at a specified time interval. diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java b/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java new file mode 100644 index 0000000000..86aff00da0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java @@ -0,0 +1,114 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func2; + +/** + * Scheduler that delays the underlying scheduler by a fixed time delay. + */ +public class DelayedScheduler extends Scheduler { + private final Scheduler underlying; + private final long delay; + private final TimeUnit unit; + + public DelayedScheduler(Scheduler underlying, long delay, TimeUnit unit) { + this.underlying = underlying; + this.delay = delay; + this.unit = unit; + } + + @Override + public Subscription schedule(T state, Func2 action) { + return underlying.schedule(state, action, delay, unit); + } + + @Override + public Subscription schedule(T state, Func2 action, long delay, TimeUnit unit) { + long newDelay = unit.toNanos(delay) + this.unit.toNanos(this.delay); + return underlying.schedule(state, action, newDelay, TimeUnit.NANOSECONDS); + } + + public static class UnitTest { + @Mock + Action0 action; + + private TestScheduler scheduler = new TestScheduler(); + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testNotDelayingAnAction() { + Scheduler delayed = new DelayedScheduler(scheduler, 0, TimeUnit.SECONDS); + delayed.schedule(action); + delayed.schedule(action, 1L, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(action); + + scheduler.triggerActions(); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(999L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(1L, TimeUnit.SECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(5L, TimeUnit.SECONDS); + inOrder.verify(action, never()).call(); + } + + @Test + public void testdelayingAnAction() { + Scheduler delayed = new DelayedScheduler(scheduler, 500, TimeUnit.MILLISECONDS); + delayed.schedule(action); + delayed.schedule(action, 1L, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(action); + + scheduler.advanceTimeTo(499L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(500L, TimeUnit.MILLISECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(5L, TimeUnit.SECONDS); + inOrder.verify(action, never()).call(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java new file mode 100644 index 0000000000..603be7080a --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -0,0 +1,314 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.interval; + +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. + */ +public final class OperationDelay { + + /** + * Delays the observable sequence by the given time interval. + */ + public static OnSubscribeFunc delay(final Observable source, long delay, TimeUnit unit) { + return delay(source, delay, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Delays the observable sequence by a time interval so that it starts at the given due time. + */ + public static OnSubscribeFunc delay(final Observable source, Date dueTime) { + return delay(source, dueTime, Schedulers.threadPoolForComputation()); + } + + /** + * Delays the observable sequence by a time interval so that it starts at the given due time. + */ + public static OnSubscribeFunc delay(final Observable source, Date dueTime, final Scheduler scheduler) { + long scheduledTime = dueTime.getTime(); + long delay = scheduledTime - scheduler.now(); + if (delay < 0L) { + delay = 0L; + } + return new Delay(source, delay, TimeUnit.MILLISECONDS, scheduler); + } + + /** + * Delays the observable sequence by the given time interval. + */ + public static OnSubscribeFunc delay(final Observable source, final long period, final TimeUnit unit, final Scheduler scheduler) { + return new Delay(source, period, unit, scheduler); + } + + private static class Delay implements OnSubscribeFunc { + private final Observable source; + private final long delay; + private final TimeUnit unit; + private final Scheduler scheduler; + + private Delay(Observable source, long delay, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + return source.subscribe(new Observer() { + private AtomicBoolean errorOccurred = new AtomicBoolean(); + + @Override + public void onCompleted() { + if (!errorOccurred.get()) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, unit); + } + } + + @Override + public void onError(Throwable e) { + // errors get propagated without delay + errorOccurred.set(true); + observer.onError(e); + } + + @Override + public void onNext(final T value) { + if (!errorOccurred.get()) { + scheduler.schedule(new Action0() { + @Override + public void call() { + try { + observer.onNext(value); + } catch (Throwable t) { + errorOccurred.set(true); + observer.onError(t); + } + } + }, delay, unit); + } + } + }); + } + } + + public static class UnitTest { + @Mock + private Observer observer; + @Mock + private Observer observer2; + + private TestScheduler scheduler; + + @Before + public void before() { + initMocks(this); + scheduler = new TestScheduler(); + } + + @Test + public void testDelay() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithDueTime() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).first(); + Observable delayed = Observable.create(OperationDelay.delay(source, new Date(1500L), scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testLongDelay() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 5L, TimeUnit.SECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(5999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(6000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + scheduler.advanceTimeTo(6999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(7000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + scheduler.advanceTimeTo(7999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(8000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithError() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).map(new Func1() { + @Override + public Long call(Long value) { + if (value == 1L) { + throw new RuntimeException("error!"); + } + return value; + } + }); + Observable delayed = Observable.create(OperationDelay.delay(source, 1L, TimeUnit.SECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + + scheduler.advanceTimeTo(5000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } + + @Test + public void testDelayWithMultipleSubscriptions() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler)); + delayed.subscribe(observer); + delayed.subscribe(observer2); + + InOrder inOrder = inOrder(observer); + InOrder inOrder2 = inOrder(observer2); + + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder2.verify(observer2, times(1)).onNext(0L); + + scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder2.verify(observer2, times(1)).onNext(1L); + + verify(observer, never()).onCompleted(); + verify(observer2, never()).onCompleted(); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder2.verify(observer2, times(1)).onNext(2L); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder2.verify(observer2, times(1)).onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer2, never()).onError(any(Throwable.class)); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 0f53c884ba..0d2ded62b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -35,6 +35,7 @@ import rx.observables.ConnectableObservable; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Func1; /** * Returns an observable sequence that produces a value after each period. @@ -68,6 +69,7 @@ private static class Interval implements OnSubscribeFunc { private final Scheduler scheduler; private long currentValue; + private boolean errorOccurred; private Interval(long period, TimeUnit unit, Scheduler scheduler) { this.period = period; @@ -80,8 +82,15 @@ public Subscription onSubscribe(final Observer observer) { final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { @Override public void call() { - observer.onNext(currentValue); - currentValue++; + if (!errorOccurred) { + try { + observer.onNext(currentValue); + currentValue++; + } catch (Throwable t) { + errorOccurred = true; + observer.onError(t); + } + } } }, period, period, unit); @@ -89,7 +98,9 @@ public void call() { @Override public void call() { wrapped.unsubscribe(); - observer.onCompleted(); + if (!errorOccurred) { + observer.onCompleted(); + } } }); } @@ -110,7 +121,7 @@ public void before() { @Test public void testInterval() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub = w.subscribe(observer); verify(observer, never()).onNext(0L); @@ -133,9 +144,35 @@ public void testInterval() { verify(observer, never()).onError(any(Throwable.class)); } + @Test + public void testIntervalWithError() { + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)).map(new Func1() { + @Override + public Long call(Long value) { + if (value == 2L) { + throw new RuntimeException("error!"); + } + return value; + } + }); + w.subscribe(observer); + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3, TimeUnit.SECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + verify(observer, never()).onCompleted(); + } + @Test public void testWithMultipleSubscribersStartingAtSameTime() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub1 = w.subscribe(observer); Subscription sub2 = w.subscribe(observer2); @@ -174,7 +211,7 @@ public void testWithMultipleSubscribersStartingAtSameTime() { @Test public void testWithMultipleStaggeredSubscribers() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub1 = w.subscribe(observer); verify(observer, never()).onNext(anyLong()); @@ -214,7 +251,7 @@ public void testWithMultipleStaggeredSubscribers() { @Test public void testWithMultipleStaggeredSubscribersAndPublish() { - ConnectableObservable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish(); + ConnectableObservable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)).publish(); Subscription sub1 = w.subscribe(observer); w.connect();