diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 2bb4c4c6bf..8d4d4b656a 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -22,6 +22,7 @@ import java.util.Arrays; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -213,6 +214,44 @@ def class ObservableTests { Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } + + @Test + public void testForEach() { + Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(2); + verify(a, times(1)).received(3); + } + + @Test + public void testForEachWithError() { + try { + Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}); + fail("we expect an exception to be thrown"); + }catch(Exception e) { + // do nothing as we expect this + } + } + + def class AsyncObservable implements Func1, Subscription> { + + public Subscription call(final Observer observer) { + new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(50) + }catch(Exception e) { + // ignore + } + observer.onNext(1); + observer.onNext(2); + observer.onNext(3); + observer.onCompleted(); + } + }).start(); + return Observable.noOpSubscription(); + } + } def class TestFactory { int counter = 1; diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3dc60b8d0d..5cf8d82ec2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -22,9 +22,11 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -332,6 +334,84 @@ public void onNext(T args) { }); } + /** + * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. + *

+ * NOTE: This will block even if the Observable is asynchronous. + *

+ * This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. + * + * @param onNext + * {@link Action1} + * @throws RuntimeException + * if error occurs + */ + public void forEach(final Action1 onNext) { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference exceptionFromOnError = new AtomicReference(); + + subscribe(new Observer() { + public void onCompleted() { + latch.countDown(); + } + + public void onError(Exception e) { + /* + * If we receive an onError event we set the reference on the outer thread + * so we can git it and throw after the latch.await(). + * + * We do this instead of throwing directly since this may be on a different thread and the latch is still waiting. + */ + exceptionFromOnError.set(e); + latch.countDown(); + } + + public void onNext(T args) { + onNext.call(args); + } + }); + // block until the subscription completes and then return + try { + latch.await(); + } catch (InterruptedException e) { + // set the interrupted flag again so callers can still get it + // for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780 + Thread.currentThread().interrupt(); + // using Runtime so it is not checked + throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); + } + + if (exceptionFromOnError.get() != null) { + if (exceptionFromOnError.get() instanceof RuntimeException) { + throw (RuntimeException) exceptionFromOnError.get(); + } else { + throw new RuntimeException(exceptionFromOnError.get()); + } + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void forEach(final Object o) { + if (o instanceof Action1) { + // in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method. + forEach((Action1) o); + } + + // lookup and memoize onNext + if (o == null) { + throw new RuntimeException("onNext must be implemented"); + } + final FuncN onNext = Functions.from(o); + + forEach(new Action1() { + + public void call(Object args) { + onNext.call(args); + } + + }); + } + /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * @@ -2543,7 +2623,6 @@ public void testSequenceEqual() { verify(result, times(1)).onNext(false); } - - } + } diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserver.java b/rxjava-core/src/main/java/rx/util/AtomicObserver.java index 86eee9ccfa..fe04125bf9 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserver.java @@ -67,8 +67,13 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (!isFinished.get()) { - actual.onNext(args); + try { + if (!isFinished.get()) { + actual.onNext(args); + } + }catch(Exception e) { + // handle errors if the onNext implementation fails, not just if the Observable fails + onError(e); } }