Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented the 'reduce' operator #436

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3514,17 +3514,21 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
* This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold,"
* "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance,
* has an <code>inject</code> method that does a similar operation on lists.
*
* <p>
* Note: You can't reduce an empty sequence without an initial value.
*
* @param accumulator
* An accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the
* output from the source Observable
* @throws IllegalArgumentException
* When the Observable is empty, Observers will receive an onError notification with an IllegalArgumentException.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public Observable<T> reduce(Func2<T, T, T> accumulator) {
return create(OperationScan.scan(this, accumulator)).takeLast(1);
return create(OperationScan.scan(this, accumulator)).last();
}

/**
Expand Down Expand Up @@ -3777,7 +3781,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
return create(OperationScan.scan(this, initialValue, accumulator)).last();
}

/**
Expand Down
115 changes: 114 additions & 1 deletion rxjava-core/src/test/java/rx/ReduceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
*/
package rx;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.junit.Test;
import org.mockito.InOrder;

import rx.CovarianceTest.HorrorMovie;
import rx.CovarianceTest.Movie;
Expand Down Expand Up @@ -103,4 +108,112 @@ public Movie call(Movie t1, Movie t2) {
obs.reduce(chooseSecondMovie);
}

@Test
public void testReduceIntegersWithInitialValue() {
Observable<String> m = Observable.from(1, 2, 3).reduce("0",
new Func2<String, Integer, String>() {

@Override
public String call(String s, Integer n) {
return s + n;
}

});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
m.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("0123");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testReduceIntegersWithoutInitialValue() {
Observable<Integer> m = Observable.from(1, 2, 3).reduce(
new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

});

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
m.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(6);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testReduceIntegersWithoutInitialValueAndOnlyOneValue() {
Observable<Integer> m = Observable.from(1).reduce(
new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

});

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
m.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testReduceIntegersWithInitialValueAndEmpty() {
Observable<String> m = Observable.<Integer> empty().reduce("1",
new Func2<String, Integer, String>() {

@Override
public String call(String s, Integer n) {
return s + n;
}

});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
m.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("1");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testReduceIntegersWithoutInitialValueAndEmpty() {
Observable<Integer> m = Observable.<Integer> empty().reduce(
new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

});

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
m.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(isA(IllegalArgumentException.class));
inOrder.verifyNoMoreInteractions();
}
}