diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 76987f0c0c..73f3f0daac 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5388,7 +5388,18 @@ public final Observable retry() { public final Observable retry(int retryCount) { return nest().lift(new OperatorRetry(retryCount)); } - + /** + * Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError} + * and the predicate returns true for that specific exception and retry count. + * @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry + * count + * @return the Observable modified with retry logic + * @see #retry() + */ + public final Observable retry(Func2 predicate) { + return nest().lift(new OperatorRetryWithPredicate(predicate)); + } + /** * Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable * within periodic time intervals. diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRetryWithPredicate.java b/rxjava-core/src/main/java/rx/operators/OperatorRetryWithPredicate.java new file mode 100644 index 0000000000..87a1560295 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorRetryWithPredicate.java @@ -0,0 +1,117 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import rx.Observable; +import rx.Scheduler; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Func2; +import rx.schedulers.Schedulers; +import rx.subscriptions.SerialSubscription; + +public final class OperatorRetryWithPredicate implements Observable.Operator> { + final Func2 predicate; + public OperatorRetryWithPredicate(Func2 predicate) { + this.predicate = predicate; + } + + @Override + public Subscriber> call(final Subscriber child) { + final Scheduler.Worker inner = Schedulers.trampoline().createWorker(); + child.add(inner); + + final SerialSubscription serialSubscription = new SerialSubscription(); + // add serialSubscription so it gets unsubscribed if child is unsubscribed + child.add(serialSubscription); + + return new SourceSubscriber(child, predicate, inner, serialSubscription); + } + + static final class SourceSubscriber extends Subscriber> { + final Subscriber child; + final Func2 predicate; + final Scheduler.Worker inner; + final SerialSubscription serialSubscription; + + volatile int attempts; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ATTEMPTS_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts"); + + public SourceSubscriber(Subscriber child, final Func2 predicate, Scheduler.Worker inner, + SerialSubscription serialSubscription) { + this.child = child; + this.predicate = predicate; + this.inner = inner; + this.serialSubscription = serialSubscription; + } + + + @Override + public void onCompleted() { + // ignore as we expect a single nested Observable + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(final Observable o) { + inner.schedule(new Action0() { + + @Override + public void call() { + final Action0 _self = this; + ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this); + + // new subscription each time so if it unsubscribes itself it does not prevent retries + // by unsubscribing the child subscription + Subscriber subscriber = new Subscriber() { + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (predicate.call(attempts, e) && !inner.isUnsubscribed()) { + // retry again + inner.schedule(_self); + } else { + // give up and pass the failure + child.onError(e); + } + } + + @Override + public void onNext(T v) { + child.onNext(v); + } + + }; + // register this Subscription (and unsubscribe previous if exists) + serialSubscription.set(subscriber); + o.unsafeSubscribe(subscriber); + } + }); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java b/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java index a564e71a10..841c69cd56 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java @@ -260,11 +260,11 @@ public void call(Subscriber s) { assertEquals(4, subsCount.get()); // 1 + 3 retries } - class SlowObservable implements Observable.OnSubscribe { + static final class SlowObservable implements Observable.OnSubscribe { - private AtomicInteger efforts = new AtomicInteger(0); - private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0); - private AtomicInteger nextBeforeFailure; + final AtomicInteger efforts = new AtomicInteger(0); + final AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0); + final AtomicInteger nextBeforeFailure; private final int emitDelay; @@ -273,6 +273,7 @@ public SlowObservable(int emitDelay, int countNext) { this.nextBeforeFailure = new AtomicInteger(countNext); } + @Override public void call(final Subscriber subscriber) { final AtomicBoolean terminate = new AtomicBoolean(false); efforts.getAndIncrement(); @@ -309,7 +310,7 @@ public void call() { } /** Observer for listener on seperate thread */ - class AsyncObserver implements Observer { + static final class AsyncObserver implements Observer { protected CountDownLatch latch = new CountDownLatch(1); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorRetryWithPredicateTest.java b/rxjava-core/src/test/java/rx/operators/OperatorRetryWithPredicateTest.java new file mode 100644 index 0000000000..0aff7a2404 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorRetryWithPredicateTest.java @@ -0,0 +1,273 @@ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; +import rx.exceptions.TestException; +import rx.functions.Action1; +import rx.functions.Func2; +import rx.subjects.PublishSubject; + +public class OperatorRetryWithPredicateTest { + Func2 retryTwice = new Func2() { + @Override + public Boolean call(Integer t1, Throwable t2) { + return t1 <= 2; + } + }; + Func2 retry5 = new Func2() { + @Override + public Boolean call(Integer t1, Throwable t2) { + return t1 <= 5; + } + }; + Func2 retryOnTestException = new Func2() { + @Override + public Boolean call(Integer t1, Throwable t2) { + return t2 instanceof IOException; + } + }; + @Test + public void testWithNothingToRetry() { + Observable source = Observable.range(0, 3); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.retry(retryTwice).subscribe(o); + + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testRetryTwice() { + Observable source = Observable.create(new OnSubscribe() { + int count; + @Override + public void call(Subscriber t1) { + count++; + t1.onNext(0); + t1.onNext(1); + if (count == 1) { + t1.onError(new TestException()); + return; + } + t1.onNext(2); + t1.onNext(3); + t1.onCompleted(); + } + }); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.retry(retryTwice).subscribe(o); + + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testRetryTwiceAndGiveUp() { + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t1) { + t1.onNext(0); + t1.onNext(1); + t1.onError(new TestException()); + } + }); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.retry(retryTwice).subscribe(o); + + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onError(any(TestException.class)); + verify(o, never()).onCompleted(); + + } + @Test + public void testRetryOnSpecificException() { + Observable source = Observable.create(new OnSubscribe() { + int count; + @Override + public void call(Subscriber t1) { + count++; + t1.onNext(0); + t1.onNext(1); + if (count == 1) { + t1.onError(new IOException()); + return; + } + t1.onNext(2); + t1.onNext(3); + t1.onCompleted(); + } + }); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.retry(retryOnTestException).subscribe(o); + + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testRetryOnSpecificExceptionAndNotOther() { + final IOException ioe = new IOException(); + final TestException te = new TestException(); + Observable source = Observable.create(new OnSubscribe() { + int count; + @Override + public void call(Subscriber t1) { + count++; + t1.onNext(0); + t1.onNext(1); + if (count == 1) { + t1.onError(ioe); + return; + } + t1.onNext(2); + t1.onNext(3); + t1.onError(te); + } + }); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.retry(retryOnTestException).subscribe(o); + + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(0); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onError(te); + verify(o, never()).onError(ioe); + verify(o, never()).onCompleted(); + } + + @Test + public void testUnsubscribeFromRetry() { + PublishSubject subject = PublishSubject.create(); + final AtomicInteger count = new AtomicInteger(0); + Subscription sub = subject.retry(retryTwice).subscribe(new Action1() { + @Override + public void call(Integer n) { + count.incrementAndGet(); + } + }); + subject.onNext(1); + sub.unsubscribe(); + subject.onNext(2); + assertEquals(1, count.get()); + } + + @Test(timeout = 1000) + public void testUnsubscribeAfterError() { + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + + // Observable that always fails after 100ms + OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 0); + Observable o = Observable + .create(so) + .retry(retry5); + + OperatorRetryTest.AsyncObserver async = new OperatorRetryTest.AsyncObserver(observer); + + o.subscribe(async); + + async.await(); + + InOrder inOrder = inOrder(observer); + // Should fail once + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onCompleted(); + + assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); + assertEquals("Only 1 active subscription", 1, so.maxActive.get()); + } + + @Test(timeout = 1000) + public void testTimeoutWithRetry() { + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + + // Observable that sends every 100ms (timeout fails instead) + OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 10); + Observable o = Observable + .create(so) + .timeout(80, TimeUnit.MILLISECONDS) + .retry(retry5); + + OperatorRetryTest.AsyncObserver async = new OperatorRetryTest.AsyncObserver(observer); + + o.subscribe(async); + + async.await(); + + InOrder inOrder = inOrder(observer); + // Should fail once + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onCompleted(); + + assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); + } +}