From 9488e86a7fe7011536a42514b7f3f60238c69664 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 14:47:09 +0100 Subject: [PATCH 01/14] added inner RetrofitScheduler --- pom.xml | 2 +- .../src/main/java/retrofit/RestAdapter.java | 96 ++++++++++++++++++- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index e71473b183..e64f858714 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 16 2.2.4 1.3.0 - 0.17.1 + 0.18.1 1.8.9 diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index 18174734fd..1845541498 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -22,11 +22,22 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.Type; +import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + import retrofit.Profiler.RequestInformation; import retrofit.client.Client; import retrofit.client.Header; @@ -41,7 +52,11 @@ import rx.Observable; import rx.Scheduler; import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; import rx.schedulers.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; /** * Adapts a Java interface to a REST API. @@ -233,7 +248,7 @@ private static final class RxSupport { private final ErrorHandler errorHandler; RxSupport(Executor executor, ErrorHandler errorHandler) { - this.scheduler = Schedulers.executor(executor); + this.scheduler = new RetrofitScheduler(executor); this.errorHandler = errorHandler; } @@ -259,6 +274,85 @@ Observable createRequestObservable(final Callable request) { } }).subscribeOn(scheduler); } + + static class RetrofitScheduler extends Scheduler { + private final Executor executorService; + + /*package*/ RetrofitScheduler(Executor executorService) { + this.executorService = executorService; + } + + @Override + public Worker createWorker() { + return new EventLoopScheduler(executorService); + } + + static class EventLoopScheduler extends Scheduler.Worker implements Subscription { + private final CompositeSubscription innerSubscription = new CompositeSubscription(); + private final Executor executor; + + /* package */ EventLoopScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(final Action0 action) { + if (innerSubscription.isUnsubscribed()) { + // don't schedule, we are unsubscribed + return Subscriptions.empty(); + } + + final AtomicReference sf = new AtomicReference(); + final Subscription s; + if (executor instanceof ExecutorService) { + s = Subscriptions.from(((ExecutorService) executor).submit(getActionRunnable(action, sf))); + } else { + //This is not ideal, we should use a ExecutorService, that way we can pass future + // back to the subscription, so if the user un-subscribe from the parent we can + // request the Future to cancel. This will always execute, meaning we could + // lock of the retrofit threads if a request is active for a long time. + // I would potentially force an API change to make sure this is always an ExecutorService + s = Subscriptions.empty(); + executor.execute(getActionRunnable(action, sf)); + } + + sf.set(s); + innerSubscription.add(s); + return s; + } + + private Runnable getActionRunnable(final Action0 action, final AtomicReference sf) { + return new Runnable() { + @Override + public void run() { + try { + if (innerSubscription.isUnsubscribed()) return; + action.call(); + } finally { + // remove the subscription now that we're completed + Subscription s = sf.get(); + if (s != null) innerSubscription.remove(s); + } + } + }; + } + + @Override + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + throw new UnsupportedOperationException("This Scheduler does not support timed requests"); + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + } + } } private class RestHandler implements InvocationHandler { From 7e3bebc74f5f6b278bd9a0fedae5a41b11504219 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 15:30:34 +0100 Subject: [PATCH 02/14] removed unused imports --- retrofit/src/main/java/retrofit/RestAdapter.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index 1845541498..26de1876e9 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -22,20 +22,12 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.Type; -import java.util.Collection; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import retrofit.Profiler.RequestInformation; @@ -54,7 +46,6 @@ import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; -import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; From 966254bca86f1e7e6d6e1cd158958ea002ba4165 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 15:34:38 +0100 Subject: [PATCH 03/14] checkstyle fixes --- .../src/main/java/retrofit/RestAdapter.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index 26de1876e9..d0eb854a28 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -296,13 +296,16 @@ public Subscription schedule(final Action0 action) { final AtomicReference sf = new AtomicReference(); final Subscription s; if (executor instanceof ExecutorService) { - s = Subscriptions.from(((ExecutorService) executor).submit(getActionRunnable(action, sf))); + s = Subscriptions.from(((ExecutorService) executor).submit( + getActionRunnable(action, sf))); } else { - //This is not ideal, we should use a ExecutorService, that way we can pass future - // back to the subscription, so if the user un-subscribe from the parent we can - // request the Future to cancel. This will always execute, meaning we could - // lock of the retrofit threads if a request is active for a long time. - // I would potentially force an API change to make sure this is always an ExecutorService + /* + This is not ideal, we should use a ExecutorService, that way we can pass future + back to the subscription, so if the user un-subscribe from the parent we can + request the Future to cancel. This will always execute, meaning we could + lock of the retrofit threads if a request is active for a long time. + I would potentially force an API change to make sure this is always an ExecutorService + */ s = Subscriptions.empty(); executor.execute(getActionRunnable(action, sf)); } @@ -312,7 +315,8 @@ public Subscription schedule(final Action0 action) { return s; } - private Runnable getActionRunnable(final Action0 action, final AtomicReference sf) { + private Runnable getActionRunnable(final Action0 action, + final AtomicReference sf) { return new Runnable() { @Override public void run() { From a048035a333c065c4e372334346d1563732377e4 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 16:03:36 +0100 Subject: [PATCH 04/14] migrated Scheduler into the Schedulers class for shared usage. --- .../main/java/retrofit/MockRestAdapter.java | 3 +- .../src/main/java/retrofit/RestAdapter.java | 85 +------------- .../src/main/java/retrofit/Schedulers.java | 110 ++++++++++++++++++ 3 files changed, 112 insertions(+), 86 deletions(-) create mode 100644 retrofit/src/main/java/retrofit/Schedulers.java diff --git a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java index 7b1c1ff91e..d3d2f3af74 100644 --- a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java +++ b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java @@ -14,7 +14,6 @@ import rx.Observable; import rx.Scheduler; import rx.Subscriber; -import rx.schedulers.Schedulers; import static retrofit.RestAdapter.LogLevel; import static retrofit.RetrofitError.unexpectedError; @@ -529,7 +528,7 @@ private static class MockRxSupport { private final ErrorHandler errorHandler; MockRxSupport(RestAdapter restAdapter) { - scheduler = Schedulers.executor(restAdapter.httpExecutor); + scheduler = new Schedulers.RetrofitScheduler(restAdapter.httpExecutor); errorHandler = restAdapter.errorHandler; } diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index d0eb854a28..e770c9cb37 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -239,7 +239,7 @@ private static final class RxSupport { private final ErrorHandler errorHandler; RxSupport(Executor executor, ErrorHandler errorHandler) { - this.scheduler = new RetrofitScheduler(executor); + this.scheduler = new Schedulers.RetrofitScheduler(executor); this.errorHandler = errorHandler; } @@ -265,89 +265,6 @@ Observable createRequestObservable(final Callable request) { } }).subscribeOn(scheduler); } - - static class RetrofitScheduler extends Scheduler { - private final Executor executorService; - - /*package*/ RetrofitScheduler(Executor executorService) { - this.executorService = executorService; - } - - @Override - public Worker createWorker() { - return new EventLoopScheduler(executorService); - } - - static class EventLoopScheduler extends Scheduler.Worker implements Subscription { - private final CompositeSubscription innerSubscription = new CompositeSubscription(); - private final Executor executor; - - /* package */ EventLoopScheduler(Executor executor) { - this.executor = executor; - } - - @Override - public Subscription schedule(final Action0 action) { - if (innerSubscription.isUnsubscribed()) { - // don't schedule, we are unsubscribed - return Subscriptions.empty(); - } - - final AtomicReference sf = new AtomicReference(); - final Subscription s; - if (executor instanceof ExecutorService) { - s = Subscriptions.from(((ExecutorService) executor).submit( - getActionRunnable(action, sf))); - } else { - /* - This is not ideal, we should use a ExecutorService, that way we can pass future - back to the subscription, so if the user un-subscribe from the parent we can - request the Future to cancel. This will always execute, meaning we could - lock of the retrofit threads if a request is active for a long time. - I would potentially force an API change to make sure this is always an ExecutorService - */ - s = Subscriptions.empty(); - executor.execute(getActionRunnable(action, sf)); - } - - sf.set(s); - innerSubscription.add(s); - return s; - } - - private Runnable getActionRunnable(final Action0 action, - final AtomicReference sf) { - return new Runnable() { - @Override - public void run() { - try { - if (innerSubscription.isUnsubscribed()) return; - action.call(); - } finally { - // remove the subscription now that we're completed - Subscription s = sf.get(); - if (s != null) innerSubscription.remove(s); - } - } - }; - } - - @Override - public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - throw new UnsupportedOperationException("This Scheduler does not support timed requests"); - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } - } - } } private class RestHandler implements InvocationHandler { diff --git a/retrofit/src/main/java/retrofit/Schedulers.java b/retrofit/src/main/java/retrofit/Schedulers.java new file mode 100644 index 0000000000..1e248c4d81 --- /dev/null +++ b/retrofit/src/main/java/retrofit/Schedulers.java @@ -0,0 +1,110 @@ +package retrofit; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +/** + * Indirect access to Scheduler API for + */ +/*package*/ final class Schedulers { + + /** + * RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way + * it dumps requests onto a Executor, but we can pass in the Executor. + *

+ * This does not support Scheduled execution, which may cause issues with peoples implementations. + * If they are doing, wait() or debouncing() on this scheduler. Future implementations, should + * either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to + * RestAdapter builder. + */ + static class RetrofitScheduler extends Scheduler { + private final Executor executorService; + + /*package*/ RetrofitScheduler(Executor executorService) { + this.executorService = executorService; + } + + @Override + public Worker createWorker() { + return new EventLoopScheduler(executorService); + } + + static class EventLoopScheduler extends Scheduler.Worker implements Subscription { + private final CompositeSubscription innerSubscription = new CompositeSubscription(); + private final Executor executor; + + /* package */ EventLoopScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(final Action0 action) { + if (innerSubscription.isUnsubscribed()) { + // don't schedule, we are unsubscribed + return Subscriptions.empty(); + } + + final AtomicReference sf = new AtomicReference(); + final Subscription s; + if (executor instanceof ExecutorService) { + s = Subscriptions.from(((ExecutorService) executor).submit( + getActionRunnable(action, sf))); + } else { + /* + This is not ideal, we should use a ExecutorService, that way we can pass future + back to the subscription, so if the user un-subscribe from the parent we can + request the Future to cancel. This will always execute, meaning we could + lock of the retrofit threads if a request is active for a long time. + I would potentially force an API change to make sure this is always an ExecutorService + */ + s = Subscriptions.empty(); + executor.execute(getActionRunnable(action, sf)); + } + + sf.set(s); + innerSubscription.add(s); + return s; + } + + private Runnable getActionRunnable(final Action0 action, + final AtomicReference sf) { + return new Runnable() { + @Override + public void run() { + try { + if (innerSubscription.isUnsubscribed()) return; + action.call(); + } finally { + // remove the subscription now that we're completed + Subscription s = sf.get(); + if (s != null) innerSubscription.remove(s); + } + } + }; + } + + @Override + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + throw new UnsupportedOperationException("This Scheduler does not support timed requests"); + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + } + } +} From ee29804b5aa5241705b0b87f8979c6c9c167ab65 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 16:05:07 +0100 Subject: [PATCH 05/14] checkstyle --- retrofit/src/main/java/retrofit/RestAdapter.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index e770c9cb37..4a8225aa7e 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -26,9 +26,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import retrofit.Profiler.RequestInformation; import retrofit.client.Client; @@ -44,10 +42,6 @@ import rx.Observable; import rx.Scheduler; import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; /** * Adapts a Java interface to a REST API. From 53fee1058cf10680458746fa4e67bf97909baa41 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 16:55:46 +0100 Subject: [PATCH 06/14] cleaned up comments as-per feedback. --- .../src/main/java/retrofit/Schedulers.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/retrofit/src/main/java/retrofit/Schedulers.java b/retrofit/src/main/java/retrofit/Schedulers.java index 1e248c4d81..30dabf8ade 100644 --- a/retrofit/src/main/java/retrofit/Schedulers.java +++ b/retrofit/src/main/java/retrofit/Schedulers.java @@ -41,14 +41,14 @@ static class EventLoopScheduler extends Scheduler.Worker implements Subscription private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final Executor executor; - /* package */ EventLoopScheduler(Executor executor) { + /*package*/ EventLoopScheduler(Executor executor) { this.executor = executor; } @Override public Subscription schedule(final Action0 action) { if (innerSubscription.isUnsubscribed()) { - // don't schedule, we are unsubscribed + // Don't schedule, we are un-subscribed. return Subscriptions.empty(); } @@ -59,11 +59,13 @@ public Subscription schedule(final Action0 action) { getActionRunnable(action, sf))); } else { /* - This is not ideal, we should use a ExecutorService, that way we can pass future - back to the subscription, so if the user un-subscribe from the parent we can - request the Future to cancel. This will always execute, meaning we could - lock of the retrofit threads if a request is active for a long time. - I would potentially force an API change to make sure this is always an ExecutorService + This is not ideal, we should use a ExecutorService, that way we can pass Future + back to the subscription, so if the user un-subscribed from the parent we can + request the Future to cancel. + This will always execute, meaning we could lock up the retrofit threads if: + 1. The user un-subscribes before starting the execution in the pool. + 2. The request is active for a long time, timing out etc... + I would potentially force an API change to make sure this is always an ExecutorService. */ s = Subscriptions.empty(); executor.execute(getActionRunnable(action, sf)); @@ -83,7 +85,7 @@ public void run() { if (innerSubscription.isUnsubscribed()) return; action.call(); } finally { - // remove the subscription now that we're completed + // Remove the subscription now that we've completed. Subscription s = sf.get(); if (s != null) innerSubscription.remove(s); } @@ -93,7 +95,7 @@ public void run() { @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - throw new UnsupportedOperationException("This Scheduler does not support timed requests"); + throw new UnsupportedOperationException("This Scheduler does not support timed Actions"); } @Override From 37ef1a342b73b307093c88a2fb41a6a9a733175b Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 16:57:53 +0100 Subject: [PATCH 07/14] Comment to end-of-line style --- retrofit/src/main/java/retrofit/Schedulers.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/retrofit/src/main/java/retrofit/Schedulers.java b/retrofit/src/main/java/retrofit/Schedulers.java index 30dabf8ade..5f18df62b6 100644 --- a/retrofit/src/main/java/retrofit/Schedulers.java +++ b/retrofit/src/main/java/retrofit/Schedulers.java @@ -58,15 +58,14 @@ public Subscription schedule(final Action0 action) { s = Subscriptions.from(((ExecutorService) executor).submit( getActionRunnable(action, sf))); } else { - /* - This is not ideal, we should use a ExecutorService, that way we can pass Future - back to the subscription, so if the user un-subscribed from the parent we can - request the Future to cancel. - This will always execute, meaning we could lock up the retrofit threads if: - 1. The user un-subscribes before starting the execution in the pool. - 2. The request is active for a long time, timing out etc... - I would potentially force an API change to make sure this is always an ExecutorService. - */ + // This is not ideal, we should use a ExecutorService, that way we can pass Future + // back to the subscription, so if the user un-subscribed from the parent we can + // request the Future to cancel. + // This will always execute, meaning we could lock up the retrofit threads if: + // 1. The user un-subscribes before starting the execution in the pool. + // 2. The request is active for a long time, timing out etc... + // I would potentially force an API change to make sure this is always an ExecutorService. + // s = Subscriptions.empty(); executor.execute(getActionRunnable(action, sf)); } From f58428750047bffa58fe48771f771d45e4b4c0d5 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 17:17:50 +0100 Subject: [PATCH 08/14] fix checkstyle --- retrofit/src/main/java/retrofit/Schedulers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/retrofit/src/main/java/retrofit/Schedulers.java b/retrofit/src/main/java/retrofit/Schedulers.java index 5f18df62b6..90f438efb9 100644 --- a/retrofit/src/main/java/retrofit/Schedulers.java +++ b/retrofit/src/main/java/retrofit/Schedulers.java @@ -64,8 +64,8 @@ public Subscription schedule(final Action0 action) { // This will always execute, meaning we could lock up the retrofit threads if: // 1. The user un-subscribes before starting the execution in the pool. // 2. The request is active for a long time, timing out etc... - // I would potentially force an API change to make sure this is always an ExecutorService. - // + // I would potentially force an API change to make sure this is always an + // ExecutorService. s = Subscriptions.empty(); executor.execute(getActionRunnable(action, sf)); } From a350444428988a398bb976386c64731786f11b20 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 17:25:15 +0100 Subject: [PATCH 09/14] promoted RxSupport to retrofit. --- .../main/java/retrofit/MockRestAdapter.java | 2 +- .../src/main/java/retrofit/RestAdapter.java | 37 ---------- .../{Schedulers.java => RxSupport.java} | 72 ++++++++++++++----- 3 files changed, 54 insertions(+), 57 deletions(-) rename retrofit/src/main/java/retrofit/{Schedulers.java => RxSupport.java} (73%) diff --git a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java index d3d2f3af74..fced3be929 100644 --- a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java +++ b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java @@ -528,7 +528,7 @@ private static class MockRxSupport { private final ErrorHandler errorHandler; MockRxSupport(RestAdapter restAdapter) { - scheduler = new Schedulers.RetrofitScheduler(restAdapter.httpExecutor); + scheduler = new RxSupport.RetrofitScheduler(restAdapter.httpExecutor); errorHandler = restAdapter.errorHandler; } diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index 4a8225aa7e..ba2941f3d4 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -39,9 +39,6 @@ import retrofit.mime.TypedByteArray; import retrofit.mime.TypedInput; import retrofit.mime.TypedOutput; -import rx.Observable; -import rx.Scheduler; -import rx.Subscriber; /** * Adapts a Java interface to a REST API. @@ -227,40 +224,6 @@ static RestMethodInfo getMethodInfo(Map cache, Method me } } - /** Indirection to avoid VerifyError if RxJava isn't present. */ - private static final class RxSupport { - private final Scheduler scheduler; - private final ErrorHandler errorHandler; - - RxSupport(Executor executor, ErrorHandler errorHandler) { - this.scheduler = new Schedulers.RetrofitScheduler(executor); - this.errorHandler = errorHandler; - } - - Observable createRequestObservable(final Callable request) { - return Observable.create(new Observable.OnSubscribe() { - @Override public void call(Subscriber subscriber) { - if (subscriber.isUnsubscribed()) { - return; - } - try { - ResponseWrapper wrapper = request.call(); - if (subscriber.isUnsubscribed()) { - return; - } - subscriber.onNext(wrapper.responseBody); - subscriber.onCompleted(); - } catch (RetrofitError e) { - subscriber.onError(errorHandler.handleError(e)); - } catch (Exception e) { - // This is from the Callable. It shouldn't actually throw. - throw new RuntimeException(e); - } - } - }).subscribeOn(scheduler); - } - } - private class RestHandler implements InvocationHandler { private final Map methodDetailsCache; diff --git a/retrofit/src/main/java/retrofit/Schedulers.java b/retrofit/src/main/java/retrofit/RxSupport.java similarity index 73% rename from retrofit/src/main/java/retrofit/Schedulers.java rename to retrofit/src/main/java/retrofit/RxSupport.java index 90f438efb9..307cf1cc6b 100644 --- a/retrofit/src/main/java/retrofit/Schedulers.java +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -1,20 +1,54 @@ package retrofit; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import rx.Observable; import rx.Scheduler; +import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; /** - * Indirect access to Scheduler API for + * Indirection to avoid VerifyError if RxJava isn't present. */ -/*package*/ final class Schedulers { +final class RxSupport { + private final Scheduler scheduler; + private final ErrorHandler errorHandler; + + RxSupport(Executor executor, ErrorHandler errorHandler) { + this.scheduler = new RetrofitScheduler(executor); + this.errorHandler = errorHandler; + } + + Observable createRequestObservable(final Callable request) { + return Observable.create(new Observable.OnSubscribe() { + @Override public void call(Subscriber subscriber) { + if (subscriber.isUnsubscribed()) { + return; + } + try { + ResponseWrapper wrapper = request.call(); + if (subscriber.isUnsubscribed()) { + return; + } + subscriber.onNext(wrapper.responseBody); + subscriber.onCompleted(); + } catch (RetrofitError e) { + subscriber.onError(errorHandler.handleError(e)); + } catch (Exception e) { + // This is from the Callable. It shouldn't actually throw. + throw new RuntimeException(e); + } + } + }).subscribeOn(scheduler); + } + /** * RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way @@ -37,11 +71,11 @@ public Worker createWorker() { return new EventLoopScheduler(executorService); } - static class EventLoopScheduler extends Scheduler.Worker implements Subscription { + static class EventLoopScheduler extends Worker implements Subscription { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final Executor executor; - /*package*/ EventLoopScheduler(Executor executor) { + EventLoopScheduler(Executor executor) { this.executor = executor; } @@ -75,6 +109,21 @@ public Subscription schedule(final Action0 action) { return s; } + @Override + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + throw new UnsupportedOperationException("This Scheduler does not support timed Actions"); + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + private Runnable getActionRunnable(final Action0 action, final AtomicReference sf) { return new Runnable() { @@ -91,21 +140,6 @@ public void run() { } }; } - - @Override - public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - throw new UnsupportedOperationException("This Scheduler does not support timed Actions"); - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } } } } From 17360ca284e8d20d4e2a62e8e3e94588e0c077b9 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Mon, 28 Apr 2014 22:44:53 +0100 Subject: [PATCH 10/14] as per @jhump's suggestions simplified executing a task. passed to the executor. --- .../src/main/java/retrofit/RxSupport.java | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RxSupport.java b/retrofit/src/main/java/retrofit/RxSupport.java index 307cf1cc6b..2a63545bbb 100644 --- a/retrofit/src/main/java/retrofit/RxSupport.java +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -2,7 +2,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -87,22 +87,10 @@ public Subscription schedule(final Action0 action) { } final AtomicReference sf = new AtomicReference(); - final Subscription s; - if (executor instanceof ExecutorService) { - s = Subscriptions.from(((ExecutorService) executor).submit( - getActionRunnable(action, sf))); - } else { - // This is not ideal, we should use a ExecutorService, that way we can pass Future - // back to the subscription, so if the user un-subscribed from the parent we can - // request the Future to cancel. - // This will always execute, meaning we could lock up the retrofit threads if: - // 1. The user un-subscribes before starting the execution in the pool. - // 2. The request is active for a long time, timing out etc... - // I would potentially force an API change to make sure this is always an - // ExecutorService. - s = Subscriptions.empty(); - executor.execute(getActionRunnable(action, sf)); - } + final FutureTask futureTask = + new FutureTask(getActionRunnable(action, sf), null); + final Subscription s = Subscriptions.from(futureTask); + executor.execute(futureTask); sf.set(s); innerSubscription.add(s); From e009ea81d1597824eb936b752e4a0b2cee752322 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Tue, 29 Apr 2014 17:10:21 +0100 Subject: [PATCH 11/14] Comments cleanup --- retrofit/src/main/java/retrofit/RxSupport.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RxSupport.java b/retrofit/src/main/java/retrofit/RxSupport.java index 2a63545bbb..6f65d8cb56 100644 --- a/retrofit/src/main/java/retrofit/RxSupport.java +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -15,7 +15,11 @@ import rx.subscriptions.Subscriptions; /** - * Indirection to avoid VerifyError if RxJava isn't present. + * Utilities for supporting RxJava Observables. + * Used primarily by {@link retrofit.RestAdapter}. + * + * Remember RxJava might not be on the classpath, check its included before calling, use + * {@link Platform#HAS_RX_JAVA} */ final class RxSupport { private final Scheduler scheduler; @@ -53,7 +57,7 @@ Observable createRequestObservable(final Callable request) { /** * RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way * it dumps requests onto a Executor, but we can pass in the Executor. - *

+ * * This does not support Scheduled execution, which may cause issues with peoples implementations. * If they are doing, wait() or debouncing() on this scheduler. Future implementations, should * either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to From 2765e400538ab25cee049be3c9cb8094b31e4dbd Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Wed, 30 Apr 2014 12:30:18 +0100 Subject: [PATCH 12/14] Moved away from Scheduler and Execute inside OnSubscribe. --- .../src/main/java/retrofit/RxSupport.java | 108 +++--------------- 1 file changed, 18 insertions(+), 90 deletions(-) diff --git a/retrofit/src/main/java/retrofit/RxSupport.java b/retrofit/src/main/java/retrofit/RxSupport.java index 6f65d8cb56..05f22fa89b 100644 --- a/retrofit/src/main/java/retrofit/RxSupport.java +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -3,15 +3,10 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import rx.Observable; -import rx.Scheduler; import rx.Subscriber; import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; /** @@ -22,11 +17,11 @@ * {@link Platform#HAS_RX_JAVA} */ final class RxSupport { - private final Scheduler scheduler; + private final Executor executor; private final ErrorHandler errorHandler; RxSupport(Executor executor, ErrorHandler errorHandler) { - this.scheduler = new RetrofitScheduler(executor); + this.executor = executor; this.errorHandler = errorHandler; } @@ -36,11 +31,25 @@ Observable createRequestObservable(final Callable request) { if (subscriber.isUnsubscribed()) { return; } + final FutureTask task = new FutureTask(getRunnable(subscriber, request), null); + final Subscription s = Subscriptions.from(task); + // We add our subscription to the current subscriber so the future task can be + // unSubscribed from from thegc + subscriber.add(s); + executor.execute(task); + } + }); + } + + private Runnable getRunnable(final Subscriber subscriber, + final Callable request) { + return new Runnable() { + @Override public void run() { try { - ResponseWrapper wrapper = request.call(); if (subscriber.isUnsubscribed()) { return; } + ResponseWrapper wrapper = request.call(); subscriber.onNext(wrapper.responseBody); subscriber.onCompleted(); } catch (RetrofitError e) { @@ -50,88 +59,7 @@ Observable createRequestObservable(final Callable request) { throw new RuntimeException(e); } } - }).subscribeOn(scheduler); + }; } - - /** - * RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way - * it dumps requests onto a Executor, but we can pass in the Executor. - * - * This does not support Scheduled execution, which may cause issues with peoples implementations. - * If they are doing, wait() or debouncing() on this scheduler. Future implementations, should - * either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to - * RestAdapter builder. - */ - static class RetrofitScheduler extends Scheduler { - private final Executor executorService; - - /*package*/ RetrofitScheduler(Executor executorService) { - this.executorService = executorService; - } - - @Override - public Worker createWorker() { - return new EventLoopScheduler(executorService); - } - - static class EventLoopScheduler extends Worker implements Subscription { - private final CompositeSubscription innerSubscription = new CompositeSubscription(); - private final Executor executor; - - EventLoopScheduler(Executor executor) { - this.executor = executor; - } - - @Override - public Subscription schedule(final Action0 action) { - if (innerSubscription.isUnsubscribed()) { - // Don't schedule, we are un-subscribed. - return Subscriptions.empty(); - } - - final AtomicReference sf = new AtomicReference(); - final FutureTask futureTask = - new FutureTask(getActionRunnable(action, sf), null); - final Subscription s = Subscriptions.from(futureTask); - executor.execute(futureTask); - - sf.set(s); - innerSubscription.add(s); - return s; - } - - @Override - public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - throw new UnsupportedOperationException("This Scheduler does not support timed Actions"); - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } - - private Runnable getActionRunnable(final Action0 action, - final AtomicReference sf) { - return new Runnable() { - @Override - public void run() { - try { - if (innerSubscription.isUnsubscribed()) return; - action.call(); - } finally { - // Remove the subscription now that we've completed. - Subscription s = sf.get(); - if (s != null) innerSubscription.remove(s); - } - } - }; - } - } - } } From 2c19f4233f9260744c40a42d57fb43d932c66884 Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Wed, 30 Apr 2014 18:45:17 +0100 Subject: [PATCH 13/14] Fixed MockRestAdapter to mimic new OnSubscribe, added RxSupportTests --- .../main/java/retrofit/MockRestAdapter.java | 36 ++-- .../src/test/java/retrofit/RxSupportTest.java | 186 ++++++++++++++++++ 2 files changed, 207 insertions(+), 15 deletions(-) create mode 100644 retrofit/src/test/java/retrofit/RxSupportTest.java diff --git a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java index fced3be929..1746c13d98 100644 --- a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java +++ b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java @@ -8,11 +8,11 @@ import java.lang.reflect.Proxy; import java.util.Map; import java.util.Random; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import retrofit.client.Request; import retrofit.client.Response; import rx.Observable; -import rx.Scheduler; import rx.Subscriber; import static retrofit.RestAdapter.LogLevel; @@ -524,30 +524,36 @@ private static long uptimeMillis() { /** Indirection to avoid VerifyError if RxJava isn't present. */ private static class MockRxSupport { - private final Scheduler scheduler; + private final Executor httpExecutor; private final ErrorHandler errorHandler; MockRxSupport(RestAdapter restAdapter) { - scheduler = new RxSupport.RetrofitScheduler(restAdapter.httpExecutor); + httpExecutor = restAdapter.httpExecutor; errorHandler = restAdapter.errorHandler; } Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo, final RequestInterceptor interceptor, final Object[] args) { return Observable.create(new Observable.OnSubscribe() { - @Override public void call(Subscriber subscriber) { - try { - Observable observable = - (Observable) mockHandler.invokeSync(methodInfo, interceptor, args); - //noinspection unchecked - observable.subscribe(subscriber); - } catch (RetrofitError e) { - subscriber.onError(errorHandler.handleError(e)); - } catch (Throwable e) { - subscriber.onError(e); - } + @Override public void call(final Subscriber subscriber) { + if (subscriber.isUnsubscribed()) return; + httpExecutor.execute(new Runnable() { + @Override public void run() { + try { + if (subscriber.isUnsubscribed()) return; + Observable observable = + (Observable) mockHandler.invokeSync(methodInfo, interceptor, args); + //noinspection unchecked + observable.subscribe(subscriber); + } catch (RetrofitError e) { + subscriber.onError(errorHandler.handleError(e)); + } catch (Throwable e) { + subscriber.onError(e); + } + } + }); } - }).subscribeOn(scheduler); + }); } } } diff --git a/retrofit/src/test/java/retrofit/RxSupportTest.java b/retrofit/src/test/java/retrofit/RxSupportTest.java new file mode 100644 index 0000000000..e37da9c513 --- /dev/null +++ b/retrofit/src/test/java/retrofit/RxSupportTest.java @@ -0,0 +1,186 @@ +package retrofit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import retrofit.client.Header; +import retrofit.client.Response; +import retrofit.mime.TypedInput; +import rx.Observer; +import rx.Subscription; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class RxSupportTest { + + private Object response; + private ResponseWrapper responseWrapper; + private Callable callable = spy(new Callable() { + @Override public ResponseWrapper call() throws Exception { + return responseWrapper; + } + }); + + private QueuedSynchronousExecutor executor; + private ErrorHandler errorHandler; + private RxSupport rxSupport; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + response = new Object(); + responseWrapper = new ResponseWrapper( + new Response( + "http://example.com", 200, "Success", + Collections.
emptyList(), mock(TypedInput.class) + ), response + ); + executor = spy(new QueuedSynchronousExecutor()); + errorHandler = ErrorHandler.DEFAULT; + rxSupport = new RxSupport(executor, errorHandler); + } + + @Mock + Observer subscriber; + + @Test + public void testObservableCallsOnNextOnHttpExecutor() throws Exception { + rxSupport.createRequestObservable(callable).subscribe(subscriber); + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableCallsOnNextOnHttpExecutorWithSubscriber() throws Exception { + TestScheduler test = Schedulers.test(); + rxSupport.createRequestObservable(callable).subscribeOn(test).subscribe(subscriber); + // Subscription is handled via the Scheduler. + test.triggerActions(); + // This will only execute up to the executor in OnSubscribe. + verify(subscriber, never()).onNext(any()); + // Upon continuing the executor we then run the retrofit request. + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableUnSubscribesDoesNotExecuteCallable() throws Exception { + Subscription subscription = rxSupport.createRequestObservable(callable).subscribe(subscriber); + verify(subscriber, never()).onNext(any()); + + // UnSubscribe here should cancel the queued runnable. + subscription.unsubscribe(); + + executor.executeNextInQueue(); + verify(callable, never()).call(); + verify(subscriber, never()).onNext(response); + } + + @Test + public void testObservableCallsOperatorsOffHttpExecutor() throws Exception { + TestScheduler test = Schedulers.test(); + rxSupport.createRequestObservable(callable) + .delaySubscription(1000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + verify(subscriber, never()).onNext(any()); + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + // Upon continuing the executor we then run the retrofit request. + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableDoesNotLockExecutor() throws Exception { + TestScheduler test = Schedulers.test(); + Subscription subscription1 = rxSupport.createRequestObservable(callable) + .delay(1000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + Subscription subscription2 = rxSupport.createRequestObservable(callable) + .delay(2000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + // Nothing fired yet + verify(subscriber, never()).onNext(any()); + // Subscriptions should of been queued up and executed even tho we delayed on the Subscriber. + executor.executeNextInQueue(); + executor.executeNextInQueue(); + + verify(subscriber, never()).onNext(response); + + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + verify(subscriber, times(1)).onNext(response); + + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + verify(subscriber, times(2)).onNext(response); + } + + @Test + public void testObservableRespectsObserveOn() throws Exception { + TestScheduler observe = Schedulers.test(); + rxSupport.createRequestObservable(callable) + .observeOn(observe) + .subscribe(subscriber); + + verify(subscriber, never()).onNext(any()); + executor.executeNextInQueue(); + + // Should have no response yet, but callback should of been executed. + verify(subscriber, never()).onNext(any()); + verify(callable, times(1)).call(); + + // Forward the Observable Scheduler + observe.triggerActions(); + verify(subscriber, times(1)).onNext(response); + } + + /** + * Test Executor to iterate through Executions to aid in checking + * that the Observable implementation is correct. + */ + static class QueuedSynchronousExecutor implements Executor { + Deque runnableQueue = new ArrayDeque(); + + @Override public void execute(Runnable runnable) { + runnableQueue.add(runnable); + } + + /** + * Will throw exception if you are expecting something to be added to the Executor + * and it hasn't. + */ + void executeNextInQueue() { + runnableQueue.removeFirst().run(); + } + + /** + * Executes any queued executions on the executor. + */ + void executeAll() { + Iterator iterator = runnableQueue.iterator(); + while (iterator.hasNext()) { + Runnable next = iterator.next(); + next.run(); + iterator.remove(); + } + } + } +} \ No newline at end of file From 97db5c9a959cfd8c1a3f8af109c5e41b486a52ab Mon Sep 17 00:00:00 2001 From: Christopher Jenkins Date: Wed, 30 Apr 2014 18:58:00 +0100 Subject: [PATCH 14/14] comment clean up --- retrofit/src/main/java/retrofit/RxSupport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/retrofit/src/main/java/retrofit/RxSupport.java b/retrofit/src/main/java/retrofit/RxSupport.java index 05f22fa89b..b728205e72 100644 --- a/retrofit/src/main/java/retrofit/RxSupport.java +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -34,7 +34,7 @@ Observable createRequestObservable(final Callable request) { final FutureTask task = new FutureTask(getRunnable(subscriber, request), null); final Subscription s = Subscriptions.from(task); // We add our subscription to the current subscriber so the future task can be - // unSubscribed from from thegc + // unSubscribed from. subscriber.add(s); executor.execute(task); }