Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matching hook orders from 1.3.x #511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ protected static enum TimedOutStatus {
protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<TimedOutStatus>(TimedOutStatus.NOT_EXECUTED);
protected final AtomicBoolean isExecutionComplete = new AtomicBoolean(false);
protected final AtomicBoolean isExecutedInThread = new AtomicBoolean(false);
protected final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done


/**
* Instance of RequestCache logic
Expand Down Expand Up @@ -409,7 +411,17 @@ public void call() {
metrics.markShortCircuited();
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
try {
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").unsafeSubscribe(observer);
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited")
.map(new Func1<R, R>() {

@Override
public R call(R t1) {
// allow transforming the results via the executionHook if the fallback succeeds
return executionHook.onComplete(_this, t1);
}

})
.unsafeSubscribe(observer);
} catch (Exception e) {
observer.onError(e);
}
Expand Down Expand Up @@ -482,7 +494,6 @@ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final
metrics.incrementConcurrentExecutionCount();

final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done

Observable<R> run = null;
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
Expand All @@ -505,7 +516,12 @@ public void call(Subscriber<? super R> s) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
getExecutionObservable().unsafeSubscribe(s);
getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
}).unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -521,7 +537,12 @@ public void call(Subscriber<? super R> s) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
run = getExecutionObservable();
run = getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
});
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -547,8 +568,7 @@ public R call(R t1) {
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
once = true;
}

return executionHook.onRunSuccess(_self, t1);
return t1;
}

}).doOnCompleted(new Action0() {
Expand Down Expand Up @@ -577,7 +597,7 @@ public Observable<R> call(Throwable t) {
} else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) {
/**
* Timeout handling
*
*
* Callback is performed on the HystrixTimer thread.
*/
return getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
Expand Down Expand Up @@ -624,7 +644,7 @@ public Observable<R> call(Throwable t) {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException){
if (e instanceof HystrixBadRequestException) {
return Observable.error(e);
}

Expand All @@ -647,13 +667,10 @@ public void call(Notification<? super R> n) {
}).doOnTerminate(new Action0() {
@Override
public void call() {
// pop the command that is being run
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
//if the command timed out, then we've reached this point in the calling thread
//but the Hystrix thread is still doing work. Let it handle these markers.
if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
}).map(new Func1<R, R>() {
Expand Down Expand Up @@ -854,6 +871,16 @@ public void call(Notification<? super R> n) {
}
}

protected void handleThreadEnd() {
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(this);
}
}

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

final AbstractCommand<R> originalCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static void _reset() {
HystrixCircuitBreaker.Factory.reset();
HystrixPlugins.reset();
HystrixPropertiesFactory.reset();
currentCommand.set(new LinkedList<HystrixCommandKey>());
}

private static ThreadLocal<LinkedList<HystrixCommandKey>> currentCommand = new ThreadLocal<LinkedList<HystrixCommandKey>>() {
Expand Down
11 changes: 11 additions & 0 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
Expand Down Expand Up @@ -304,6 +306,15 @@ public void call(Subscriber<? super R> s) {
}
}

}).doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.hystrix;

import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3899,7 +3899,7 @@ public void testExecutionHookTimeoutWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());

}

Expand Down Expand Up @@ -3948,7 +3948,7 @@ public void testExecutionHookTimeoutWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -4007,11 +4007,58 @@ public void testExecutionHookRejectedWithFallback() {

}

/**
* Execution hook on short-circuited with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallback() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithFallback command = new KnownFailureTestCommandWithFallback(circuitBreaker);

try {
// now execute one that will be short-circuited
command.queue().get();
} catch (Exception e) {
throw new RuntimeException("not expecting", e);
}

assertTrue(command.isResponseShortCircuited());

// the run() method should not run as we're short-circuited
assertEquals(0, command.builder.executionHook.startRun.get());
// we should not have a response because of short-circuit
assertNull(command.builder.executionHook.runSuccessResponse);
// we should not have an exception because we didn't run
assertNull(command.builder.executionHook.runFailureException);

// the fallback() method should be run due to short-circuit
assertEquals(1, command.builder.executionHook.startFallback.get());
// response since we have a fallback
assertNotNull(command.builder.executionHook.fallbackSuccessResponse);
// null since fallback succeeds
assertNull(command.builder.executionHook.fallbackFailureException);

// execution occurred
assertEquals(1, command.builder.executionHook.startExecute.get());
// we should have a response because of fallback
assertNotNull(command.builder.executionHook.endExecuteSuccessResponse);
// we should not have an exception because of fallback
assertNull(command.builder.executionHook.endExecuteFailureException);

// thread execution
assertEquals(0, command.builder.executionHook.threadStart.get());
assertEquals(0, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());

}

/**
* Execution hook on short-circuit with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallbackViaQueue() {
public void testExecutionHookShortCircuitedWithoutFallbackViaQueue() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker);
try {
Expand Down Expand Up @@ -4059,7 +4106,7 @@ public void testExecutionHookShortCircuitedWithFallbackViaQueue() {
* Execution hook on short-circuit with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallbackViaExecute() {
public void testExecutionHookShortCircuitedWithoutFallbackViaExecute() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.junit.Before;
import org.junit.Test;

import com.netflix.hystrix.HystrixCommand.Setter;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;

public class HystrixTest {
@Before
public void reset() {
Hystrix.reset();
}

@Test
public void testNotInThread() {
assertNull(Hystrix.getCurrentThreadExecutingCommand());
Expand Down