diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java index 27430df17..251fdcfd8 100755 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java @@ -853,7 +853,7 @@ public Observable call(Throwable e) { // don't waste overhead if it's the 'immediate' scheduler // otherwise we'll 'observeOn' and wrap with the HystrixContextScheduler // to copy state across threads (if threads are involved) - o = o.observeOn(new HystrixContextScheduler(observeOn)); + o = o.observeOn(new HystrixContextScheduler(concurrencyStrategy, observeOn)); } o = o.finallyDo(new Action0() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java index 0025c134c..4fffafe4d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java @@ -15,6 +15,9 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + import rx.Scheduler; import rx.Subscription; import rx.util.functions.Func2; @@ -31,23 +34,53 @@ public class HystrixContextFunc2 implements Func2 private final Func2 actual; private final HystrixRequestContext parentThreadState; + private final Callable c; + + /* + * This is a workaround to needing to use Callable but + * needing to pass `Scheduler t1, T t2` into it after construction. + * + * Think of it like sticking t1 and t2 on the stack and then calling the function + * that uses them. + * + * This should all be thread-safe without issues despite multi-step execution + * because this Func2 is only ever executed once by Hystrix and construction will always + * precede `call` being invoked once. + * + */ + private final AtomicReference t1Holder = new AtomicReference(); + private final AtomicReference t2Holder = new AtomicReference(); - public HystrixContextFunc2(Func2 action) { + public HystrixContextFunc2(final HystrixConcurrencyStrategy concurrencyStrategy, Func2 action) { this.actual = action; this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); + + this.c = concurrencyStrategy.wrapCallable(new Callable() { + + @Override + public Subscription call() throws Exception { + HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); + try { + // set the state of this thread to that of its parent + HystrixRequestContext.setContextOnCurrentThread(parentThreadState); + // execute actual Func2 with the state of the parent + return actual.call(t1Holder.get(), t2Holder.get()); + } finally { + // restore this thread back to its original state + HystrixRequestContext.setContextOnCurrentThread(existingState); + } + } + }); } @Override public Subscription call(Scheduler t1, T t2) { - HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { - // set the state of this thread to that of its parent - HystrixRequestContext.setContextOnCurrentThread(parentThreadState); - // execute actual Func2 with the state of the parent - return actual.call(t1, t2); - } finally { - // restore this thread back to its original state - HystrixRequestContext.setContextOnCurrentThread(existingState); + this.t1Holder.set(t1); + this.t2Holder.set(t2); + return c.call(); + } catch (Exception e) { + throw new RuntimeException("Failed executing wrapped Func2", e); } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java index 0fa22092c..aa03fc1bb 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java @@ -15,32 +15,36 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Func2; /** - * Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that + * Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that * the {@link HystrixRequestContext} is properly copied across threads (if they are used by the {@link Scheduler}). */ public class HystrixContextScheduler extends Scheduler { + private final HystrixConcurrencyStrategy concurrencyStrategy; private final Scheduler actualScheduler; - public HystrixContextScheduler(Scheduler scheduler) { + public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) { this.actualScheduler = scheduler; + this.concurrencyStrategy = concurrencyStrategy; } @Override - public Subscription schedule(T state, Func2 action) { - return actualScheduler.schedule(state, new HystrixContextFunc2(action)); + public Subscription schedule(final T state, final Func2 action) { + return actualScheduler.schedule(state, new HystrixContextFunc2(concurrencyStrategy, action)); } @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - return actualScheduler.schedule(state, new HystrixContextFunc2(action), delayTime, unit); + return actualScheduler.schedule(state, new HystrixContextFunc2(concurrencyStrategy, action), delayTime, unit); } }