Skip to content

Commit

Permalink
only create node if createIfAbsent is true
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Jun 10, 2024
1 parent 11bf562 commit 02fa9d9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,10 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @param retryOnFailure If true, updater should retry applying updated data upon failure.
* @param createIfAbsent If true, create the entry if it does not exist.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater, boolean retryOnFailure);
T update(final String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent);

/**
* Check if there is an entry for the given key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -208,11 +209,11 @@ public void set(String key, T data, int version) {

@Override
public T update(String key, DataUpdater<T> updater) {
return update(key, updater, false);
return update(key, updater, false, false);
}

@Override
public T update(String key, DataUpdater<T> updater, boolean retryOnFailure) {
public T update(String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent) {
final int MAX_RETRY_ATTEMPTS = 3;
int retryAttempts = 0;
boolean retry;
Expand All @@ -230,23 +231,29 @@ public T update(String key, DataUpdater<T> updater, boolean retryOnFailure) {
} catch (MetaClientBadVersionException badVersionException) {
// If exceeded max retry attempts, re-throw exception
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts.");
LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
throw badVersionException;
}
// Retry on bad version
retry = true;
} catch (MetaClientNoNodeException noNodeException) {
if (!createIfAbsent) {
LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent);
throw noNodeException;
}
// If node does not exist, attempt to create it - pass null to updater
T newData = updater.update(null);
if (newData != null) {
try {
create(key, newData);
updatedData = newData;
// If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException
} catch (MetaClientNodeExistsException nodeExistsException) {
// If exceeded max retry attempts, re-throw exception
// If exceeded max retry attempts, cast to ConcurrentModification exception and re-throw.
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts.");
throw nodeExistsException;
LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
throw new ConcurrentModificationException("Failed to update node at " + key + " after " +
MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException);
}
// If node now exists, then retry update
retry = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,42 @@ public Integer update(Integer currentData) {
@Test
public void testUpdateWithRetry() throws InterruptedException {
final boolean RETRY_ON_FAILURE = true;
int testIterationCount = 2;
final boolean CREATE_IF_ABSENT = true;
final String key = "/TestZkMetaClient_testUpdateWithRetry";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
DataUpdater<Integer> updater = new DataUpdater<Integer>() {
// Basic updater that increments node value by 1, starting at initValue
DataUpdater<Integer> basicUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData != null ? currentData + 1 : initValue;
}
};

// Test updater creates node if it doesn't exist
Integer newData = zkMetaClient.update(key, updater, RETRY_ON_FAILURE);
// Test updater fails create node if it doesn't exist when createIfAbsent is false
try {
zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key) != null);
}

// Test updater fails when parent path does not exist
try {
zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key + "/child") != null);
}

// Test updater creates node if it doesn't exist when createIfAbsent is true
Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) newData, initValue);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);


// Cleanup
zkMetaClient.delete(key);

Expand All @@ -285,7 +301,7 @@ public Integer update(Integer currentData) {
public Integer update(Integer currentData) {
try {
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
Expand All @@ -301,7 +317,7 @@ public Integer update(Integer currentData) {
try {
latch.set(false);
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
Expand Down Expand Up @@ -341,30 +357,30 @@ public Integer update(Integer currentData) {
// Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry
// and increment version to 2
zkMetaClient.create(key, initValue);
zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);

// Reset latch
latch.set(false);
// Test updater fails on retries exceeded
try {
zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientBadVersionException e) {}


// Test updater throws error
try {
zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("DataUpdater should have thrown error");
} catch (RuntimeException e) {}

// Reset latch and cleanup old node
latch.set(false);
zkMetaClient.delete(key);
// Test updater retries update if node does not exist on read, but then exists when updater attempts to create it
zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE);
zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
Expand Down

0 comments on commit 02fa9d9

Please sign in to comment.