From 5626b7220badd41557f230378cfaf19c3a708b81 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 16 Oct 2013 13:14:29 +0800 Subject: [PATCH 1/4] Reimplemented the 'reduce' operator --- rxjava-core/src/main/java/rx/Observable.java | 9 +- .../java/rx/operators/OperationReduce.java | 326 ++++++++++++++++++ 2 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationReduce.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6bcd3e9f77..52298f65b3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -61,6 +61,7 @@ import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; +import rx.operators.OperationReduce; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; @@ -3514,7 +3515,9 @@ public Observable onErrorReturn(Func1 resumeFunction) * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an inject method that does a similar operation on lists. - * + *

+ * Note: You can't reduce an empty sequence without an initial value. + * * @param accumulator * An accumulator function to be invoked on each item emitted by the source * Observable, whose result will be used in the next accumulator call @@ -3524,7 +3527,7 @@ public Observable onErrorReturn(Func1 resumeFunction) * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return create(OperationScan.scan(this, accumulator)).takeLast(1); + return create(OperationReduce.reduce(this, accumulator)); } /** @@ -3777,7 +3780,7 @@ public Observable aggregate(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(R initialValue, Func2 accumulator) { - return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); + return create(OperationReduce.reduce(this, initialValue, accumulator)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationReduce.java b/rxjava-core/src/main/java/rx/operators/OperationReduce.java new file mode 100644 index 0000000000..645ae52be6 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationReduce.java @@ -0,0 +1,326 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func2; + +/** + * Returns an Observable that applies a function of your choosing to the first item emitted by a + * source Observable, then feeds the result of that function along with the second item emitted + * by the source Observable into the same function, and so on until all items have been emitted + * by the source Observable, and emits the final result from the final call to your function as + * its sole item. + *

+ * + *

+ * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," + * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, + * has an inject method that does a similar operation on lists. + * + * @param accumulator + * An accumulator function to be invoked on each item emitted by the source + * Observable, whose result will be used in the next accumulator call + * @return an Observable that emits a single item that is the result of accumulating the + * output from the source Observable + * @see MSDN: Observable.Aggregate + * @see Wikipedia: Fold (higher-order function) + */ +public final class OperationReduce { + + /** + * Returns an Observable that applies a function of your choosing to the first item emitted by a + * source Observable, then feeds the result of that function along with the second item emitted + * by an Observable into the same function, and so on until all items have been emitted by the + * source Observable, emitting the final result from the final call to your function as its sole + * item. + *

+ * + *

+ * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," + * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, + * has an inject method that does a similar operation on lists. + * + * @param sequence + * An observable sequence of elements to reduce. + * @param initialValue + * the initial (seed) accumulator value + * @param accumulator + * an accumulator function to be invoked on each item emitted by the source + * Observable, the result of which will be used in the next accumulator call + * @return an Observable that emits a single item that is the result of accumulating the output + * from the items emitted by the source Observable + * @see MSDN: Observable.Aggregate + * @see Wikipedia: Fold (higher-order function) + */ + public static OnSubscribeFunc reduce( + Observable sequence, R initialValue, + Func2 accumulator) { + return new Accumulator(sequence, initialValue, accumulator); + } + + /** + * Returns an Observable that applies a function of your choosing to the first item emitted by a + * source Observable, then feeds the result of that function along with the second item emitted + * by the source Observable into the same function, and so on until all items have been emitted + * by the source Observable, and emits the final result from the final call to your function as + * its sole item. + *

+ * + *

+ * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," + * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, + * has an inject method that does a similar operation on lists. + *

