Skip to content

Commit 20f50d3

Browse files
Merge pull request ReactiveX#576 from samuelgruetter/delay2
Timer and Delay
2 parents bdc1dbf + 3603c55 commit 20f50d3

File tree

4 files changed

+355
-0
lines changed

4 files changed

+355
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

+58
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import rx.operators.OperationDebounce;
4646
import rx.operators.OperationDefaultIfEmpty;
4747
import rx.operators.OperationDefer;
48+
import rx.operators.OperationDelay;
4849
import rx.operators.OperationDematerialize;
4950
import rx.operators.OperationDistinct;
5051
import rx.operators.OperationDistinctUntilChanged;
@@ -92,6 +93,7 @@
9293
import rx.operators.OperationThrottleFirst;
9394
import rx.operators.OperationTimeInterval;
9495
import rx.operators.OperationTimeout;
96+
import rx.operators.OperationTimer;
9597
import rx.operators.OperationTimestamp;
9698
import rx.operators.OperationToMap;
9799
import rx.operators.OperationToMultimap;
@@ -1989,6 +1991,62 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
19891991
return create(OperationInterval.interval(interval, unit, scheduler));
19901992
}
19911993

1994+
/**
1995+
* Emits one item after a given delay, and then completes.
1996+
*
1997+
* @param interval
1998+
* interval size in time units
1999+
* @param unit
2000+
* time units to use for the interval size
2001+
*/
2002+
public static Observable<Void> timer(long interval, TimeUnit unit) {
2003+
return create(OperationTimer.timer(interval, unit));
2004+
}
2005+
2006+
/**
2007+
* Emits one item after a given delay, and then completes.
2008+
*
2009+
* @param interval
2010+
* interval size in time units
2011+
* @param unit
2012+
* time units to use for the interval size
2013+
* @param scheduler
2014+
* the scheduler to use for scheduling the item
2015+
*/
2016+
public static Observable<Void> timer(long interval, TimeUnit unit, Scheduler scheduler) {
2017+
return create(OperationTimer.timer(interval, unit, scheduler));
2018+
}
2019+
2020+
/**
2021+
* Returns an Observable that emits the results of shifting the items emitted by the source
2022+
* Observable by a specified delay. Errors emitted by the source Observable are not delayed.
2023+
* @param delay
2024+
* the delay to shift the source by
2025+
* @param unit
2026+
* the {@link TimeUnit} in which <code>period</code> is defined
2027+
* @return the source Observable, but shifted by the specified delay
2028+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229810%28v=vs.103%29.aspx">MSDN: Observable.Delay</a>
2029+
*/
2030+
public Observable<T> delay(long delay, TimeUnit unit) {
2031+
return OperationDelay.delay(this, delay, unit, Schedulers.threadPoolForComputation());
2032+
}
2033+
2034+
/**
2035+
* Returns an Observable that emits the results of shifting the items emitted by the source
2036+
* Observable by a specified delay. Errors emitted by the source Observable are not delayed.
2037+
* @param delay
2038+
* the delay to shift the source by
2039+
* @param unit
2040+
* the {@link TimeUnit} in which <code>period</code> is defined
2041+
* @param scheduler
2042+
* the {@link Scheduler} to use for delaying
2043+
* @return the source Observable, but shifted by the specified delay
2044+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229280(v=vs.103).aspx">MSDN: Observable.Delay</a>
2045+
*/
2046+
public Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
2047+
return OperationDelay.delay(this, delay, unit, scheduler);
2048+
}
2049+
19922050
/**
19932051
* Drops items emitted by an Observable that are followed by newer items
19942052
* before a timeout value expires. The timer resets on each emission.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Observable;
21+
import rx.Scheduler;
22+
import rx.observables.ConnectableObservable;
23+
import rx.util.functions.Func1;
24+
25+
public final class OperationDelay {
26+
27+
public static <T> Observable<T> delay(Observable<T> observable, final long delay, final TimeUnit unit, final Scheduler scheduler) {
28+
// observable.map(x => Observable.timer(t).map(_ => x).startItAlreadyNow()).concat()
29+
Observable<Observable<T>> seqs = observable.map(new Func1<T, Observable<T>>() {
30+
public Observable<T> call(final T x) {
31+
ConnectableObservable<T> co = Observable.timer(delay, unit, scheduler).map(new Func1<Void, T>() {
32+
public T call(Void ignored) {
33+
return x;
34+
}
35+
}).replay();
36+
co.connect();
37+
return co;
38+
}
39+
});
40+
return Observable.concat(seqs);
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Observer;
22+
import rx.Scheduler;
23+
import rx.Subscription;
24+
import rx.concurrency.Schedulers;
25+
import rx.subscriptions.Subscriptions;
26+
import rx.util.functions.Action0;
27+
28+
public final class OperationTimer {
29+
30+
public static OnSubscribeFunc<Void> timer(long interval, TimeUnit unit) {
31+
return timer(interval, unit, Schedulers.threadPoolForComputation());
32+
}
33+
34+
public static OnSubscribeFunc<Void> timer(final long delay, final TimeUnit unit, final Scheduler scheduler) {
35+
return new OnSubscribeFunc<Void>() {
36+
@Override
37+
public Subscription onSubscribe(Observer<? super Void> observer) {
38+
return new Timer(delay, unit, scheduler, observer).start();
39+
}
40+
};
41+
}
42+
43+
private static class Timer {
44+
private final long period;
45+
private final TimeUnit unit;
46+
private final Scheduler scheduler;
47+
private final Observer<? super Void> observer;
48+
49+
private Timer(long period, TimeUnit unit, Scheduler scheduler, Observer<? super Void> observer) {
50+
this.period = period;
51+
this.unit = unit;
52+
this.scheduler = scheduler;
53+
this.observer = observer;
54+
}
55+
56+
public Subscription start() {
57+
final Subscription s = scheduler.schedule(new Action0() {
58+
@Override
59+
public void call() {
60+
observer.onNext(null);
61+
observer.onCompleted();
62+
}
63+
}, period, unit);
64+
65+
return Subscriptions.create(new Action0() {
66+
@Override
67+
public void call() {
68+
s.unsubscribe();
69+
}
70+
});
71+
}
72+
}
73+
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Matchers.any;
4+
import static org.mockito.Matchers.anyLong;
5+
import static org.mockito.Mockito.inOrder;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
import static org.mockito.MockitoAnnotations.initMocks;
10+
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.junit.Before;
14+
import org.junit.Ignore;
15+
import org.junit.Test;
16+
import org.mockito.InOrder;
17+
import org.mockito.Mock;
18+
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.concurrency.TestScheduler;
22+
import rx.util.functions.Func1;
23+
24+
public class OperationDelayTest {
25+
@Mock
26+
private Observer<Long> observer;
27+
@Mock
28+
private Observer<Long> observer2;
29+
30+
private TestScheduler scheduler;
31+
32+
@Before
33+
public void before() {
34+
initMocks(this);
35+
scheduler = new TestScheduler();
36+
}
37+
38+
@Test
39+
public void testDelay() {
40+
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
41+
Observable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
42+
delayed.subscribe(observer);
43+
44+
InOrder inOrder = inOrder(observer);
45+
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
46+
verify(observer, never()).onNext(anyLong());
47+
verify(observer, never()).onCompleted();
48+
verify(observer, never()).onError(any(Throwable.class));
49+
50+
scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS);
51+
inOrder.verify(observer, times(1)).onNext(0L);
52+
inOrder.verify(observer, never()).onNext(anyLong());
53+
verify(observer, never()).onCompleted();
54+
verify(observer, never()).onError(any(Throwable.class));
55+
56+
scheduler.advanceTimeTo(2400L, TimeUnit.MILLISECONDS);
57+
inOrder.verify(observer, never()).onNext(anyLong());
58+
verify(observer, never()).onCompleted();
59+
verify(observer, never()).onError(any(Throwable.class));
60+
61+
scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS);
62+
inOrder.verify(observer, times(1)).onNext(1L);
63+
inOrder.verify(observer, never()).onNext(anyLong());
64+
verify(observer, never()).onCompleted();
65+
verify(observer, never()).onError(any(Throwable.class));
66+
67+
scheduler.advanceTimeTo(3400L, TimeUnit.MILLISECONDS);
68+
inOrder.verify(observer, never()).onNext(anyLong());
69+
verify(observer, never()).onCompleted();
70+
verify(observer, never()).onError(any(Throwable.class));
71+
72+
scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS);
73+
inOrder.verify(observer, times(1)).onNext(2L);
74+
verify(observer, times(1)).onCompleted();
75+
verify(observer, never()).onError(any(Throwable.class));
76+
}
77+
78+
@Test
79+
public void testLongDelay() {
80+
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
81+
Observable<Long> delayed = source.delay(5L, TimeUnit.SECONDS, scheduler);
82+
delayed.subscribe(observer);
83+
84+
InOrder inOrder = inOrder(observer);
85+
86+
scheduler.advanceTimeTo(5999L, TimeUnit.MILLISECONDS);
87+
verify(observer, never()).onNext(anyLong());
88+
verify(observer, never()).onCompleted();
89+
verify(observer, never()).onError(any(Throwable.class));
90+
91+
scheduler.advanceTimeTo(6000L, TimeUnit.MILLISECONDS);
92+
inOrder.verify(observer, times(1)).onNext(0L);
93+
scheduler.advanceTimeTo(6999L, TimeUnit.MILLISECONDS);
94+
inOrder.verify(observer, never()).onNext(anyLong());
95+
scheduler.advanceTimeTo(7000L, TimeUnit.MILLISECONDS);
96+
inOrder.verify(observer, times(1)).onNext(1L);
97+
scheduler.advanceTimeTo(7999L, TimeUnit.MILLISECONDS);
98+
inOrder.verify(observer, never()).onNext(anyLong());
99+
scheduler.advanceTimeTo(8000L, TimeUnit.MILLISECONDS);
100+
inOrder.verify(observer, times(1)).onNext(2L);
101+
inOrder.verify(observer, times(1)).onCompleted();
102+
inOrder.verify(observer, never()).onNext(anyLong());
103+
inOrder.verify(observer, never()).onCompleted();
104+
verify(observer, never()).onError(any(Throwable.class));
105+
}
106+
107+
@Test
108+
public void testDelayWithError() {
109+
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).map(new Func1<Long, Long>() {
110+
@Override
111+
public Long call(Long value) {
112+
if (value == 1L) {
113+
throw new RuntimeException("error!");
114+
}
115+
return value;
116+
}
117+
});
118+
Observable<Long> delayed = source.delay(1L, TimeUnit.SECONDS, scheduler);
119+
delayed.subscribe(observer);
120+
121+
InOrder inOrder = inOrder(observer);
122+
123+
scheduler.advanceTimeTo(1999L, TimeUnit.MILLISECONDS);
124+
verify(observer, never()).onNext(anyLong());
125+
verify(observer, never()).onCompleted();
126+
verify(observer, never()).onError(any(Throwable.class));
127+
128+
scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS);
129+
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
130+
inOrder.verify(observer, never()).onNext(anyLong());
131+
verify(observer, never()).onCompleted();
132+
133+
scheduler.advanceTimeTo(5000L, TimeUnit.MILLISECONDS);
134+
inOrder.verify(observer, never()).onNext(anyLong());
135+
inOrder.verify(observer, never()).onError(any(Throwable.class));
136+
verify(observer, never()).onCompleted();
137+
}
138+
139+
// TODO activate this test once https://github.com/Netflix/RxJava/issues/552 is fixed
140+
@Ignore
141+
@Test
142+
public void testDelayWithMultipleSubscriptions() {
143+
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
144+
Observable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
145+
delayed.subscribe(observer);
146+
delayed.subscribe(observer2);
147+
148+
InOrder inOrder = inOrder(observer);
149+
InOrder inOrder2 = inOrder(observer2);
150+
151+
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
152+
verify(observer, never()).onNext(anyLong());
153+
verify(observer2, never()).onNext(anyLong());
154+
155+
scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS);
156+
inOrder.verify(observer, times(1)).onNext(0L);
157+
inOrder2.verify(observer2, times(1)).onNext(0L);
158+
159+
scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS);
160+
inOrder.verify(observer, never()).onNext(anyLong());
161+
inOrder2.verify(observer2, never()).onNext(anyLong());
162+
163+
scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS);
164+
inOrder.verify(observer, times(1)).onNext(1L);
165+
inOrder2.verify(observer2, times(1)).onNext(1L);
166+
167+
verify(observer, never()).onCompleted();
168+
verify(observer2, never()).onCompleted();
169+
170+
scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS);
171+
inOrder.verify(observer, times(1)).onNext(2L);
172+
inOrder2.verify(observer2, times(1)).onNext(2L);
173+
inOrder.verify(observer, never()).onNext(anyLong());
174+
inOrder2.verify(observer2, never()).onNext(anyLong());
175+
inOrder.verify(observer, times(1)).onCompleted();
176+
inOrder2.verify(observer2, times(1)).onCompleted();
177+
178+
verify(observer, never()).onError(any(Throwable.class));
179+
verify(observer2, never()).onError(any(Throwable.class));
180+
}
181+
}

0 commit comments

Comments
 (0)