From f3362eb788219ecc55d4b648e2fb589e8d6171d1 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 14 Feb 2013 14:56:39 -0800 Subject: [PATCH 1/6] add error handling for onNext failure so exceptions don't get thrown up the stack but instead via onError --- rxjava-core/src/main/java/rx/util/AtomicObserver.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserver.java b/rxjava-core/src/main/java/rx/util/AtomicObserver.java index 86eee9ccfa..fe04125bf9 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserver.java @@ -67,8 +67,13 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (!isFinished.get()) { - actual.onNext(args); + try { + if (!isFinished.get()) { + actual.onNext(args); + } + }catch(Exception e) { + // handle errors if the onNext implementation fails, not just if the Observable fails + onError(e); } } From 6c4cd18e647dfc30ef482245a36cede4d81eb20a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 14 Feb 2013 14:57:10 -0800 Subject: [PATCH 2/6] Operator: ForEach https://github.com/Netflix/RxJava/issues/45 --- .../rx/lang/groovy/ObservableTests.groovy | 57 +++++ rxjava-core/src/main/java/rx/Observable.java | 238 +++++++++++++++++- 2 files changed, 293 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 2bb4c4c6bf..7f62af5724 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -213,6 +213,63 @@ def class ObservableTests { Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } + + @Test + public void testForEach() { + Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(2); + verify(a, times(1)).received(3); + } + + @Test + public void testForEachWithComplete() { + Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)}, {}, {a.received('done')}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(2); + verify(a, times(1)).received(3); + verify(a, times(1)).received("done"); + } + + @Test + public void testForEachWithError() { + Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)}); + verify(a, times(0)).received(1); + verify(a, times(0)).received(2); + verify(a, times(0)).received(3); + verify(a, times(1)).received("err"); + verify(a, times(0)).received("done"); + } + + @Test + public void testForEachWithCompleteAndError() { + Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)}, {a.received('done')},); + verify(a, times(0)).received(1); + verify(a, times(0)).received(2); + verify(a, times(0)).received(3); + verify(a, times(1)).received("err"); + verify(a, times(0)).received("done"); + } + + def class AsyncObservable implements Func1, Subscription> { + + public Subscription call(final Observer observer) { + new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(50) + }catch(Exception e) { + // ignore + } + observer.onNext(1); + observer.onNext(2); + observer.onNext(3); + observer.onCompleted(); + } + }).start(); + return Observable.noOpSubscription(); + } + } def class TestFactory { int counter = 1; diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3dc60b8d0d..86e85be0f5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -332,6 +333,240 @@ public void onNext(T args) { }); } + /** + * Blocking version of {@link #subscribe(Observer)}. + *

