Skip to content

Commit

Permalink
Merge pull request #1399 from benjchristensen/perf-tests-0702
Browse files Browse the repository at this point in the history
Update Perf Tests
  • Loading branch information
benjchristensen committed Jul 2, 2014
2 parents 37ccadb + 78572f3 commit 746f199
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 17 deletions.
10 changes: 10 additions & 0 deletions rxjava-core/src/perf/java/rx/PerfBaseline.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void observableConsumption(Input input) throws InterruptedException {
public void observableViaRange(Input input) throws InterruptedException {
input.observable.subscribe(input.observer);
}

@Benchmark
public void observableConsumptionUnsafe(Input input) throws InterruptedException {
input.firehose.unsafeSubscribe(input.newSubscriber());
}

@Benchmark
public void observableViaRangeUnsafe(Input input) throws InterruptedException {
input.observable.unsafeSubscribe(input.newSubscriber());
}

@Benchmark
public void iterableViaForLoopConsumption(Input input) throws InterruptedException {
Expand Down
25 changes: 23 additions & 2 deletions rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public abstract class InputWithIncrementingInteger {
public Iterable<Integer> iterable;
public Observable<Integer> observable;
public Observable<Integer> firehose;
private Blackhole bh;
public Blackhole bh;
public Observer<Integer> observer;

public abstract int getSize();

@Setup
public void setup(final Blackhole bh) {
this.bh = bh;
Expand Down Expand Up @@ -106,4 +106,25 @@ public LatchedObserver<Integer> newLatchedObserver() {
return new LatchedObserver<Integer>(bh);
}

public Subscriber<Integer> newSubscriber() {
return new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
bh.consume(t);
}

};
}

}
62 changes: 47 additions & 15 deletions rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
Expand All @@ -39,21 +39,25 @@
@OutputTimeUnit(TimeUnit.SECONDS)
public class OperatorMergePerf {

@State(Scope.Thread)
public static class Input extends InputWithIncrementingInteger {

@Param({ "1", "1000" })
public int size;
// flatMap
@Benchmark
public void oneStreamOfNthatMergesIn1(final InputMillion input) throws InterruptedException {
Observable<Observable<Integer>> os = Observable.range(1, input.size).map(new Func1<Integer, Observable<Integer>>() {

@Override
public int getSize() {
return size;
}
@Override
public Observable<Integer> call(Integer i) {
return Observable.just(i);
}

});
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable.merge(os).subscribe(o);
o.latch.await();
}

// flatMap
@Benchmark
public void merge1SyncStreamOfN(final Input input) throws InterruptedException {
public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedException {
Observable<Observable<Integer>> os = Observable.just(1).map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -66,9 +70,9 @@ public Observable<Integer> call(Integer i) {
Observable.merge(os).subscribe(o);
o.latch.await();
}

@Benchmark
public void mergeNSyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
Observable<Observable<Integer>> os = input.observable.map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -83,7 +87,7 @@ public Observable<Integer> call(Integer i) {
}

@Benchmark
public void mergeNAsyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
Observable<Observable<Integer>> os = input.observable.map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -98,7 +102,7 @@ public Observable<Integer> call(Integer i) {
}

@Benchmark
public void mergeTwoAsyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable<Integer> ob = Observable.range(0, input.size).subscribeOn(Schedulers.computation());
Observable.merge(ob, ob).subscribe(o);
Expand All @@ -115,6 +119,7 @@ public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedE
@State(Scope.Thread)
public static class InputForMergeN {
@Param({ "1", "100", "1000" })
// @Param({ "1000" })
public int size;

private Blackhole bh;
Expand All @@ -134,4 +139,31 @@ public LatchedObserver<Integer> newLatchedObserver() {
}
}

@State(Scope.Thread)
public static class InputMillion extends InputWithIncrementingInteger {

@Param({ "1", "1000", "1000000" })
// @Param({ "1000" })
public int size;

@Override
public int getSize() {
return size;
}

}

@State(Scope.Thread)
public static class InputThousand extends InputWithIncrementingInteger {

@Param({ "1", "1000" })
// @Param({ "1000" })
public int size;

@Override
public int getSize() {
return size;
}

}
}

0 comments on commit 746f199

Please sign in to comment.