Skip to content

Commit

Permalink
Merge pull request #841 from benjchristensen/operator-range
Browse files Browse the repository at this point in the history
Range OnSubscribe
  • Loading branch information
benjchristensen committed Feb 9, 2014
2 parents 589d360 + 4154c0f commit 185a575
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 152 deletions.
18 changes: 12 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.observers.SafeSubscriber;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
Expand Down Expand Up @@ -95,7 +96,7 @@
import rx.operators.OperationWindow;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFromIterable;
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
Expand All @@ -120,7 +121,6 @@
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.TimeInterval;
import rx.util.Timestamped;
import rx.util.functions.Action0;
Expand Down Expand Up @@ -1217,7 +1217,7 @@ public final static <T> Observable<T> from(Future<? extends T> future, Scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
*/
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(new OperatorFromIterable<T>(iterable));
return create(new OnSubscribeFromIterable<T>(iterable));
}

/**
Expand All @@ -1239,7 +1239,7 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
*/
public final static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
return create(new OperatorFromIterable<T>(iterable)).subscribeOn(scheduler);
return create(new OnSubscribeFromIterable<T>(iterable)).subscribeOn(scheduler);
}

/**
Expand Down Expand Up @@ -2439,7 +2439,10 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
*/
public final static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
if ((start + count) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
}
return Observable.create(new OnSubscribeRange(start, start + count));
}

/**
Expand All @@ -2459,7 +2462,10 @@ public final static Observable<Integer> range(int start, int count) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">MSDN: Observable.Range</a>
*/
public final static Observable<Integer> range(int start, int count, Scheduler scheduler) {
return from(Range.createWithCount(start, count), scheduler);
if ((start + count) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
}
return Observable.create(new OnSubscribeRange(start, start + count)).subscribeOn(scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
* You can convert any object that supports the Iterable interface into an Observable that emits
* each item in the object, with the toObservable operation.
*/
public final class OperatorFromIterable<T> implements OnSubscribe<T> {
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

final Iterable<? extends T> is;

public OperatorFromIterable(Iterable<? extends T> iterable) {
public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
this.is = iterable;
}

Expand Down
44 changes: 44 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable.OnSubscribe;
import rx.Subscriber;

/**
*/
public final class OnSubscribeRange implements OnSubscribe<Integer> {

private final int start;
private final int end;

public OnSubscribeRange(int start, int end) {
this.start = start;
this.end = end;
}

@Override
public void call(Subscriber<? super Integer> o) {
for (int i = start; i < end; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(i);
}
o.onCompleted();
}

}
75 changes: 0 additions & 75 deletions rxjava-core/src/main/java/rx/util/Range.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package rx.operators;

import rx.Observable;
import rx.perf.AbstractPerformanceTester;
import rx.perf.IntegerSumObserver;
import rx.util.functions.Action0;

public class OperatorRangePerformance extends AbstractPerformanceTester {

static int reps = Integer.MAX_VALUE / 8;

OperatorRangePerformance() {
super(reps);
}

public static void main(String args[]) {

final OperatorRangePerformance spt = new OperatorRangePerformance();
try {
spt.runTest(new Action0() {

@Override
public void call() {
spt.timeRange();
}
});
} catch (Exception e) {
e.printStackTrace();
}

}

/**
*
* -- 0.17
*
* Run: 10 - 271,147,198 ops/sec
* Run: 11 - 274,821,481 ops/sec
* Run: 12 - 271,632,295 ops/sec
* Run: 13 - 277,876,014 ops/sec
* Run: 14 - 274,821,763 ops/sec
*
* -- 0.16.1
*
* Run: 10 - 222,104,280 ops/sec
* Run: 11 - 224,311,761 ops/sec
* Run: 12 - 222,999,339 ops/sec
* Run: 13 - 222,344,174 ops/sec
* Run: 14 - 225,247,983 ops/sec
*
* @return
*/
public long timeRange() {
IntegerSumObserver o = new IntegerSumObserver();
Observable.range(1, reps).subscribe(o);
return o.sum;
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OnSubscribeRangeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Mockito.*;

import org.junit.Test;

import rx.Observable;
import rx.Observer;

public class OnSubscribeRangeTest {

@Test
public void testRangeStartAt2Count3() {
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
Observable.range(2, 3).subscribe(observer);

verify(observer, times(1)).onNext(2);
verify(observer, times(1)).onNext(3);
verify(observer, times(1)).onNext(4);
verify(observer, never()).onNext(5);
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(observer, times(1)).onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;

Expand All @@ -30,7 +32,7 @@ public class OperatorFromIterableTest {

@Test
public void testIterable() {
Observable<String> observable = Observable.create(new OperatorFromIterable<String>(Arrays.<String> asList("one", "two", "three")));
Observable<String> observable = Observable.create(new OnSubscribeFromIterable<String>(Arrays.<String> asList("one", "two", "three")));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -41,7 +43,7 @@ public void testIterable() {
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testObservableFromIterable() {
Observable<String> observable = Observable.from(Arrays.<String> asList("one", "two", "three"));
Expand Down
Loading

0 comments on commit 185a575

Please sign in to comment.