Skip to content

Commit

Permalink
Synchronize terminalState and subscription logic
Browse files Browse the repository at this point in the history
- remove the race condition that existed between a subscription coming in while onError/onCompleted was being called
  • Loading branch information
benjchristensen committed Jun 5, 2013
1 parent 7272c59 commit 8736d4c
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static <T> PublishSubject<T> create() {
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
@Override
public Subscription call(Observer<T> observer) {
// first check if terminal state exist
// shortcut check if terminal state exists already
Subscription s = checkTerminalState(observer);
if(s != null) return s;

Expand All @@ -86,29 +86,27 @@ public void unsubscribe() {
}
});

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

// check terminal state again
s = checkTerminalState(observer);
if(s != null) return s;

/**
* NOTE: There is a race condition here.
*
* 1) terminal state gets set in onError or onCompleted
* 2) observers.put adds a new observer
* 3) checkTerminalState emits onError/onCompleted
* 4) onError or onCompleted also emits onError/onCompleted since it was adds to observers
* NOTE: We are synchronizing to avoid a race condition between terminalState being set and
* a new observer being added to observers.
*
* Thus the terminal state could end up being sent twice.
* The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
* so a high-volume hot-observable will not pay this cost for emitting data.
*
* I'm going to leave this for now as AtomicObserver will protect against this
* and I'd rather not add blocking synchronization in here unless the above race condition
* truly is an issue.
* Due to the restricted impact of blocking synchronization here I have not pursued more complicated
* approaches to try and stay completely non-blocking.
*/

return subscription;
synchronized (terminalState) {
// check terminal state again
s = checkTerminalState(observer);
if (s != null)
return s;

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

return subscription;
}
}

private Subscription checkTerminalState(Observer<T> observer) {
Expand Down Expand Up @@ -141,7 +139,14 @@ protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, Concurren

@Override
public void onCompleted() {
terminalState.set(new Notification<T>());
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>());
}
for (Observer<T> observer : snapshotOfValues()) {
observer.onCompleted();
}
Expand All @@ -150,7 +155,14 @@ public void onCompleted() {

@Override
public void onError(Exception e) {
terminalState.set(new Notification<T>(e));
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>(e));
}
for (Observer<T> observer : snapshotOfValues()) {
observer.onError(e);
}
Expand Down

0 comments on commit 8736d4c

Please sign in to comment.