diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index a2e455273..589dfae33 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -108,6 +108,8 @@ protected static enum TimedOutStatus { protected final AtomicReference isCommandTimedOut = new AtomicReference(TimedOutStatus.NOT_EXECUTED); protected final AtomicBoolean isExecutionComplete = new AtomicBoolean(false); protected final AtomicBoolean isExecutedInThread = new AtomicBoolean(false); + protected final AtomicReference endCurrentThreadExecutingCommand = new AtomicReference(); // don't like how this is being done + /** * Instance of RequestCache logic @@ -409,7 +411,17 @@ public void call() { metrics.markShortCircuited(); // short-circuit and go directly to fallback (or throw an exception if no fallback implemented) try { - getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").unsafeSubscribe(observer); + getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited") + .map(new Func1() { + + @Override + public R call(R t1) { + // allow transforming the results via the executionHook if the fallback succeeds + return executionHook.onComplete(_this, t1); + } + + }) + .unsafeSubscribe(observer); } catch (Exception e) { observer.onError(e); } @@ -482,7 +494,6 @@ private Observable getRunObservableDecoratedForMetricsAndErrorHandling(final metrics.incrementConcurrentExecutionCount(); final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); - final AtomicReference endCurrentThreadExecutingCommand = new AtomicReference(); // don't like how this is being done Observable run = null; if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { @@ -505,7 +516,12 @@ public void call(Subscriber s) { // store the command that is being run endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); isExecutedInThread.set(true); - getExecutionObservable().unsafeSubscribe(s); + getExecutionObservable().map(new Func1() { + @Override + public R call(R r) { + return executionHook.onRunSuccess(_self, r); + } + }).unsafeSubscribe(s); } catch (Throwable t) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error @@ -521,7 +537,12 @@ public void call(Subscriber s) { // store the command that is being run endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); try { - run = getExecutionObservable(); + run = getExecutionObservable().map(new Func1() { + @Override + public R call(R r) { + return executionHook.onRunSuccess(_self, r); + } + }); } catch (Throwable t) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error @@ -547,8 +568,7 @@ public R call(R t1) { executionResult = executionResult.addEvents(HystrixEventType.SUCCESS); once = true; } - - return executionHook.onRunSuccess(_self, t1); + return t1; } }).doOnCompleted(new Action0() { @@ -577,7 +597,7 @@ public Observable call(Throwable t) { } else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) { /** * Timeout handling - * + * * Callback is performed on the HystrixTimer thread. */ return getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException()); @@ -624,7 +644,7 @@ public Observable call(Throwable t) { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ - if (e instanceof HystrixBadRequestException){ + if (e instanceof HystrixBadRequestException) { return Observable.error(e); } @@ -647,13 +667,10 @@ public void call(Notification n) { }).doOnTerminate(new Action0() { @Override public void call() { - // pop the command that is being run - if (endCurrentThreadExecutingCommand.get() != null) { - endCurrentThreadExecutingCommand.get().call(); - } - if (isExecutedInThread.get()) { - threadPool.markThreadCompletion(); - executionHook.onThreadComplete(_self); + //if the command timed out, then we've reached this point in the calling thread + //but the Hystrix thread is still doing work. Let it handle these markers. + if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + handleThreadEnd(); } } }).map(new Func1() { @@ -854,6 +871,16 @@ public void call(Notification n) { } } + protected void handleThreadEnd() { + if (endCurrentThreadExecutingCommand.get() != null) { + endCurrentThreadExecutingCommand.get().call(); + } + if (isExecutedInThread.get()) { + threadPool.markThreadCompletion(); + executionHook.onThreadComplete(this); + } + } + private static class HystrixObservableTimeoutOperator implements Operator { final AbstractCommand originalCommand; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java b/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java index 231b6a85f..3f84385e2 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java @@ -61,6 +61,7 @@ private static void _reset() { HystrixCircuitBreaker.Factory.reset(); HystrixPlugins.reset(); HystrixPropertiesFactory.reset(); + currentCommand.set(new LinkedList()); } private static ThreadLocal> currentCommand = new ThreadLocal>() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java index 5c4e7f7b3..22b2ec880 100755 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java @@ -24,6 +24,8 @@ import rx.Observable; import rx.Observable.OnSubscribe; import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Func1; import rx.schedulers.Schedulers; import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; @@ -304,6 +306,15 @@ public void call(Subscriber s) { } } + }).doOnTerminate(new Action0() { + @Override + public void call() { + //If the command timed out, then the calling thread has already walked away so we need + //to handle these markers. Otherwise, the calling thread will perform these for us. + if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + handleThreadEnd(); + } + } }); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java index 81f36591e..4b1ff4d76 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java @@ -16,6 +16,7 @@ package com.netflix.hystrix; import rx.Observable; +import rx.functions.Func1; import rx.schedulers.Schedulers; import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index ae1470d0e..634b414be 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -3899,7 +3899,7 @@ public void testExecutionHookTimeoutWithoutFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } @@ -3948,7 +3948,7 @@ public void testExecutionHookTimeoutWithFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -4007,11 +4007,58 @@ public void testExecutionHookRejectedWithFallback() { } + /** + * Execution hook on short-circuited with a fallback + */ + @Test + public void testExecutionHookShortCircuitedWithFallback() { + TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true); + KnownFailureTestCommandWithFallback command = new KnownFailureTestCommandWithFallback(circuitBreaker); + + try { + // now execute one that will be short-circuited + command.queue().get(); + } catch (Exception e) { + throw new RuntimeException("not expecting", e); + } + + assertTrue(command.isResponseShortCircuited()); + + // the run() method should not run as we're short-circuited + assertEquals(0, command.builder.executionHook.startRun.get()); + // we should not have a response because of short-circuit + assertNull(command.builder.executionHook.runSuccessResponse); + // we should not have an exception because we didn't run + assertNull(command.builder.executionHook.runFailureException); + + // the fallback() method should be run due to short-circuit + assertEquals(1, command.builder.executionHook.startFallback.get()); + // response since we have a fallback + assertNotNull(command.builder.executionHook.fallbackSuccessResponse); + // null since fallback succeeds + assertNull(command.builder.executionHook.fallbackFailureException); + + // execution occurred + assertEquals(1, command.builder.executionHook.startExecute.get()); + // we should have a response because of fallback + assertNotNull(command.builder.executionHook.endExecuteSuccessResponse); + // we should not have an exception because of fallback + assertNull(command.builder.executionHook.endExecuteFailureException); + + // thread execution + assertEquals(0, command.builder.executionHook.threadStart.get()); + assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + + } + /** * Execution hook on short-circuit with a fallback */ @Test - public void testExecutionHookShortCircuitedWithFallbackViaQueue() { + public void testExecutionHookShortCircuitedWithoutFallbackViaQueue() { TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true); KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker); try { @@ -4059,7 +4106,7 @@ public void testExecutionHookShortCircuitedWithFallbackViaQueue() { * Execution hook on short-circuit with a fallback */ @Test - public void testExecutionHookShortCircuitedWithFallbackViaExecute() { + public void testExecutionHookShortCircuitedWithoutFallbackViaExecute() { TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true); KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker); try { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixTest.java index f9d834c65..20b47709d 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixTest.java @@ -4,12 +4,18 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import org.junit.Before; import org.junit.Test; import com.netflix.hystrix.HystrixCommand.Setter; import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; public class HystrixTest { + @Before + public void reset() { + Hystrix.reset(); + } + @Test public void testNotInThread() { assertNull(Hystrix.getCurrentThreadExecutingCommand());