Metaclient updater retry logic #2805

Merged
merged 4 commits into from
Jun 21, 2024
Changes from 2 commits
@@ -39,6 +39,7 @@
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
 import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
@@ -207,16 +208,37 @@ public void set(String key, T data, int version) {

   @Override
   public T update(String key, DataUpdater<T> updater) {
-    org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
-    // TODO: add retry logic for ZkBadVersionException.
-    try {
-      T oldData = _zkClient.readData(key, stat);
-      T newData = updater.update(oldData);
-      set(key, newData, stat.getVersion());
-      return newData;
-    } catch (ZkException e) {
-      throw translateZkExceptionToMetaclientException(e);
-    }
+    boolean retry;
+    T updatedData = null;
+    do {
+      retry = false;
+      try {
+        ImmutablePair<T, Stat> tup = getDataAndStat(key);
+        Stat stat = tup.right;
+        T oldData = tup.left;
+        T newData = updater.update(oldData);
+        set(key, newData, stat.getVersion());
+        updatedData = newData;
+      } catch (MetaClientBadVersionException badVersionException) {
+        // Retry on bad version
+        retry = true;
+      } catch (MetaClientNoNodeException noNodeException) {
+        // If node does not exist, attempt to create it - pass null to updater
+        T newData = updater.update(null);
+        if (newData != null) {
+          try {
+            create(key, newData);
Contributor

Thanks for creating the PR!
Question about the retry logic here.
For example, when a user tries to update node "/a/b/c" and there is no node "/a/b", the set on line 228 will throw NoNodeException and we reach line 243. Since there is no node "/a/b", create will also throw NoNodeException, and we keep retrying until expiry. Should we do a recursive create instead? Or should we just let it expire and forward the NoNodeException?

Contributor Author

Really good question. I am leaning towards not creating recursively, because that matches the behavior of the Helix ZK client. However, if this API's contract with customers is "give me a node path, and I will make sure that it has the data you want", then I think there's a good argument for making our create attempt recursive rather than limited to the leaf node. cc @junkaixue do you have thoughts on this?

And to clarify, this is the current flow if you try to update path /a/b/c when no nodes in that path exist yet (it should fail before retrying the update):
try to update /a/b/c --> MetaClientNoNodeException is caught
try to create /a/b/c --> MetaClientNoNodeException; this is not caught, so the method fails immediately
we do not retry the update, as the method failed due to the above exception
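
The flow above can be sketched with a small in-memory stand-in. This is only an illustration under stated assumptions: `ParentCheckSketch`, its `NoNodeException`, and `updateOrCreate` are hypothetical names, not part of the Helix metaclient API, and the store only models the rule that a non-recursive create fails when the parent is missing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a ZooKeeper-like tree; not the Helix API.
class ParentCheckSketch {
  static class NoNodeException extends RuntimeException {
    NoNodeException(String path) {
      super("No node: " + path);
    }
  }

  private final Map<String, Integer> nodes = new HashMap<>();

  ParentCheckSketch() {
    nodes.put("/", 0); // the root always exists
  }

  private static String parentOf(String path) {
    int idx = path.lastIndexOf('/');
    return idx <= 0 ? "/" : path.substring(0, idx);
  }

  // Non-recursive create: fails if the parent path is missing.
  void create(String path, int data) {
    String parent = parentOf(path);
    if (!nodes.containsKey(parent)) {
      throw new NoNodeException(parent);
    }
    nodes.put(path, data);
  }

  int read(String path) {
    Integer value = nodes.get(path);
    if (value == null) {
      throw new NoNodeException(path);
    }
    return value;
  }

  // Mirrors the described flow: the read fails, then one create attempt is made.
  int updateOrCreate(String path, int initial) {
    try {
      int updated = read(path) + 1;
      nodes.put(path, updated);
      return updated;
    } catch (NoNodeException e) {
      create(path, initial); // a NoNodeException thrown here propagates; no retry
      return initial;
    }
  }

  public static void main(String[] args) {
    ParentCheckSketch store = new ParentCheckSketch();
    try {
      store.updateOrCreate("/a/b/c", 3);
    } catch (NoNodeException e) {
      System.out.println("failed immediately: " + e.getMessage());
    }
  }
}
```

Because the second `NoNodeException` is thrown from inside the catch block, nothing in the method handles it, so the caller sees the failure on the first attempt rather than after repeated retries.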

Contributor

I am also leaning towards no creation. If this is an update, it is supposed to be updating something that was already there. It is possible that the node was just deleted recursively before the update; in that case the update should be an invalid operation and fail.

Contributor

I would second no creation as well. Say we try to update "/a/b/c" and get a no-node exception: we just fail immediately.

Contributor Author

Spoke with @xyuanlu offline. Added a createIfAbsent parameter. The base behavior is to not create the node; with createIfAbsent, update will attempt to create the leaf node if it is not present. It will not create recursively, and it will fail on the first create() call if the parent path does not exist.

Added relevant test cases to cover this
cc @junkaixue
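
A minimal sketch of how such a flag might behave, assuming a flat in-memory map in place of the real store. `CreateIfAbsentSketch` and its `NoNodeException` are hypothetical, the compare-and-set is value-based rather than version-based, and parent-path checks are left out, so this is not the signature or implementation added in the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical illustration of a createIfAbsent flag; not the Helix metaclient API.
class CreateIfAbsentSketch {
  static class NoNodeException extends RuntimeException {
    NoNodeException(String key) {
      super("No node: " + key);
    }
  }

  private final ConcurrentHashMap<String, Integer> nodes = new ConcurrentHashMap<>();

  // Assumes the updater returns a non-null value when given existing data.
  Integer update(String key, UnaryOperator<Integer> updater, boolean createIfAbsent) {
    while (true) {
      Integer oldData = nodes.get(key);
      if (oldData == null) {
        if (!createIfAbsent) {
          throw new NoNodeException(key); // base behavior: fail, do not create
        }
        Integer newData = updater.apply(null);
        if (newData == null) {
          return null; // updater declined to create anything
        }
        if (nodes.putIfAbsent(key, newData) == null) {
          return newData; // this call created the node
        }
        continue; // node appeared concurrently; retry as a normal update
      }
      Integer newData = updater.apply(oldData);
      if (nodes.replace(key, oldData, newData)) {
        return newData;
      }
      // Value changed underneath us; loop and re-read.
    }
  }

  public static void main(String[] args) {
    CreateIfAbsentSketch store = new CreateIfAbsentSketch();
    UnaryOperator<Integer> updater = v -> v == null ? 3 : v + 1;
    System.out.println(store.update("/k", updater, true));
    System.out.println(store.update("/k", updater, false));
  }
}
```

Keeping creation opt-in preserves the "update means the node already existed" semantics argued for above, while still offering a one-call path for callers that want the leaf created.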

+            updatedData = newData;
+          } catch (MetaClientNodeExistsException nodeExistsException) {
+            // If node now exists, then retry update
+            retry = true;
+          } catch (ZkException e) {
+            throw translateZkExceptionToMetaclientException(e);
+          }
+        }
+      }
+    } while (retry);
+    return updatedData;
   }

   //TODO: Get Expiry Time in Stat
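
The bad-version retry in this hunk is an optimistic-concurrency loop: read the data with its version, compute the new value, write conditionally on that version, and start over if the version has moved. A self-contained sketch of that pattern follows, assuming a single in-memory cell; `VersionedRetrySketch` and `BadVersionException` are hypothetical names and do not come from Helix or ZooKeeper.

```java
import java.util.function.UnaryOperator;

// Hypothetical single-node store illustrating a read / conditional-write retry loop.
class VersionedRetrySketch {
  static class BadVersionException extends RuntimeException {}

  private int data;
  private int version;

  VersionedRetrySketch(int initial) {
    this.data = initial;
  }

  // Returns {data, version} as one consistent snapshot.
  synchronized int[] read() {
    return new int[] {data, version};
  }

  // Conditional write: succeeds only if the caller saw the latest version.
  synchronized void set(int newData, int expectedVersion) {
    if (expectedVersion != version) {
      throw new BadVersionException();
    }
    data = newData;
    version++;
  }

  int update(UnaryOperator<Integer> updater) {
    while (true) {
      int[] snapshot = read();
      int newData = updater.apply(snapshot[0]);
      try {
        set(newData, snapshot[1]);
        return newData;
      } catch (BadVersionException e) {
        // Another writer got in first; re-read and apply the updater again.
      }
    }
  }

  public static void main(String[] args) {
    VersionedRetrySketch node = new VersionedRetrySketch(3);
    boolean[] interfered = {false};
    int result = node.update(current -> {
      if (!interfered[0]) {
        interfered[0] = true;
        node.set(100, 0); // simulate a concurrent writer bumping the version
      }
      return current + 1;
    });
    System.out.println(result + " at version " + node.read()[1]);
  }
}
```

Note that the updater runs once per attempt, so in a loop like this it may be invoked more than once and should be free of side effects that must happen exactly once.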
@@ -215,40 +215,93 @@ public void testSet() {
}

@Test
public void testUpdate() {
public void testUpdate() throws InterruptedException {
int testIterationCount = 2;
final String key = "/TestZkMetaClient_testUpdate";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
zkMetaClient.create(key, initValue);
MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 0);
DataUpdater<Integer> updater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData != null ? currentData + 1 : initValue;
}
};

// Test updater basic success
for (int i = 0; i < testIterationCount; i++) {
Integer newData = zkMetaClient.update(key, updater);
Assert.assertEquals((int) newData, initValue + i);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
}

// test update() and validate entry value and version
Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
zkMetaClient.delete(key);

AtomicBoolean latch = new AtomicBoolean();
DataUpdater<Integer> noOpUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
latch.set(true);
return currentData;
}
});
Assert.assertEquals((int) newData, (int) initValue + 1);
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 1);
DataUpdater<Integer> latchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
while (!latch.get()) {
Thread.sleep(200);
}
return currentData != null ? currentData + 1 : initValue;
} catch (InterruptedException e) {
return -1;
}
}
};

newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
// Test updater retries on bad version
zkMetaClient.create(key, initValue);
for (int i = 0; i < testIterationCount; i++) {
Thread thread = new Thread(() -> {
zkMetaClient.update(key, latchedUpdater);
});
thread.start();
zkMetaClient.update(key, noOpUpdater);
thread.join();
latch.set(false);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + i + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2 + (i*2));
}

DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception");
}
});
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 2);
Assert.assertEquals((int) newData, (int) initValue + 2);
// Test updater throws error
try {
zkMetaClient.update(key, errorUpdater);
Assert.fail("DataUpdater should have thrown error");
} catch (RuntimeException e) {}

zkMetaClient.delete(key);

// Test updater retries update if node now exists when attempting to create it
latch.set(false);
Thread thread = new Thread(() -> {
zkMetaClient.update(key, latchedUpdater);
});
thread.start();
zkMetaClient.create(key, initValue);
latch.set(true);
thread.join();
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
}
}
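
The test above forces a conflicting write by holding one updater inside a polling loop on an `AtomicBoolean`. As an alternative illustration of the same idea, the sketch below swaps in `CountDownLatch` for the handshake and an `AtomicInteger` compare-and-set for the versioned write. `LatchedRetryTestSketch` is a hypothetical name and none of this touches `ZkMetaClient`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: deterministically force one failed compare-and-set, then a retry.
class LatchedRetryTestSketch {
  // Returns {final value, number of update attempts made by the slow writer}.
  static int[] run() {
    AtomicInteger node = new AtomicInteger(3);
    AtomicInteger attempts = new AtomicInteger();
    CountDownLatch readDone = new CountDownLatch(1);
    CountDownLatch writeDone = new CountDownLatch(1);

    Thread slowWriter = new Thread(() -> {
      while (true) {
        int old = node.get();
        attempts.incrementAndGet();
        readDone.countDown(); // tell the main thread the stale read has happened
        try {
          writeDone.await(); // hold until the conflicting write lands
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        if (node.compareAndSet(old, old + 1)) {
          return;
        }
        // The value changed after our read; loop to re-read and retry.
      }
    });

    slowWriter.start();
    try {
      readDone.await();
      node.set(10); // conflicting write between the slow writer's read and its CAS
      writeDone.countDown();
      slowWriter.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {node.get(), attempts.get()};
  }

  public static void main(String[] args) {
    int[] result = run();
    System.out.println("value=" + result[0] + ", attempts=" + result[1]);
  }
}
```

With latches the ordering is fixed without sleeping: the first attempt always reads the stale value and fails, and the second attempt succeeds against the value written in between.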