+ * Note: You can't reduce an empty sequence without an initial value. + * + * @param sequence + * An observable sequence of elements to reduce. + * @param accumulator + * An accumulator function to be invoked on each item emitted by the source + * Observable, whose result will be used in the next accumulator call + * @return an Observable that emits a single item that is the result of accumulating the + * output from the source Observable + * @see MSDN: Observable.Aggregate + * @see Wikipedia: Fold (higher-order function) + */ + public static OnSubscribeFunc reduce( + Observable sequence, Func2 accumulator) { + return new AccumulatorWithoutInitialValue(sequence, accumulator); + } + + private static class Accumulator implements OnSubscribeFunc { + private final Observable sequence; + private final R initialValue; + private final Func2 accumulatorFunction; + + private Accumulator(Observable sequence, R initialValue, + Func2 accumulator) { + this.sequence = sequence; + this.initialValue = initialValue; + this.accumulatorFunction = accumulator; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + return sequence.subscribe(new Observer() { + private volatile R acc = initialValue; + + @Override + public synchronized void onNext(T args) { + acc = accumulatorFunction.call(acc, args); + } + + @Override + public void onCompleted() { + observer.onNext(acc); + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + }); + } + } + + private static class AccumulatorWithoutInitialValue implements + OnSubscribeFunc { + private final Observable sequence; + private final Func2 accumulatorFunction; + + private AccumulatorWithoutInitialValue( + Observable sequence, Func2 accumulator) { + this.sequence = sequence; + this.accumulatorFunction = accumulator; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + return sequence.subscribe(new Observer() { + private volatile T acc = null; + private volatile boolean isSourceSequenceEmpty = true; + + @Override + public synchronized void onNext(T args) { + if (isSourceSequenceEmpty) { + acc = args; + isSourceSequenceEmpty = false; + } else { + acc = accumulatorFunction.call(acc, args); + } + } + + @Override + public void onCompleted() { + if (isSourceSequenceEmpty) { + observer.onError(new UnsupportedOperationException( + "Can not reduce on an empty sequence without an initial value")); + } else { + observer.onNext(acc); + observer.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + }); + } + } + + public static class UnitTest { + + @Test + public void testReduceIntegersWithInitialValue() { + Observable observable = Observable.from(1, 2, 3); + + Observable m = Observable.create(reduce(observable, "0", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + })); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext("0123"); + verify(observer, times(1)).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testReduceIntegersWithoutInitialValue() { + Observable observable = Observable.from(1, 2, 3); + + Observable m = Observable.create(reduce(observable, + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + })); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(anyInt()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndOnlyOneValue() { + Observable observable = Observable.from(1); + + Observable m = Observable.create(reduce(observable, + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + })); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(anyInt()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testReduceIntegersWithInitialValueAndEmpty() { + Observable observable = Observable.empty(); + + Observable m = Observable.create(reduce(observable, "1", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + })); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + verify(observer, times(1)).onNext("1"); + verify(observer, times(1)).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndEmpty() { + Observable observable = Observable.empty(); + + Observable m = Observable.create(reduce(observable, + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + })); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + verify(observer, never()).onNext(anyInt()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError( + any(UnsupportedOperationException.class)); + } + } + +} From b982b288397d6a941225043aeb12db704383084d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 21 Oct 2013 14:50:17 +0800 Subject: [PATCH 2/4] Updated the unit tests and removed synchronized --- .../java/rx/operators/OperationReduce.java | 51 +++++++++---------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationReduce.java b/rxjava-core/src/main/java/rx/operators/OperationReduce.java index 645ae52be6..840a8be937 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationReduce.java +++ b/rxjava-core/src/main/java/rx/operators/OperationReduce.java @@ -16,14 +16,12 @@ package rx.operators; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observable.OnSubscribeFunc; @@ -133,7 +131,7 @@ public Subscription onSubscribe(final Observer observer) { private volatile R acc = initialValue; @Override - public synchronized void onNext(T args) { + public void onNext(T args) { acc = accumulatorFunction.call(acc, args); } @@ -169,7 +167,7 @@ public Subscription onSubscribe(final Observer observer) { private volatile boolean isSourceSequenceEmpty = true; @Override - public synchronized void onNext(T args) { + public void onNext(T args) { if (isSourceSequenceEmpty) { acc = args; isSourceSequenceEmpty = false; @@ -217,11 +215,10 @@ public String call(String s, Integer n) { Observer observer = mock(Observer.class); m.subscribe(observer); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext("0123"); - verify(observer, times(1)).onNext(anyString()); - verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("0123"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } @Test @@ -242,11 +239,10 @@ public Integer call(Integer t1, Integer t2) { Observer observer = mock(Observer.class); m.subscribe(observer); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext(6); - verify(observer, times(1)).onNext(anyInt()); - verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(6); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } @Test @@ -267,11 +263,10 @@ public Integer call(Integer t1, Integer t2) { Observer observer = mock(Observer.class); m.subscribe(observer); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext(1); - verify(observer, times(1)).onNext(anyInt()); - verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } @Test @@ -292,10 +287,10 @@ public String call(String s, Integer n) { Observer observer = mock(Observer.class); m.subscribe(observer); - verify(observer, times(1)).onNext("1"); - verify(observer, times(1)).onNext(anyString()); - verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("1"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } @Test @@ -316,10 +311,10 @@ public Integer call(Integer t1, Integer t2) { Observer observer = mock(Observer.class); m.subscribe(observer); - verify(observer, never()).onNext(anyInt()); - verify(observer, never()).onCompleted(); - verify(observer, times(1)).onError( + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( any(UnsupportedOperationException.class)); + inOrder.verifyNoMoreInteractions(); } } From 5e7976d680098393c7980f04b6eb4b9c63fcfd5b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 28 Oct 2013 12:56:12 +0800 Subject: [PATCH 3/4] Used 'materialize' and 'dematerialize' to implement the 'reduce' operator --- rxjava-core/src/main/java/rx/Observable.java | 23 +- .../java/rx/operators/OperationReduce.java | 321 ------------------ rxjava-core/src/test/java/rx/ReduceTests.java | 114 +++++++ 3 files changed, 134 insertions(+), 324 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationReduce.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 52298f65b3..98c1ca6e50 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -61,7 +61,6 @@ import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; -import rx.operators.OperationReduce; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; @@ -3523,11 +3522,29 @@ public Observable onErrorReturn(Func1 resumeFunction) * Observable, whose result will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the * output from the source Observable + * @throws UnsupportedOperationException + * the Observable is empty * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return create(OperationReduce.reduce(this, accumulator)); + return create(OperationScan.scan(this, accumulator)).takeLast(1).assertNonEmpty(); + } + + private Observable assertNonEmpty() { + return materialize().map(new Func1, Notification>() { + private boolean isEmpty = true; + @Override + public Notification call(Notification t1) { + if (t1.isOnNext()) { + isEmpty = false; + } + else if (t1.isOnCompleted() && isEmpty) { + return new Notification(new UnsupportedOperationException("Can not apply on an empty sequence")); + } + return t1; + } + }).dematerialize(); } /** @@ -3780,7 +3797,7 @@ public Observable aggregate(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(R initialValue, Func2 accumulator) { - return create(OperationReduce.reduce(this, initialValue, accumulator)); + return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationReduce.java b/rxjava-core/src/main/java/rx/operators/OperationReduce.java deleted file mode 100644 index 840a8be937..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationReduce.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; - -import org.junit.Test; -import org.mockito.InOrder; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.util.functions.Func2; - -/** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, and emits the final result from the final call to your function as - * its sole item. - *

