From 7b9144dfbba53fe37b1f36cd8dbe0bd0832ea62f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 26 Sep 2016 17:12:01 -0700 Subject: [PATCH] Process ZK events in BookieWatcher outside of ZK event thread --- .../bookkeeper/client/BookieWatcher.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index 862b4977380..350239c088c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -181,10 +181,17 @@ public void processResult(int rc, String path, Object ctx, List children HashSet newBookieAddrs = convertToBookieAddresses(children); - synchronized (this) { - Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies(); - placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies); - } + // Update watcher outside ZK callback thread, to avoid deadlock in case some other + // component is trying to do a blocking ZK operation + bk.mainWorkerPool.submitOrdered(path, new SafeRunnable() { + @Override + public void safeRun() { + synchronized (BookieWatcher.this) { + Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies(); + placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies); + } + } + }); // we don't need to close clients here, because: // a. the dead bookies will be removed from topology, which will not be used in new ensemble. @@ -230,13 +237,13 @@ public void readBookiesBlocking() throws InterruptedException, KeeperException { final LinkedBlockingQueue queue = new LinkedBlockingQueue(); readBookies(new ChildrenCallback() { public void processResult(int rc, String path, Object ctx, List children) { - try { - BookieWatcher.this.processResult(rc, path, ctx, children); - queue.put(rc); - } catch (InterruptedException e) { - logger.error("Interruped when trying to read bookies in a blocking fashion"); - throw new RuntimeException(e); - } + bk.mainWorkerPool.submitOrdered(path, new SafeRunnable() { + @Override + public void safeRun() { + BookieWatcher.this.processResult(rc, path, ctx, children); + queue.add(rc); + } + }); } }); int rc = queue.take();