diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6bcd3e9f77..1d4c4ef5b7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3514,17 +3514,21 @@ public Observable onErrorReturn(Func1 resumeFunction) * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an inject method that does a similar operation on lists. - * + *

+ * Note: You can't reduce an empty sequence without an initial value. + * * @param accumulator * An accumulator function to be invoked on each item emitted by the source * Observable, whose result will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the * output from the source Observable + * @throws IllegalArgumentException + * When the Observable is empty, Observers will receive an onError notification with an IllegalArgumentException. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return create(OperationScan.scan(this, accumulator)).takeLast(1); + return create(OperationScan.scan(this, accumulator)).last(); } /** @@ -3777,7 +3781,7 @@ public Observable aggregate(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(R initialValue, Func2 accumulator) { - return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); + return create(OperationScan.scan(this, initialValue, accumulator)).last(); } /** diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 08ff57405b..75e6e5ff31 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -15,9 +15,14 @@ */ package rx; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import org.junit.Test; +import org.mockito.InOrder; import rx.CovarianceTest.HorrorMovie; import rx.CovarianceTest.Movie; @@ -103,4 +108,112 @@ public Movie call(Movie t1, Movie t2) { obs.reduce(chooseSecondMovie); } + @Test + public void testReduceIntegersWithInitialValue() { + Observable m = Observable.from(1, 2, 3).reduce("0", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("0123"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValue() { + Observable m = Observable.from(1, 2, 3).reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(6); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndOnlyOneValue() { + Observable m = Observable.from(1).reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithInitialValueAndEmpty() { + Observable m = Observable. empty().reduce("1", + new Func2() { + + @Override + public String call(String s, Integer n) { + return s + n; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("1"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testReduceIntegersWithoutInitialValueAndEmpty() { + Observable m = Observable. empty().reduce( + new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError(isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } }