Skip to content
This repository has been archived by the owner on Apr 30, 2019. It is now read-only.

Commit

Permalink
Process ZK events in BookieWatcher outside of ZK event thread
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 27, 2016
1 parent 8982635 commit 7b9144d
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,17 @@ public void processResult(int rc, String path, Object ctx, List<String> children

HashSet<BookieSocketAddress> newBookieAddrs = convertToBookieAddresses(children);

synchronized (this) {
Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
}
// Update watcher outside ZK callback thread, to avoid deadlock in case some other
// component is trying to do a blocking ZK operation
bk.mainWorkerPool.submitOrdered(path, new SafeRunnable() {
@Override
public void safeRun() {
synchronized (BookieWatcher.this) {
Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
}
}
});

// we don't need to close clients here, because:
// a. the dead bookies will be removed from topology, which will not be used in new ensemble.
Expand Down Expand Up @@ -230,13 +237,13 @@ public void readBookiesBlocking() throws InterruptedException, KeeperException {
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
readBookies(new ChildrenCallback() {
public void processResult(int rc, String path, Object ctx, List<String> children) {
try {
BookieWatcher.this.processResult(rc, path, ctx, children);
queue.put(rc);
} catch (InterruptedException e) {
logger.error("Interruped when trying to read bookies in a blocking fashion");
throw new RuntimeException(e);
}
bk.mainWorkerPool.submitOrdered(path, new SafeRunnable() {
@Override
public void safeRun() {
BookieWatcher.this.processResult(rc, path, ctx, children);
queue.add(rc);
}
});
}
});
int rc = queue.take();
Expand Down

0 comments on commit 7b9144d

Please sign in to comment.