diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index df0bb40400..c8a1d0d36d 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -211,9 +211,10 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) { * @param key key to identify the entry * @param updater An updater that modifies the entry value. * @param retryOnFailure If true, updater should retry applying updated data upon failure. + * @param createIfAbsent If true, create the entry if it does not exist. * @return the updated value. */ - T update(final String key, DataUpdater updater, boolean retryOnFailure); + T update(final String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent); /** * Check if there is an entry for the given key. diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index d02430df8e..f4594d5d6f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -19,6 +19,7 @@ * under the License. */ +import java.util.ConcurrentModificationException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -208,11 +209,11 @@ public void set(String key, T data, int version) { @Override public T update(String key, DataUpdater updater) { - return update(key, updater, false); + return update(key, updater, false, false); } @Override - public T update(String key, DataUpdater updater, boolean retryOnFailure) { + public T update(String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent) { final int MAX_RETRY_ATTEMPTS = 3; int retryAttempts = 0; boolean retry; @@ -230,23 +231,29 @@ public T update(String key, DataUpdater updater, boolean retryOnFailure) { } catch (MetaClientBadVersionException badVersionException) { // If exceeded max retry attempts, re-throw exception if (retryAttempts >= MAX_RETRY_ATTEMPTS) { - LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); + LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS); throw badVersionException; } // Retry on bad version retry = true; } catch (MetaClientNoNodeException noNodeException) { + if (!createIfAbsent) { + LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent); + throw noNodeException; + } // If node does not exist, attempt to create it - pass null to updater T newData = updater.update(null); if (newData != null) { try { create(key, newData); updatedData = newData; + // If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException } catch (MetaClientNodeExistsException nodeExistsException) { - // If exceeded max retry attempts, re-throw exception + // If exceeded max retry attempts, cast to ConcurrentModification exception and re-throw. if (retryAttempts >= MAX_RETRY_ATTEMPTS) { - LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); - throw nodeExistsException; + LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS); + throw new ConcurrentModificationException("Failed to update node at " + key + " after " + + MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException); } // If node now exists, then retry update retry = true; diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 18697b9197..2919893ff8 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -245,26 +245,42 @@ public Integer update(Integer currentData) { @Test public void testUpdateWithRetry() throws InterruptedException { final boolean RETRY_ON_FAILURE = true; - int testIterationCount = 2; + final boolean CREATE_IF_ABSENT = true; final String key = "/TestZkMetaClient_testUpdateWithRetry"; ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { zkMetaClient.connect(); int initValue = 3; - DataUpdater updater = new DataUpdater() { + // Basic updater that increments node value by 1, starting at initValue + DataUpdater basicUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { return currentData != null ? currentData + 1 : initValue; } }; - // Test updater creates node if it doesn't exist - Integer newData = zkMetaClient.update(key, updater, RETRY_ON_FAILURE); + // Test updater fails create node if it doesn't exist when createIfAbsent is false + try { + zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key) != null); + } + + // Test updater fails when parent path does not exist + try { + zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key + "/child") != null); + } + + // Test updater creates node if it doesn't exist when createIfAbsent is true + Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) newData, initValue); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0); - // Cleanup zkMetaClient.delete(key); @@ -285,7 +301,7 @@ public Integer update(Integer currentData) { public Integer update(Integer currentData) { try { while (!latch.get()) { - zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); } return currentData != null ? currentData + 1 : initValue; } catch (MetaClientException e) { @@ -301,7 +317,7 @@ public Integer update(Integer currentData) { try { latch.set(false); while (!latch.get()) { - zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); } return currentData != null ? currentData + 1 : initValue; } catch (MetaClientException e) { @@ -341,7 +357,7 @@ public Integer update(Integer currentData) { // Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry // and increment version to 2 zkMetaClient.create(key, initValue); - zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2); @@ -349,14 +365,14 @@ public Integer update(Integer currentData) { latch.set(false); // Test updater fails on retries exceeded try { - zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.fail("Updater should have thrown error"); } catch (MetaClientBadVersionException e) {} // Test updater throws error try { - zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.fail("DataUpdater should have thrown error"); } catch (RuntimeException e) {} @@ -364,7 +380,7 @@ public Integer update(Integer currentData) { latch.set(false); zkMetaClient.delete(key); // Test updater retries update if node does not exist on read, but then exists when updater attempts to create it - zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1); zkMetaClient.delete(key);