+ * NOTE: This will block even if the Observable is asynchronous. + * + * @param observer + */ + public void forEach(final Observer observer) { + final CountDownLatch latch = new CountDownLatch(1); + subscribe(new Observer() { + public void onCompleted() { + try { + observer.onCompleted(); + } finally { + latch.countDown(); + } + } + + public void onError(Exception e) { + try { + observer.onError(e); + } finally { + latch.countDown(); + } + } + + public void onNext(T args) { + observer.onNext(args); + } + }); + // block until the subscription completes and then return + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void forEach(final Map callbacks) { + // lookup and memoize onNext + Object _onNext = callbacks.get("onNext"); + if (_onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + final FuncN onNext = Functions.from(_onNext); + + forEach(new Observer() { + + public void onCompleted() { + Object onComplete = callbacks.get("onCompleted"); + if (onComplete != null) { + Functions.from(onComplete).call(); + } + } + + public void onError(Exception e) { + handleError(e); + Object onError = callbacks.get("onError"); + if (onError != null) { + Functions.from(onError).call(e); + } + } + + public void onNext(Object args) { + onNext.call(args); + } + + }); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void forEach(final Object o) { + if (o instanceof Observer) { + // in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method. + forEach((Observer) o); + } + + // lookup and memoize onNext + if (o == null) { + throw new RuntimeException("onNext must be implemented"); + } + final FuncN onNext = Functions.from(o); + + forEach(new Observer() { + + public void onCompleted() { + // do nothing + } + + public void onError(Exception e) { + handleError(e); + // no callback defined + } + + public void onNext(Object args) { + onNext.call(args); + } + + }); + } + + public void forEach(final Action1 onNext) { + + forEach(new Observer() { + + public void onCompleted() { + // do nothing + } + + public void onError(Exception e) { + handleError(e); + // no callback defined + } + + public void onNext(T args) { + if (onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + onNext.call(args); + } + + }); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void forEach(final Object onNext, final Object onError) { + // lookup and memoize onNext + if (onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + final FuncN onNextFunction = Functions.from(onNext); + + forEach(new Observer() { + + public void onCompleted() { + // do nothing + } + + public void onError(Exception e) { + handleError(e); + if (onError != null) { + Functions.from(onError).call(e); + } + } + + public void onNext(Object args) { + onNextFunction.call(args); + } + + }); + } + + public void forEach(final Action1 onNext, final Action1 onError) { + + forEach(new Observer() { + + public void onCompleted() { + // do nothing + } + + public void onError(Exception e) { + handleError(e); + if (onError != null) { + onError.call(e); + } + } + + public void onNext(T args) { + if (onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + onNext.call(args); + } + + }); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void forEach(final Object onNext, final Object onError, final Object onComplete) { + // lookup and memoize onNext + if (onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + final FuncN onNextFunction = Functions.from(onNext); + + forEach(new Observer() { + + public void onCompleted() { + if (onComplete != null) { + Functions.from(onComplete).call(); + } + } + + public void onError(Exception e) { + handleError(e); + if (onError != null) { + Functions.from(onError).call(e); + } + } + + public void onNext(Object args) { + onNextFunction.call(args); + } + + }); + } + + public void forEach(final Action1 onNext, final Action1 onError, final Action0 onComplete) { + + forEach(new Observer() { + + public void onCompleted() { + onComplete.call(); + } + + public void onError(Exception e) { + handleError(e); + if (onError != null) { + onError.call(e); + } + } + + public void onNext(T args) { + if (onNext == null) { + throw new RuntimeException("onNext must be implemented"); + } + onNext.call(args); + } + + }); + } + + /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * @@ -2543,7 +2778,6 @@ public void testSequenceEqual() { verify(result, times(1)).onNext(false); } - - } + } From 100f571c9a2835d5a30a55374b9be74c147e031f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 14 Feb 2013 15:18:11 -0800 Subject: [PATCH 3/6] forEach with Action1 but not Observer I re-read the MSDN docs and found the previous implementation wasn't complying with the contract. http://msdn.microsoft.com/en-us/library/hh211815(v=vs.103).aspx I believe this now does. --- .../rx/lang/groovy/ObservableTests.groovy | 29 +-- rxjava-core/src/main/java/rx/Observable.java | 223 +++--------------- 2 files changed, 39 insertions(+), 213 deletions(-) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 7f62af5724..967c096691 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -22,6 +22,7 @@ import java.util.Arrays; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -222,33 +223,17 @@ def class ObservableTests { verify(a, times(1)).received(3); } - @Test - public void testForEachWithComplete() { - Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)}, {}, {a.received('done')}); - verify(a, times(1)).received(1); - verify(a, times(1)).received(2); - verify(a, times(1)).received(3); - verify(a, times(1)).received("done"); - } - @Test public void testForEachWithError() { - Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)}); - verify(a, times(0)).received(1); - verify(a, times(0)).received(2); - verify(a, times(0)).received(3); - verify(a, times(1)).received("err"); - verify(a, times(0)).received("done"); - } - - @Test - public void testForEachWithCompleteAndError() { - Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)}, {a.received('done')},); + try { + Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}); + fail("we expect an exception to be thrown"); + }catch(Exception e) { + + } verify(a, times(0)).received(1); verify(a, times(0)).received(2); verify(a, times(0)).received(3); - verify(a, times(1)).received("err"); - verify(a, times(0)).received("done"); } def class AsyncObservable implements Func1, Subscription> { diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 86e85be0f5..ba9cc6827b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -26,6 +26,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -334,33 +335,39 @@ public void onNext(T args) { } /** - * Blocking version of {@link #subscribe(Observer)}. + * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

* NOTE: This will block even if the Observable is asynchronous. + *

+ * This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. * - * @param observer + * @param onNext + * {@link Action1} + * @throws RuntimeException + * if error occurs */ - public void forEach(final Observer observer) { + public void forEach(final Action1 onNext) { final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference exceptionFromOnError = new AtomicReference(); + subscribe(new Observer() { public void onCompleted() { - try { - observer.onCompleted(); - } finally { - latch.countDown(); - } + latch.countDown(); } public void onError(Exception e) { - try { - observer.onError(e); - } finally { - latch.countDown(); - } + /* + * If we receive an onError event we set the reference on the outer thread + * so we can git it and throw after the latch.await(). + * + * We do this instead of throwing directly since this may be on a different thread and the latch is still waiting. + */ + exceptionFromOnError.set(e); + latch.countDown(); } public void onNext(T args) { - observer.onNext(args); + onNext.call(args); } }); // block until the subscription completes and then return @@ -369,46 +376,21 @@ public void onNext(T args) { } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void forEach(final Map callbacks) { - // lookup and memoize onNext - Object _onNext = callbacks.get("onNext"); - if (_onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNext = Functions.from(_onNext); - - forEach(new Observer() { - - public void onCompleted() { - Object onComplete = callbacks.get("onCompleted"); - if (onComplete != null) { - Functions.from(onComplete).call(); - } - } - - public void onError(Exception e) { - handleError(e); - Object onError = callbacks.get("onError"); - if (onError != null) { - Functions.from(onError).call(e); - } - } - public void onNext(Object args) { - onNext.call(args); + if (exceptionFromOnError.get() != null) { + if (exceptionFromOnError.get() instanceof RuntimeException) { + throw (RuntimeException) exceptionFromOnError.get(); + } else { + throw new RuntimeException(exceptionFromOnError.get()); } - - }); + } } @SuppressWarnings({ "rawtypes", "unchecked" }) public void forEach(final Object o) { - if (o instanceof Observer) { - // in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method. - forEach((Observer) o); + if (o instanceof Action1) { + // in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method. + forEach((Action1) o); } // lookup and memoize onNext @@ -417,156 +399,15 @@ public void forEach(final Object o) { } final FuncN onNext = Functions.from(o); - forEach(new Observer() { - - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - // no callback defined - } - - public void onNext(Object args) { - onNext.call(args); - } - - }); - } - - public void forEach(final Action1 onNext) { - - forEach(new Observer() { - - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - // no callback defined - } - - public void onNext(T args) { - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - onNext.call(args); - } - - }); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void forEach(final Object onNext, final Object onError) { - // lookup and memoize onNext - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNextFunction = Functions.from(onNext); - - forEach(new Observer() { + forEach(new Action1() { - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - Functions.from(onError).call(e); - } - } - - public void onNext(Object args) { - onNextFunction.call(args); - } - - }); - } - - public void forEach(final Action1 onNext, final Action1 onError) { - - forEach(new Observer() { - - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - onError.call(e); - } - } - - public void onNext(T args) { - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } + public void call(Object args) { onNext.call(args); } }); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void forEach(final Object onNext, final Object onError, final Object onComplete) { - // lookup and memoize onNext - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNextFunction = Functions.from(onNext); - - forEach(new Observer() { - - public void onCompleted() { - if (onComplete != null) { - Functions.from(onComplete).call(); - } - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - Functions.from(onError).call(e); - } - } - - public void onNext(Object args) { - onNextFunction.call(args); - } - - }); - } - - public void forEach(final Action1 onNext, final Action1 onError, final Action0 onComplete) { - - forEach(new Observer() { - - public void onCompleted() { - onComplete.call(); - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - onError.call(e); - } - } - - public void onNext(T args) { - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - onNext.call(args); - } - - }); - } - - /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * From e2e62cb134a91710b76775597c146d34a46fb29b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 14 Feb 2013 15:20:23 -0800 Subject: [PATCH 4/6] add a comment so it doesn't look like this was left empty by accident --- .../src/test/groovy/rx/lang/groovy/ObservableTests.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 967c096691..17e0ebb934 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -229,7 +229,7 @@ def class ObservableTests { Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}); fail("we expect an exception to be thrown"); }catch(Exception e) { - + // do nothing as we expect this } verify(a, times(0)).received(1); verify(a, times(0)).received(2); From 643627edd66312e4f3f07c99a63618834bf46071 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 15 Feb 2013 09:25:58 -0800 Subject: [PATCH 5/6] removing unused verifications https://github.com/Netflix/RxJava/pull/147/files#r3028477 --- .../src/test/groovy/rx/lang/groovy/ObservableTests.groovy | 3 --- 1 file changed, 3 deletions(-) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 17e0ebb934..8d4d4b656a 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -231,9 +231,6 @@ def class ObservableTests { }catch(Exception e) { // do nothing as we expect this } - verify(a, times(0)).received(1); - verify(a, times(0)).received(2); - verify(a, times(0)).received(3); } def class AsyncObservable implements Func1, Subscription> { From 4880e34a45bea09828ce3419919d5fd3336f54f3 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 15 Feb 2013 12:33:56 -0800 Subject: [PATCH 6/6] mark the thread as interrupted again --- rxjava-core/src/main/java/rx/Observable.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ba9cc6827b..5cf8d82ec2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -374,6 +374,10 @@ public void onNext(T args) { try { latch.await(); } catch (InterruptedException e) { + // set the interrupted flag again so callers can still get it + // for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780 + Thread.currentThread().interrupt(); + // using Runtime so it is not checked throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); }