Skip to content

Commit

Permalink
Merge pull request #147 from benjchristensen/issue-45
Browse files Browse the repository at this point in the history
Operator: forEach
  • Loading branch information
benjchristensen committed Feb 15, 2013
2 parents 3dfae54 + 4880e34 commit 481f948
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Arrays;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -213,6 +214,44 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testForEach() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testForEachWithError() {
try {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
fail("we expect an exception to be thrown");
}catch(Exception e) {
// do nothing as we expect this
}
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(50)
}catch(Exception e) {
// ignore
}
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
}
}

def class TestFactory {
int counter = 1;
Expand Down
83 changes: 81 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -332,6 +334,84 @@ public void onNext(T args) {
});
}

/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
* <p>
* NOTE: This will block even if the Observable is asynchronous.
* <p>
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
*
* @param onNext
* {@link Action1}
* @throws RuntimeException
* if error occurs
*/
public void forEach(final Action1<T> onNext) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();

subscribe(new Observer<T>() {
public void onCompleted() {
latch.countDown();
}

public void onError(Exception e) {
/*
* If we receive an onError event we set the reference on the outer thread
* so we can git it and throw after the latch.await().
*
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
*/
exceptionFromOnError.set(e);
latch.countDown();
}

public void onNext(T args) {
onNext.call(args);
}
});
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}

if (exceptionFromOnError.get() != null) {
if (exceptionFromOnError.get() instanceof RuntimeException) {
throw (RuntimeException) exceptionFromOnError.get();
} else {
throw new RuntimeException(exceptionFromOnError.get());
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public void forEach(final Object o) {
if (o instanceof Action1) {
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
forEach((Action1) o);
}

// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(o);

forEach(new Action1() {

public void call(Object args) {
onNext.call(args);
}

});
}

/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
Expand Down Expand Up @@ -2543,7 +2623,6 @@ public void testSequenceEqual() {
verify(result, times(1)).onNext(false);
}



}

}
9 changes: 7 additions & 2 deletions rxjava-core/src/main/java/rx/util/AtomicObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public void onError(Exception e) {

@Override
public void onNext(T args) {
if (!isFinished.get()) {
actual.onNext(args);
try {
if (!isFinished.get()) {
actual.onNext(args);
}
}catch(Exception e) {
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
}

Expand Down

0 comments on commit 481f948

Please sign in to comment.