Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AbstractCommand.toObservable lazy. #1274

Merged
merged 1 commit into from
Jul 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
(:use com.netflix.hystrix.core)
(:require [clojure.test :refer [deftest testing is are use-fixtures]])
(:import [com.netflix.hystrix Hystrix HystrixExecutable]
[com.netflix.hystrix.strategy.concurrency HystrixRequestContext]))
[com.netflix.hystrix.strategy.concurrency HystrixRequestContext]
[com.netflix.hystrix.exception HystrixRuntimeException]))

; reset hystrix after each execution, for consistency and sanity
(defn reset-fixture
Expand Down Expand Up @@ -145,9 +146,9 @@
(execute (instantiate (normalize (assoc base-def :run-fn str))
"hello" "-" "world"))))

(testing "throws IllegalStateException if called twice on same instance"
(testing "throws HystrixRuntimeException if called twice on same instance"
(let [instance (instantiate (normalize (assoc base-def :run-fn str)) "hi")]
(is (thrown? IllegalStateException
(is (thrown? HystrixRuntimeException
(execute instance)
(execute instance)))))

Expand All @@ -166,12 +167,6 @@
:command-key :my-command
:run-fn + }]

(testing "throws IllegalStateException if called twice on same instance"
(let [instance (instantiate (normalize base-def))]
(is (thrown? IllegalStateException
(queue instance)
(queue instance)))))

(testing "queues a HystrixCommand"
(is (= "hello-world")
(.get (queue (instantiate (normalize (assoc base-def :run-fn str))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.MultipleAssignmentSubscription;
Expand Down
108 changes: 58 additions & 50 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,33 +361,8 @@ public void call() {
* if invoked more than once
*/
public Observable<R> toObservable() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
}

commandStartTimestamp = System.currentTimeMillis();

if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(this);
}
}

final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
final AbstractCommand<R> _cmd = this;

/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
Expand Down Expand Up @@ -458,35 +433,68 @@ public void call() {
}
};

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}

commandStartTimestamp = System.currentTimeMillis();

Observable<R> afterCache;
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}

// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, this);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();

/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);


Observable<R> afterCache;

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,11 @@ public void testExecutionMultipleTimes() {
// second should fail
command.execute();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
} catch (HystrixRuntimeException e) {
e.printStackTrace();
// we want to get here
}

try {
// queue should also fail
command.queue();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
e.printStackTrace();
// we want to get here
}
assertEquals(0, command.getBuilder().metrics.getCurrentConcurrentExecutionCount());
assertSaneHystrixRequestLog(1);
assertCommandExecutionEvents(command, HystrixEventType.SUCCESS);
Expand Down Expand Up @@ -2070,7 +2062,7 @@ public void testBasicExecutionWorksWithoutRequestVariable() throws Exception {
/**
* Test that if we try and execute a command with a cacheKey without initializing RequestVariable that it gives an error.
*/
@Test(expected = IllegalStateException.class)
@Test(expected = HystrixRuntimeException.class)
public void testCacheKeyExecutionRequiresRequestVariable() throws Exception {
/* force the RequestVariable to not be initialized */
HystrixRequestContext.setContextOnCurrentThread(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,7 @@ private void testObserveMultipleTimes(ExecutionIsolationStrategy isolationStrate
// second should fail
command.observe().toBlocking().single();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
e.printStackTrace();
// we want to get here
}

try {
// queue should also fail
command.observe().toBlocking().toFuture();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
} catch (HystrixRuntimeException e) {
e.printStackTrace();
// we want to get here
}
Expand Down