- * - *

- * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," - * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, - * has an inject method that does a similar operation on lists. - * - * @param accumulator - * An accumulator function to be invoked on each item emitted by the source - * Observable, whose result will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of accumulating the - * output from the source Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ -public final class OperationReduce { - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * item. - *

- * - *

- * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," - * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, - * has an inject method that does a similar operation on lists. - * - * @param sequence - * An observable sequence of elements to reduce. - * @param initialValue - * the initial (seed) accumulator value - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source - * Observable, the result of which will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of accumulating the output - * from the items emitted by the source Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static OnSubscribeFunc reduce( - Observable sequence, R initialValue, - Func2 accumulator) { - return new Accumulator(sequence, initialValue, accumulator); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, and emits the final result from the final call to your function as - * its sole item. - *

- * - *

- * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," - * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, - * has an inject method that does a similar operation on lists. - *

- * Note: You can't reduce an empty sequence without an initial value. - * - * @param sequence - * An observable sequence of elements to reduce. - * @param accumulator - * An accumulator function to be invoked on each item emitted by the source - * Observable, whose result will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of accumulating the - * output from the source Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static OnSubscribeFunc reduce( - Observable sequence, Func2 accumulator) { - return new AccumulatorWithoutInitialValue(sequence, accumulator); - } - - private static class Accumulator implements OnSubscribeFunc { - private final Observable sequence; - private final R initialValue; - private final Func2 accumulatorFunction; - - private Accumulator(Observable sequence, R initialValue, - Func2 accumulator) { - this.sequence = sequence; - this.initialValue = initialValue; - this.accumulatorFunction = accumulator; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.subscribe(new Observer() { - private volatile R acc = initialValue; - - @Override - public void onNext(T args) { - acc = accumulatorFunction.call(acc, args); - } - - @Override - public void onCompleted() { - observer.onNext(acc); - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - }); - } - } - - private static class AccumulatorWithoutInitialValue implements - OnSubscribeFunc { - private final Observable sequence; - private final Func2 accumulatorFunction; - - private AccumulatorWithoutInitialValue( - Observable sequence, Func2 accumulator) { - this.sequence = sequence; - this.accumulatorFunction = accumulator; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.subscribe(new Observer() { - private volatile T acc = null; - private volatile boolean isSourceSequenceEmpty = true; - - @Override - public void onNext(T args) { - if (isSourceSequenceEmpty) { - acc = args; - isSourceSequenceEmpty = false; - } else { - acc = accumulatorFunction.call(acc, args); - } - } - - @Override - public void onCompleted() { - if (isSourceSequenceEmpty) { - observer.onError(new UnsupportedOperationException( - "Can not reduce on an empty sequence without an initial value")); - } else { - observer.onNext(acc); - observer.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - }); - } - } - - public static class UnitTest { - - @Test - public void testReduceIntegersWithInitialValue() { - Observable observable = Observable.from(1, 2, 3); - - Observable m = Observable.create(reduce(observable, "0", - new Func2() { - - @Override - public String call(String s, Integer n) { - return s + n; - } - - })); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - m.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext("0123"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testReduceIntegersWithoutInitialValue() { - Observable observable = Observable.from(1, 2, 3); - - Observable m = Observable.create(reduce(observable, - new Func2() { - - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - - })); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - m.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext(6); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testReduceIntegersWithoutInitialValueAndOnlyOneValue() { - Observable observable = Observable.from(1); - - Observable m = Observable.create(reduce(observable, - new Func2() { - - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - - })); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - m.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext(1); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testReduceIntegersWithInitialValueAndEmpty() { - Observable observable = Observable.empty(); - - Observable m = Observable.create(reduce(observable, "1", - new Func2() { - - @Override - public String call(String s, Integer n) { - return s + n; - } - - })); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - m.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext("1"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testReduceIntegersWithoutInitialValueAndEmpty() { - Observable observable = Observable.empty(); - - Observable m = Observable.create(reduce(observable, - new Func2() { - - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - - })); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - m.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onError( - any(UnsupportedOperationException.class)); - inOrder.verifyNoMoreInteractions(); - } - } - -} diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 08ff57405b..18eecf8cae 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -16,8 +16,13 @@ package rx; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import org.junit.Test; +import org.mockito.InOrder; import rx.CovarianceTest.HorrorMovie; import rx.CovarianceTest.Movie; @@ -103,4 +108,113 @@ public Movie call(Movie t1, Movie t2) { obs.reduce(chooseSecondMovie); } + @Test + public void testReduceIntegersWithInitialValue() { + Observable m = Observable.from(1, 2, 3).reduce("0", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("0123"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValue() { + Observable m = Observable.from(1, 2, 3).reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(6); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndOnlyOneValue() { + Observable m = Observable.from(1).reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithInitialValueAndEmpty() { + Observable m = Observable. empty().reduce("1", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("1"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndEmpty() { + Observable m = Observable. empty().reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + any(UnsupportedOperationException.class)); + inOrder.verifyNoMoreInteractions(); + } } From edf068db6424f1fc76e806a89eb560fb80616044 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 7 Nov 2013 13:59:32 +0800 Subject: [PATCH 4/4] Used 'last' to implement 'reduce' --- rxjava-core/src/main/java/rx/Observable.java | 24 ++++--------------- rxjava-core/src/test/java/rx/ReduceTests.java | 7 +++--- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 98c1ca6e50..1d4c4ef5b7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3522,29 +3522,13 @@ public Observable onErrorReturn(Func1 resumeFunction) * Observable, whose result will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the * output from the source Observable - * @throws UnsupportedOperationException - * the Observable is empty + * @throws IllegalArgumentException + * When the Observable is empty, Observers will receive an onError notification with an IllegalArgumentException. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return create(OperationScan.scan(this, accumulator)).takeLast(1).assertNonEmpty(); - } - - private Observable assertNonEmpty() { - return materialize().map(new Func1, Notification>() { - private boolean isEmpty = true; - @Override - public Notification call(Notification t1) { - if (t1.isOnNext()) { - isEmpty = false; - } - else if (t1.isOnCompleted() && isEmpty) { - return new Notification(new UnsupportedOperationException("Can not apply on an empty sequence")); - } - return t1; - } - }).dematerialize(); + return create(OperationScan.scan(this, accumulator)).last(); } /** @@ -3797,7 +3781,7 @@ public Observable aggregate(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(R initialValue, Func2 accumulator) { - return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); + return create(OperationScan.scan(this, initialValue, accumulator)).last(); } /** diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 18eecf8cae..75e6e5ff31 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -15,8 +15,8 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -213,8 +213,7 @@ public Integer call(Integer t1, Integer t2) { m.subscribe(observer); InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onError( - any(UnsupportedOperationException.class)); + inOrder.verify(observer, times(1)).onError(isA(IllegalArgumentException.class)); inOrder.verifyNoMoreInteractions(); } }