Skip to content

Commit

Permalink
add retryOnFailure param to updater, limit retries, refactor metaclie…
Browse files Browse the repository at this point in the history
…nt updater tests
  • Loading branch information
GrantPSpencer committed Jun 5, 2024
1 parent 089b78f commit 11bf562
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,25 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {

/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data and apply updater upon the current data.
* current data and apply updater upon the current data. This method will NOT retry applying updated
* data upon failure.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater);


/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data and apply updater upon the current data.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @param retryOnFailure If true, updater should retry applying updated data upon failure.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater, boolean retryOnFailure);

/**
* Check if there is an entry for the given key.
* @param key key to identify the entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ public void set(String key, T data, int version) {

@Override
public T update(String key, DataUpdater<T> updater) {
return update(key, updater, false);
}

@Override
public T update(String key, DataUpdater<T> updater, boolean retryOnFailure) {
final int MAX_RETRY_ATTEMPTS = 3;
int retryAttempts = 0;
boolean retry;
T updatedData = null;
do {
retry = false;
retryAttempts++;
try {
ImmutablePair<T, Stat> tup = getDataAndStat(key);
Stat stat = tup.right;
Expand All @@ -220,6 +228,11 @@ public T update(String key, DataUpdater<T> updater) {
set(key, newData, stat.getVersion());
updatedData = newData;
} catch (MetaClientBadVersionException badVersionException) {
// If exceeded max retry attempts, re-throw exception
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts.");
throw badVersionException;
}
// Retry on bad version
retry = true;
} catch (MetaClientNoNodeException noNodeException) {
Expand All @@ -230,14 +243,19 @@ public T update(String key, DataUpdater<T> updater) {
create(key, newData);
updatedData = newData;
} catch (MetaClientNodeExistsException nodeExistsException) {
// If exceeded max retry attempts, re-throw exception
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts.");
throw nodeExistsException;
}
// If node now exists, then retry update
retry = true;
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
}
}
} while (retry);
} while (retryOnFailure && retry);
return updatedData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -215,8 +214,7 @@ public void testSet() {
}

@Test
public void testUpdate() throws InterruptedException {
int testIterationCount = 2;
public void testUpdate() {
final String key = "/TestZkMetaClient_testUpdate";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
Expand All @@ -230,76 +228,143 @@ public Integer update(Integer currentData) {
}
};

zkMetaClient.create(key, initValue);

// Test updater basic success
for (int i = 0; i < testIterationCount; i++) {
for (int i = 1; i < 3; i++) {
Integer newData = zkMetaClient.update(key, updater);
Assert.assertEquals((int) newData, initValue + i);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
}

// Cleanup
zkMetaClient.delete(key);
}
}

@Test
public void testUpdateWithRetry() throws InterruptedException {
final boolean RETRY_ON_FAILURE = true;
int testIterationCount = 2;
final String key = "/TestZkMetaClient_testUpdateWithRetry";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
DataUpdater<Integer> updater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData != null ? currentData + 1 : initValue;
}
};

// Test updater creates node if it doesn't exist
Integer newData = zkMetaClient.update(key, updater, RETRY_ON_FAILURE);
Assert.assertEquals((int) newData, initValue);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);


// Cleanup
zkMetaClient.delete(key);

AtomicBoolean latch = new AtomicBoolean();
DataUpdater<Integer> noOpUpdater = new DataUpdater<Integer>() {

// Increments znode version and sets latch value to true
DataUpdater<Integer> versionIncrementUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
latch.set(true);
return currentData;
}
};

DataUpdater<Integer> latchedUpdater = new DataUpdater<Integer>() {
// Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed
DataUpdater<Integer> failsOnceUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
while (!latch.get()) {
Thread.sleep(200);
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE);
}
return currentData != null ? currentData + 1 : initValue;
} catch (InterruptedException e) {
} catch (MetaClientException e) {
return -1;
}
}
};

// Test updater retries on bad version
zkMetaClient.create(key, initValue);
for (int i = 0; i < testIterationCount; i++) {
Thread thread = new Thread(() -> {
zkMetaClient.update(key, latchedUpdater);
});
thread.start();
zkMetaClient.update(key, noOpUpdater);
thread.join();
latch.set(false);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + i + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2 + (i*2));
}
// Always fails to update due to bad version
DataUpdater<Integer> alwaysFailLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
latch.set(false);
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created
// due to create() call in updater. Should then retry and successfully update the node.
DataUpdater<Integer> failOnFirstCreateLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
if (!latch.get()) {
zkMetaClient.create(key, initValue);
latch.set(true);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Throws error when update called
DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception");
}
};

// Reset latch
latch.set(false);
// Test updater retries on bad version
// Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry
// and increment version to 2
zkMetaClient.create(key, initValue);
zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);

// Reset latch
latch.set(false);
// Test updater fails on retries exceeded
try {
zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE);
Assert.fail("Updater should have thrown error");
} catch (MetaClientBadVersionException e) {}


// Test updater throws error
try {
zkMetaClient.update(key, errorUpdater);
zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE);
Assert.fail("DataUpdater should have thrown error");
} catch (RuntimeException e) {}

zkMetaClient.delete(key);

// Test updater retries update if node now exists when attempting to create it
// Reset latch and cleanup old node
latch.set(false);
Thread thread = new Thread(() -> {
zkMetaClient.update(key, latchedUpdater);
});
thread.start();
zkMetaClient.create(key, initValue);
latch.set(true);
thread.join();
zkMetaClient.delete(key);
// Test updater retries update if node does not exist on read, but then exists when updater attempts to create it
zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
Expand Down

0 comments on commit 11bf562

Please sign in to comment.