From 8736d4c9d411b3e33295ac9cfde8fc793633b93e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 4 Jun 2013 21:18:08 -0700 Subject: [PATCH] Synchronize terminalState and subscription logic - remove the race condition that existed between a subscription coming in while onError/onCompleted was being called --- .../main/java/rx/subjects/PublishSubject.java | 56 +++++++++++-------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 10e27a10d72..0714ecfe50a 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -72,7 +72,7 @@ public static PublishSubject create() { Func1, Subscription> onSubscribe = new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - // first check if terminal state exist + // shortcut check if terminal state exists already Subscription s = checkTerminalState(observer); if(s != null) return s; @@ -86,29 +86,27 @@ public void unsubscribe() { } }); - // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, observer); - - // check terminal state again - s = checkTerminalState(observer); - if(s != null) return s; - /** - * NOTE: There is a race condition here. - * - * 1) terminal state gets set in onError or onCompleted - * 2) observers.put adds a new observer - * 3) checkTerminalState emits onError/onCompleted - * 4) onError or onCompleted also emits onError/onCompleted since it was adds to observers + * NOTE: We are synchronizing to avoid a race condition between terminalState being set and + * a new observer being added to observers. * - * Thus the terminal state could end up being sent twice. + * The synchronization only occurs on subscription and terminal states, it does not affect onNext calls + * so a high-volume hot-observable will not pay this cost for emitting data. * - * I'm going to leave this for now as AtomicObserver will protect against this - * and I'd rather not add blocking synchronization in here unless the above race condition - * truly is an issue. + * Due to the restricted impact of blocking synchronization here I have not pursued more complicated + * approaches to try and stay completely non-blocking. */ - - return subscription; + synchronized (terminalState) { + // check terminal state again + s = checkTerminalState(observer); + if (s != null) + return s; + + // on subscribe add it to the map of outbound observers to notify + observers.put(subscription, observer); + + return subscription; + } } private Subscription checkTerminalState(Observer observer) { @@ -141,7 +139,14 @@ protected PublishSubject(Func1, Subscription> onSubscribe, Concurren @Override public void onCompleted() { - terminalState.set(new Notification()); + /** + * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription. + * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the + * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath. + */ + synchronized (terminalState) { + terminalState.set(new Notification()); + } for (Observer observer : snapshotOfValues()) { observer.onCompleted(); } @@ -150,7 +155,14 @@ public void onCompleted() { @Override public void onError(Exception e) { - terminalState.set(new Notification(e)); + /** + * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription. + * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the + * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath. + */ + synchronized (terminalState) { + terminalState.set(new Notification(e)); + } for (Observer observer : snapshotOfValues()) { observer.onError(e); }