Skip to content

Commit

Permalink
Merge pull request ReactiveX#450 from zsxwing/time-interval
Browse files Browse the repository at this point in the history
Implemented the 'TimeInterval' operator
  • Loading branch information
benjchristensen committed Oct 31, 2013
2 parents 52a33fd + bbbcb69 commit 5cab689
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
26 changes: 26 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeInterval;
import rx.operators.OperationTimeout;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
Expand All @@ -97,6 +98,7 @@
import rx.util.OnErrorNotImplementedException;
import rx.util.Opening;
import rx.util.Range;
import rx.util.TimeInterval;
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand Down Expand Up @@ -4533,6 +4535,30 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
}

/**
* Records the time interval between consecutive elements in an observable sequence.
*
* @return An observable sequence with time interval information on elements.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212107(v=vs.103).aspx">MSDN: Observable.TimeInterval</a>
*/
public Observable<TimeInterval<T>> timeInterval() {
return create(OperationTimeInterval.timeInterval(this));
}

/**
* Records the time interval between consecutive elements in an observable
* sequence, using the specified scheduler to compute time intervals.
*
* @param scheduler
* Scheduler used to compute time intervals.
*
* @return An observable sequence with time interval information on elements.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212107(v=vs.103).aspx">MSDN: Observable.TimeInterval</a>
*/
public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
return create(OperationTimeInterval.timeInterval(this, scheduler));
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
140 changes: 140 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;
import rx.util.TimeInterval;

/**
* Records the time interval between consecutive elements in an observable sequence.
*/
public class OperationTimeInterval {

public static <T> OnSubscribeFunc<TimeInterval<T>> timeInterval(
Observable<? extends T> source) {
return timeInterval(source, Schedulers.immediate());
}

public static <T> OnSubscribeFunc<TimeInterval<T>> timeInterval(
final Observable<? extends T> source, final Scheduler scheduler) {
return new OnSubscribeFunc<TimeInterval<T>>() {
@Override
public Subscription onSubscribe(
Observer<? super TimeInterval<T>> observer) {
return source.subscribe(new TimeIntervalObserver<T>(observer,
scheduler));
}
};
}

private static class TimeIntervalObserver<T> implements Observer<T> {

private final Observer<? super TimeInterval<T>> observer;
/**
* Only used to compute time intervals.
*/
private final Scheduler scheduler;
private long lastTimestamp;

public TimeIntervalObserver(Observer<? super TimeInterval<T>> observer,
Scheduler scheduler) {
this.observer = observer;
this.scheduler = scheduler;
// The beginning time is the time when the observer subscribes.
lastTimestamp = scheduler.now();
}

@Override
public void onNext(T args) {
long nowTimestamp = scheduler.now();
observer.onNext(new TimeInterval<T>(nowTimestamp - lastTimestamp,
args));
lastTimestamp = nowTimestamp;
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onCompleted();
}
}

public static class UnitTest {

private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

@Mock
private Observer<TimeInterval<Integer>> observer;

private TestScheduler testScheduler;
private PublishSubject<Integer> subject;
private Observable<TimeInterval<Integer>> observable;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
testScheduler = new TestScheduler();
subject = PublishSubject.create();
observable = subject.timeInterval(testScheduler);
}

@Test
public void testTimeInterval() {
InOrder inOrder = inOrder(observer);
observable.subscribe(observer);

testScheduler.advanceTimeBy(1000, TIME_UNIT);
subject.onNext(1);
testScheduler.advanceTimeBy(2000, TIME_UNIT);
subject.onNext(2);
testScheduler.advanceTimeBy(3000, TIME_UNIT);
subject.onNext(3);
subject.onCompleted();

inOrder.verify(observer, times(1)).onNext(
new TimeInterval<Integer>(1000, 1));
inOrder.verify(observer, times(1)).onNext(
new TimeInterval<Integer>(2000, 2));
inOrder.verify(observer, times(1)).onNext(
new TimeInterval<Integer>(3000, 3));
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}

}
81 changes: 81 additions & 0 deletions rxjava-core/src/main/java/rx/util/TimeInterval.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.util;

public class TimeInterval<T> {
private final long intervalInMilliseconds;
private final T value;

public TimeInterval(long intervalInMilliseconds, T value) {
this.value = value;
this.intervalInMilliseconds = intervalInMilliseconds;
}

/**
* Returns the interval in milliseconds.
*
* @return interval in milliseconds
*/
public long getIntervalInMilliseconds() {
return intervalInMilliseconds;
}

/**
* Returns the value.
*
* @return the value
*/
public T getValue() {
return value;
}

// The following methods are generated by eclipse automatically.
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime
* result
+ (int) (intervalInMilliseconds ^ (intervalInMilliseconds >>> 32));
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TimeInterval<?> other = (TimeInterval<?>) obj;
if (intervalInMilliseconds != other.intervalInMilliseconds)
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}

@Override
public String toString() {
return "TimeInterval [intervalInMilliseconds=" + intervalInMilliseconds
+ ", value=" + value + "]";
}
}

0 comments on commit 5cab689

Please sign in to comment.