From 770d6a74c6a83e83dfbb5a87364d36eac7858c6a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 1 Oct 2013 12:54:36 -0700 Subject: [PATCH] BugFix: ConcurrencyStrategy.wrapCallable was not being used for reactive event-loop threads. The Rx Scheduler now correctly has state transferred to it with ConcurrencyStrategy.wrapCallable. Note that if the Rx sequence ever migrates to another thread (such as using observeOn) it is up to the user to deal with this as Hystrix loses control of the flow at that point. This change does ensure though that the initial threads the callbacks are performed on has the expected thread context. The design of this code is not very elegant and uses a workaround similar to posting variables on a stack before a method uses them. I have used this approach because I do not want to add a 'wrapFunc2' method to the ConcurrencyStrategy as the use of Func2 is just an implementation detail internally and the users of the library should not have to replicate the effort for both Callable and Func2. Thus the internal code is a little odd but the public API is untouched. --- .../com/netflix/hystrix/HystrixCommand.java | 2 +- .../concurrency/HystrixContextFunc2.java | 51 +++++++++++++++---- .../concurrency/HystrixContextScheduler.java | 14 +++-- 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java index 27430df17..251fdcfd8 100755 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java @@ -853,7 +853,7 @@ public Observable call(Throwable e) { // don't waste overhead if it's the 'immediate' scheduler // otherwise we'll 'observeOn' and wrap with the HystrixContextScheduler // to copy state across threads (if threads are involved) - o = o.observeOn(new HystrixContextScheduler(observeOn)); + o = o.observeOn(new HystrixContextScheduler(concurrencyStrategy, observeOn)); } o = o.finallyDo(new Action0() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java index 0025c134c..4fffafe4d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextFunc2.java @@ -15,6 +15,9 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + import rx.Scheduler; import rx.Subscription; import rx.util.functions.Func2; @@ -31,23 +34,53 @@ public class HystrixContextFunc2 implements Func2 private final Func2 actual; private final HystrixRequestContext parentThreadState; + private final Callable c; + + /* + * This is a workaround to needing to use Callable but + * needing to pass `Scheduler t1, T t2` into it after construction. + * + * Think of it like sticking t1 and t2 on the stack and then calling the function + * that uses them. + * + * This should all be thread-safe without issues despite multi-step execution + * because this Func2 is only ever executed once by Hystrix and construction will always + * precede `call` being invoked once. + * + */ + private final AtomicReference t1Holder = new AtomicReference(); + private final AtomicReference t2Holder = new AtomicReference(); - public HystrixContextFunc2(Func2 action) { + public HystrixContextFunc2(final HystrixConcurrencyStrategy concurrencyStrategy, Func2 action) { this.actual = action; this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); + + this.c = concurrencyStrategy.wrapCallable(new Callable() { + + @Override + public Subscription call() throws Exception { + HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); + try { + // set the state of this thread to that of its parent + HystrixRequestContext.setContextOnCurrentThread(parentThreadState); + // execute actual Func2 with the state of the parent + return actual.call(t1Holder.get(), t2Holder.get()); + } finally { + // restore this thread back to its original state + HystrixRequestContext.setContextOnCurrentThread(existingState); + } + } + }); } @Override public Subscription call(Scheduler t1, T t2) { - HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { - // set the state of this thread to that of its parent - HystrixRequestContext.setContextOnCurrentThread(parentThreadState); - // execute actual Func2 with the state of the parent - return actual.call(t1, t2); - } finally { - // restore this thread back to its original state - HystrixRequestContext.setContextOnCurrentThread(existingState); + this.t1Holder.set(t1); + this.t2Holder.set(t2); + return c.call(); + } catch (Exception e) { + throw new RuntimeException("Failed executing wrapped Func2", e); } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java index 0fa22092c..aa03fc1bb 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java @@ -15,32 +15,36 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Func2; /** - * Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that + * Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that * the {@link HystrixRequestContext} is properly copied across threads (if they are used by the {@link Scheduler}). */ public class HystrixContextScheduler extends Scheduler { + private final HystrixConcurrencyStrategy concurrencyStrategy; private final Scheduler actualScheduler; - public HystrixContextScheduler(Scheduler scheduler) { + public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) { this.actualScheduler = scheduler; + this.concurrencyStrategy = concurrencyStrategy; } @Override - public Subscription schedule(T state, Func2 action) { - return actualScheduler.schedule(state, new HystrixContextFunc2(action)); + public Subscription schedule(final T state, final Func2 action) { + return actualScheduler.schedule(state, new HystrixContextFunc2(concurrencyStrategy, action)); } @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - return actualScheduler.schedule(state, new HystrixContextFunc2(action), delayTime, unit); + return actualScheduler.schedule(state, new HystrixContextFunc2(concurrencyStrategy, action), delayTime, unit); } }