Skip to content

Commit

Permalink
BugFix: Plugin WrapCallable of RequestContext in Timeout
Browse files Browse the repository at this point in the history
Fixes issue reported in #212
This was also fixed in 1.3.10
  • Loading branch information
benjchristensen committed Mar 5, 2014
1 parent 999e7d3 commit fc99fce
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ public Subscriber<? super R> call(final Subscriber<? super R> child) {
* Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext
* of the calling thread which doesn't exist on the Timer thread.
*/
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(new Runnable() {
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {

@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private class CollapsedTask implements TimerListener {
CollapsedTask() {
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
// so we create the callable now where we can capture the thread context
callableWithContextOfParent = concurrencyStrategy.wrapCallable(new HystrixContextCallable<Void>(new Callable<Void>() {
callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
// the wrapCallable call allows a strategy to capture thread-context if desired

@Override
Expand All @@ -144,7 +144,7 @@ public Void call() throws Exception {
return null;
}

}));
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class HystrixContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final HystrixRequestContext parentThreadState;

public HystrixContextCallable(Callable<K> actual) {
this.actual = actual;
public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
this.actual = concurrencyStrategy.wrapCallable(actual);
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,28 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;

/**
* Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable}
*
* @ExcludeFromJavadoc
*/
public class HystrixContextRunnable implements Runnable {

private final Runnable actual;
private final Callable<Void> actual;
private final HystrixRequestContext parentThreadState;

public HystrixContextRunnable(Runnable actual) {
this.actual = actual;
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

@Override
public Void call() throws Exception {
actual.run();
return null;
}

});
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}

Expand All @@ -37,7 +47,11 @@ public void run() {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
actual.run();
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
import com.netflix.hystrix.collapser.RequestCollapserFactory;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder;
Expand Down Expand Up @@ -329,7 +330,7 @@ public void testRequestVariableLifecycle2() throws Exception {

// kick off work (simulating a single request with multiple threads)
for (int t = 0; t < 5; t++) {
Thread th = new Thread(new HystrixContextRunnable(new Runnable() {
Thread th = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1910,7 +1910,7 @@ public void testExecutionSemaphoreWithQueue() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -1982,7 +1982,7 @@ public void testExecutionSemaphoreWithExecution() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -2045,7 +2045,7 @@ public void testRejectedExecutionSemaphoreWithFallback() {

final AtomicBoolean exceptionReceived = new AtomicBoolean();

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -2118,7 +2118,7 @@ public void testSemaphorePermitsInUse() {
// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() {
final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute();
Expand Down Expand Up @@ -2146,7 +2146,7 @@ public void run() {
// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() {
final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1737,7 +1737,7 @@ public void testExecutionSemaphoreWithQueue() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -1809,7 +1809,7 @@ public void testExecutionSemaphoreWithExecution() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -1872,7 +1872,7 @@ public void testRejectedExecutionSemaphoreWithFallback() {

final AtomicBoolean exceptionReceived = new AtomicBoolean();

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -1945,7 +1945,7 @@ public void testSemaphorePermitsInUse() {
// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() {
final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute();
Expand Down Expand Up @@ -1973,7 +1973,7 @@ public void run() {
// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() {
final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

import static org.junit.Assert.*;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Test;

import rx.functions.Action1;

import com.netflix.hystrix.Hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
Expand Down Expand Up @@ -143,5 +152,74 @@ public void testPropertiesStrategyViaProperty() {
public static class HystrixPropertiesStrategyTestImpl extends HystrixPropertiesStrategy {
// just use defaults
}

@Test
public void testRequestContextViaPluginInTimeout() {
HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategy() {
@Override
public <T> Callable<T> wrapCallable(final Callable<T> callable) {
return new RequestIdCallable<T>(callable);
}
});

HystrixRequestContext context = HystrixRequestContext.initializeContext();

testRequestIdThreadLocal.set("foobar");
final AtomicReference<String> valueInTimeout = new AtomicReference<String>();

new DummyCommand().toObservable()
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
System.out.println("requestId (timeout) = " + testRequestIdThreadLocal.get());
valueInTimeout.set(testRequestIdThreadLocal.get());
}
})
.materialize()
.toBlockingObservable().single();

context.shutdown();
Hystrix.reset();

assertEquals("foobar", valueInTimeout.get());
}

private static class RequestIdCallable<T> implements Callable<T> {
private final Callable<T> callable;
private final String requestId;

public RequestIdCallable(Callable<T> callable) {
this.callable = callable;
this.requestId = testRequestIdThreadLocal.get();
}

@Override
public T call() throws Exception {
String original = testRequestIdThreadLocal.get();
testRequestIdThreadLocal.set(requestId);
try {
return callable.call();
} finally {
testRequestIdThreadLocal.set(original);
}
}
}

private static final ThreadLocal<String> testRequestIdThreadLocal = new ThreadLocal<String>();

public static class DummyCommand extends HystrixCommand<Void> {

public DummyCommand() {
super(HystrixCommandGroupKey.Factory.asKey("Dummy"));
}

@Override
protected Void run() throws Exception {
System.out.println("requestId (run) = " + testRequestIdThreadLocal.get());
Thread.sleep(2000);
return null;
}
}

}

0 comments on commit fc99fce

Please sign in to comment.