diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java index 018cbab73..8c8ec9fb8 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java @@ -791,7 +791,7 @@ public Subscriber call(final Subscriber child) { * Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext * of the calling thread which doesn't exist on the Timer thread. */ - final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(new Runnable() { + final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() { @Override public void run() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java index f5ecf70ed..b7cf5e502 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java @@ -121,7 +121,7 @@ private class CollapsedTask implements TimerListener { CollapsedTask() { // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread) // so we create the callable now where we can capture the thread context - callableWithContextOfParent = concurrencyStrategy.wrapCallable(new HystrixContextCallable(new Callable() { + callableWithContextOfParent = new HystrixContextCallable(concurrencyStrategy, new Callable() { // the wrapCallable call allows a strategy to capture thread-context if desired @Override @@ -144,7 +144,7 @@ public Void call() throws Exception { return null; } - })); + }); } @Override diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java index 62a596f83..eefb31df0 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java @@ -30,8 +30,8 @@ public class HystrixContextCallable implements Callable { private final Callable actual; private final HystrixRequestContext parentThreadState; - public HystrixContextCallable(Callable actual) { - this.actual = actual; + public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable actual) { + this.actual = concurrencyStrategy.wrapCallable(actual); this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java index d73c8cafe..5f9cde7d3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; + /** * Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable} * @@ -22,11 +24,19 @@ */ public class HystrixContextRunnable implements Runnable { - private final Runnable actual; + private final Callable actual; private final HystrixRequestContext parentThreadState; - public HystrixContextRunnable(Runnable actual) { - this.actual = actual; + public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { + this.actual = concurrencyStrategy.wrapCallable(new Callable() { + + @Override + public Void call() throws Exception { + actual.run(); + return null; + } + + }); this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } @@ -37,7 +47,11 @@ public void run() { // set the state of this thread to that of its parent HystrixRequestContext.setContextOnCurrentThread(parentThreadState); // execute actual Callable with the state of the parent - actual.run(); + try { + actual.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } } finally { // restore this thread back to its original state HystrixRequestContext.setContextOnCurrentThread(existingState); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java index cd5cff4be..38fba326e 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java @@ -22,6 +22,7 @@ import com.netflix.hystrix.collapser.RealCollapserTimer; import com.netflix.hystrix.collapser.RequestCollapser; import com.netflix.hystrix.collapser.RequestCollapserFactory; +import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; @@ -329,7 +330,7 @@ public void testRequestVariableLifecycle2() throws Exception { // kick off work (simulating a single request with multiple threads) for (int t = 0; t < 5; t++) { - Thread th = new Thread(new HystrixContextRunnable(new Runnable() { + Thread th = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index a0d84a29e..0f4d63bfc 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -1910,7 +1910,7 @@ public void testExecutionSemaphoreWithQueue() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -1982,7 +1982,7 @@ public void testExecutionSemaphoreWithExecution() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -2045,7 +2045,7 @@ public void testRejectedExecutionSemaphoreWithFallback() { final AtomicBoolean exceptionReceived = new AtomicBoolean(); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -2118,7 +2118,7 @@ public void testSemaphorePermitsInUse() { // used to signal that all command can finish final CountDownLatch sharedLatch = new CountDownLatch(1); - final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() { + final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute(); @@ -2146,7 +2146,7 @@ public void run() { // tracks failures to obtain semaphores final AtomicInteger failureCount = new AtomicInteger(); - final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() { + final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute(); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index 82882f7a6..0014360ee 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -1737,7 +1737,7 @@ public void testExecutionSemaphoreWithQueue() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -1809,7 +1809,7 @@ public void testExecutionSemaphoreWithExecution() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -1872,7 +1872,7 @@ public void testRejectedExecutionSemaphoreWithFallback() { final AtomicBoolean exceptionReceived = new AtomicBoolean(); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -1945,7 +1945,7 @@ public void testSemaphorePermitsInUse() { // used to signal that all command can finish final CountDownLatch sharedLatch = new CountDownLatch(1); - final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() { + final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute(); @@ -1973,7 +1973,7 @@ public void run() { // tracks failures to obtain semaphores final AtomicInteger failureCount = new AtomicInteger(); - final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() { + final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute(); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java index 63c165835..99d4d46aa 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java @@ -2,11 +2,20 @@ import static org.junit.Assert.*; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; import org.junit.Test; +import rx.functions.Action1; + +import com.netflix.hystrix.Hystrix; +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; @@ -143,5 +152,74 @@ public void testPropertiesStrategyViaProperty() { public static class HystrixPropertiesStrategyTestImpl extends HystrixPropertiesStrategy { // just use defaults } + + @Test + public void testRequestContextViaPluginInTimeout() { + HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategy() { + @Override + public Callable wrapCallable(final Callable callable) { + return new RequestIdCallable(callable); + } + }); + + HystrixRequestContext context = HystrixRequestContext.initializeContext(); + + testRequestIdThreadLocal.set("foobar"); + final AtomicReference valueInTimeout = new AtomicReference(); + + new DummyCommand().toObservable() + .doOnError(new Action1() { + @Override + public void call(Throwable throwable) { + System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized()); + System.out.println("requestId (timeout) = " + testRequestIdThreadLocal.get()); + valueInTimeout.set(testRequestIdThreadLocal.get()); + } + }) + .materialize() + .toBlockingObservable().single(); + + context.shutdown(); + Hystrix.reset(); + + assertEquals("foobar", valueInTimeout.get()); + } + + private static class RequestIdCallable implements Callable { + private final Callable callable; + private final String requestId; + + public RequestIdCallable(Callable callable) { + this.callable = callable; + this.requestId = testRequestIdThreadLocal.get(); + } + + @Override + public T call() throws Exception { + String original = testRequestIdThreadLocal.get(); + testRequestIdThreadLocal.set(requestId); + try { + return callable.call(); + } finally { + testRequestIdThreadLocal.set(original); + } + } + } + + private static final ThreadLocal testRequestIdThreadLocal = new ThreadLocal(); + + public static class DummyCommand extends HystrixCommand { + + public DummyCommand() { + super(HystrixCommandGroupKey.Factory.asKey("Dummy")); + } + + @Override + protected Void run() throws Exception { + System.out.println("requestId (run) = " + testRequestIdThreadLocal.get()); + Thread.sleep(2000); + return null; + } + } }