diff --git a/hystrix-contrib/hystrix-clj/src/test/clojure/com/netflix/hystrix/core_test.clj b/hystrix-contrib/hystrix-clj/src/test/clojure/com/netflix/hystrix/core_test.clj index 2a0f9a8c8..be73314c9 100644 --- a/hystrix-contrib/hystrix-clj/src/test/clojure/com/netflix/hystrix/core_test.clj +++ b/hystrix-contrib/hystrix-clj/src/test/clojure/com/netflix/hystrix/core_test.clj @@ -18,7 +18,8 @@ (:use com.netflix.hystrix.core) (:require [clojure.test :refer [deftest testing is are use-fixtures]]) (:import [com.netflix.hystrix Hystrix HystrixExecutable] - [com.netflix.hystrix.strategy.concurrency HystrixRequestContext])) + [com.netflix.hystrix.strategy.concurrency HystrixRequestContext] + [com.netflix.hystrix.exception HystrixRuntimeException])) ; reset hystrix after each execution, for consistency and sanity (defn reset-fixture @@ -145,9 +146,9 @@ (execute (instantiate (normalize (assoc base-def :run-fn str)) "hello" "-" "world")))) - (testing "throws IllegalStateException if called twice on same instance" + (testing "throws HystrixRuntimeException if called twice on same instance" (let [instance (instantiate (normalize (assoc base-def :run-fn str)) "hi")] - (is (thrown? IllegalStateException + (is (thrown? HystrixRuntimeException (execute instance) (execute instance))))) @@ -166,12 +167,6 @@ :command-key :my-command :run-fn + }] - (testing "throws IllegalStateException if called twice on same instance" - (let [instance (instantiate (normalize base-def))] - (is (thrown? IllegalStateException - (queue instance) - (queue instance))))) - (testing "queues a HystrixCommand" (is (= "hello-world") (.get (queue (instantiate (normalize (assoc base-def :run-fn str)) diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java index a6e7734a4..1c50acc56 100644 --- a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java +++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java @@ -27,7 +27,6 @@ import rx.Observable; import rx.Subscription; import rx.functions.Action1; -import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.MultipleAssignmentSubscription; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index f464817e9..176284391 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -361,33 +361,8 @@ public void call() { * if invoked more than once */ public Observable toObservable() { - /* this is a stateful object so can only be used once */ - if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { - throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); - } - - commandStartTimestamp = System.currentTimeMillis(); - - if (properties.requestLogEnabled().get()) { - // log this command execution regardless of what happened - if (currentRequestLog != null) { - currentRequestLog.addExecutedCommand(this); - } - } - - final boolean requestCacheEnabled = isRequestCachingEnabled(); - final String cacheKey = getCacheKey(); final AbstractCommand _cmd = this; - /* try from cache first */ - if (requestCacheEnabled) { - HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.get(cacheKey); - if (fromCache != null) { - isResponseFromCache = true; - return handleRequestCacheHitAndEmitValues(fromCache, _cmd); - } - } - //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @@ -458,35 +433,68 @@ public void call() { } }; - Observable hystrixObservable = - Observable.defer(applyHystrixSemantics) - .map(wrapWithAllOnNextHooks); + return Observable.defer(new Func0>() { + @Override + public Observable call() { + /* this is a stateful object so can only be used once */ + if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { + IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); + //TODO make a new error type for this + throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); + } + commandStartTimestamp = System.currentTimeMillis(); - Observable afterCache; + if (properties.requestLogEnabled().get()) { + // log this command execution regardless of what happened + if (currentRequestLog != null) { + currentRequestLog.addExecutedCommand(_cmd); + } + } - // put in cache - if (requestCacheEnabled && cacheKey != null) { - // wrap it for caching - HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable, this); - HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.putIfAbsent(cacheKey, toCache); - if (fromCache != null) { - // another thread beat us so we'll use the cached value instead - toCache.unsubscribe(); - isResponseFromCache = true; - return handleRequestCacheHitAndEmitValues(fromCache, _cmd); - } else { - // we just created an ObservableCommand so we cast and return it - afterCache = toCache.toObservable(); - } - } else { - afterCache = hystrixObservable; - } + final boolean requestCacheEnabled = isRequestCachingEnabled(); + final String cacheKey = getCacheKey(); + + /* try from cache first */ + if (requestCacheEnabled) { + HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.get(cacheKey); + if (fromCache != null) { + isResponseFromCache = true; + return handleRequestCacheHitAndEmitValues(fromCache, _cmd); + } + } + + Observable hystrixObservable = + Observable.defer(applyHystrixSemantics) + .map(wrapWithAllOnNextHooks); + + + Observable afterCache; - return afterCache - .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) - .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once - .doOnCompleted(fireOnCompletedHook); + // put in cache + if (requestCacheEnabled && cacheKey != null) { + // wrap it for caching + HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); + HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.putIfAbsent(cacheKey, toCache); + if (fromCache != null) { + // another thread beat us so we'll use the cached value instead + toCache.unsubscribe(); + isResponseFromCache = true; + return handleRequestCacheHitAndEmitValues(fromCache, _cmd); + } else { + // we just created an ObservableCommand so we cast and return it + afterCache = toCache.toObservable(); + } + } else { + afterCache = hystrixObservable; + } + + return afterCache + .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) + .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once + .doOnCompleted(fireOnCompletedHook); + } + }); } private Observable applyHystrixSemantics(final AbstractCommand _cmd) { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index b73d61c35..7e8fe6145 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -113,19 +113,11 @@ public void testExecutionMultipleTimes() { // second should fail command.execute(); fail("we should not allow this ... it breaks the state of request logs"); - } catch (IllegalStateException e) { + } catch (HystrixRuntimeException e) { e.printStackTrace(); // we want to get here } - try { - // queue should also fail - command.queue(); - fail("we should not allow this ... it breaks the state of request logs"); - } catch (IllegalStateException e) { - e.printStackTrace(); - // we want to get here - } assertEquals(0, command.getBuilder().metrics.getCurrentConcurrentExecutionCount()); assertSaneHystrixRequestLog(1); assertCommandExecutionEvents(command, HystrixEventType.SUCCESS); @@ -2070,7 +2062,7 @@ public void testBasicExecutionWorksWithoutRequestVariable() throws Exception { /** * Test that if we try and execute a command with a cacheKey without initializing RequestVariable that it gives an error. */ - @Test(expected = IllegalStateException.class) + @Test(expected = HystrixRuntimeException.class) public void testCacheKeyExecutionRequiresRequestVariable() throws Exception { /* force the RequestVariable to not be initialized */ HystrixRequestContext.setContextOnCurrentThread(null); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index bae5a32bb..d129f2e47 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -183,16 +183,7 @@ private void testObserveMultipleTimes(ExecutionIsolationStrategy isolationStrate // second should fail command.observe().toBlocking().single(); fail("we should not allow this ... it breaks the state of request logs"); - } catch (IllegalStateException e) { - e.printStackTrace(); - // we want to get here - } - - try { - // queue should also fail - command.observe().toBlocking().toFuture(); - fail("we should not allow this ... it breaks the state of request logs"); - } catch (IllegalStateException e) { + } catch (HystrixRuntimeException e) { e.printStackTrace(); // we want to get here }