Skip to content

Commit

Permalink
Add recursiveDelete atomic and use for drop instance
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Jan 13, 2025
1 parent 33a28e7 commit 03e1cd3
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -274,18 +273,14 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
"Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
}

// delete config path
String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
// delete instance path
dropInstancePathRecursively(instancePath, instanceConfig.getInstanceName());
dropInstancePathsRecursively(instanceName, instancePath, instanceConfigPath);
}

private void dropInstancePathRecursively(String instancePath, String instanceName) {
private void dropInstancePathsRecursively(String instanceName, String instancePath, String instanceConfigPath) {
int retryCnt = 0;
while (true) {
try {
_zkClient.deleteRecursively(instancePath);
_zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
return;
} catch (ZkClientException e) {
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
Expand Down Expand Up @@ -333,11 +328,9 @@ public void purgeOfflineInstances(String clusterName, long offlineDuration) {

/**
 * Purges an instance from a cluster by removing its instance-config path and its instance path.
 * NOTE(review): this rendering appears to interleave removed and added diff lines. The
 * {@code _zkClient.delete(instanceConfigPath)} and {@code dropInstancePathRecursively(...)} statements
 * look like the pre-change code that the single {@code dropInstancePathsRecursively(...)} call replaces;
 * confirm against the repository before relying on this text.
 * @param clusterName name of the cluster the instance belongs to
 * @param instanceName name of the instance to purge
 */
private void purgeInstance(String clusterName, String instanceName) {
logger.info("Purge instance {} from cluster {}.", instanceName, clusterName);

// Path of this instance's config entry.
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
_zkClient.delete(instanceConfigPath);
// Path of the instance's own subtree.
String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
dropInstancePathRecursively(instancePath, instanceName);
// Hands both paths to one helper so they are removed together.
dropInstancePathsRecursively(instanceName, instancePath, instanceConfigPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import org.apache.zookeeper.Op;

public class TestZkBaseDataAccessor extends ZkUnitTestBase {
// serialize/deserialize integer list to byte array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void testZkHelixAdmin() {
new ZkException("ZkException: failed to delete " + instancePath,
new KeeperException.NotEmptyException(
"NotEmptyException: directory" + instancePath + " is not empty"))))
.when(mockZkClient).deleteRecursively(instancePath);
.when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));

HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
try {
Expand Down Expand Up @@ -1342,4 +1342,29 @@ public void testEnableDisableClusterPauseMode() {
_gSetupTool.deleteCluster(clusterName);
}
}

/**
 * Adds several instances (each with a dummy message node so its subtree is non-empty), drops them all
 * through {@code HelixAdmin.dropInstance}, and verifies that both the instance path and the instance
 * config path are gone for every dropped instance.
 */
@Test
public void testDropInstanceAtomic() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  int numInstances = 5;
  final String clusterName = getShortClassName();
  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  try {
    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");

    // Add instances to cluster
    for (int i = 0; i < numInstances; i++) {
      String instanceName = "localhost_" + i;
      admin.addInstance(clusterName, new InstanceConfig(instanceName));
      // Create dummy message nodes
      _gZkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, instanceName, "" + i));
    }
    Assert.assertEquals(admin.getInstancesInCluster(clusterName).size(), numInstances,
        "Instances should be added");

    // Drop exactly the instances that were added (bound by numInstances, not a second literal).
    for (int i = 0; i < numInstances; i++) {
      String instanceName = "localhost_" + i;
      admin.dropInstance(clusterName, new InstanceConfig(instanceName));
      // Both paths handed to the atomic delete must be gone after a successful drop.
      Assert.assertFalse(_gZkClient.exists(PropertyPathBuilder.instance(clusterName, instanceName)),
          "Instance path should be removed for " + instanceName);
      Assert.assertFalse(_gZkClient.exists(PropertyPathBuilder.instanceConfig(clusterName, instanceName)),
          "Instance config path should be removed for " + instanceName);
    }
    Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed");
  } finally {
    // Always remove the cluster so a failure here does not leak state into other tests.
    _gSetupTool.deleteCluster(clusterName);
  }

  System.out.println("End test :" + TestHelper.getTestMethodName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ String createEphemeralSequential(final String path, final Object data, final Lis

void deleteRecursively(String path);

/**
 * Deletes the given path together with all of its children as one all-or-nothing operation.
 * @param path ZK path to delete
 */
void deleteRecursivelyAtomic(String path);
/**
 * Deletes every given path together with all of their children as one all-or-nothing operation.
 * NOTE(review): an implementation may reject the call when the paths cannot be handled by a single
 * underlying client — confirm per implementation.
 * @param paths ZK paths to delete
 */
void deleteRecursivelyAtomic(List<String> paths);

boolean delete(final String path);

boolean delete(final String path, final int expectedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,20 @@ public void deleteRecursively(String path) {
_rawZkClient.deleteRecursively(path);
}

/**
 * Validates the path with {@code checkIfPathContainsShardingKey} and then delegates the atomic
 * recursive delete to {@code _rawZkClient}.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
// NOTE(review): presumably rejects paths that do not belong to this client's realm — confirm.
checkIfPathContainsShardingKey(path);
_rawZkClient.deleteRecursivelyAtomic(path);
}

/**
 * Validates each path with {@code checkIfPathContainsShardingKey}, then delegates the atomic
 * recursive delete of the whole list to {@code _rawZkClient}.
 * @param paths ZK paths to delete
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Every path is checked before anything is handed to the underlying client.
  paths.forEach(this::checkIfPathContainsShardingKey);
  _rawZkClient.deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -370,6 +371,20 @@ public void deleteRecursively(String path) {
getZkClient(path).deleteRecursively(path);
}

/**
 * Looks up the client for the path via {@code getZkClient(path)} and delegates the atomic recursive
 * delete to it.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
getZkClient(path).deleteRecursivelyAtomic(path);
}

/**
 * Atomically deletes all given paths and their children. All paths must resolve to the same realm,
 * because the delete is delegated to the single client selected for the first path.
 * An empty list is a no-op.
 * @param paths ZK paths to delete
 * @throws IllegalArgumentException if the paths resolve to more than one realm
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Nothing to delete; this also keeps paths.get(0) below from throwing IndexOutOfBoundsException.
  if (paths.isEmpty()) {
    return;
  }
  // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients.
  if (paths.stream().map(this::getZkRealm).distinct().count() > 1) {
    throw new IllegalArgumentException("Cannot atomically delete paths across different realms");
  }
  getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@ public void deleteRecursively(String path) {
_innerSharedZkClient.deleteRecursively(path);
}

/**
 * Validates the path with {@code checkIfPathContainsShardingKey} and then delegates the atomic
 * recursive delete to {@code _innerSharedZkClient}.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
// NOTE(review): presumably rejects paths that do not belong to this client's realm — confirm.
checkIfPathContainsShardingKey(path);
_innerSharedZkClient.deleteRecursivelyAtomic(path);
}

/**
 * Validates each path with {@code checkIfPathContainsShardingKey}, then delegates the atomic
 * recursive delete of the whole list to {@code _innerSharedZkClient}.
 * @param paths ZK paths to delete
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Every path is checked before anything is handed to the underlying client.
  paths.forEach(this::checkIfPathContainsShardingKey);
  _innerSharedZkClient.deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
*/

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
Expand Down Expand Up @@ -1792,7 +1795,9 @@ public boolean deleteRecursive(String path) {
}

/**
* Delete the path as well as all its children.
* Delete the path as well as all its children. This operation is not atomic and may result in a partial deletion.
* This operation will handle concurrent deletions to the tree by another agent, but will not be able to handle
* concurrent creations.
* @param path
* @throws ZkClientException
*/
Expand Down Expand Up @@ -1820,6 +1825,74 @@ public void deleteRecursively(String path) throws ZkClientException {
}
}

/**
 * Delete the path as well as all its children. This operation is atomic and will either delete all nodes or none.
 * This operation may fail if another agent is concurrently creating or deleting nodes under the path.
 * If the path does not exist there is nothing to delete and the method returns without contacting
 * ZooKeeper again.
 * @param path ZK path to delete
 * @throws ZkClientException if the delete operations could not be applied
 */
public void deleteRecursivelyAtomic(String path) {
  List<Op> ops = getOpsForRecursiveDelete(path);
  if (ops.isEmpty()) {
    // The path was absent when inspected; skip the empty multi round trip.
    return;
  }
  try {
    multi(ops);
  } catch (Exception e) {
    // The exception is passed as the trailing argument without its own placeholder so the logger
    // records it as a throwable (with stack trace) instead of formatting it into the message.
    LOG.error("zkclient {}, Failed to delete {}", _uid, path, e);
    throw new ZkClientException("Failed to delete " + path, e);
  }
}

/**
 * Delete all provided paths as well as all their children. This operation is atomic and will either delete all nodes
 * or none. This operation may fail if another agent is concurrently creating or deleting nodes under any of the
 * paths. Paths that do not exist contribute no operations; if no path exists the method returns without
 * contacting ZooKeeper again.
 * @param paths ZK paths to delete
 * @throws ZkClientException if the delete operations could not be applied
 */
public void deleteRecursivelyAtomic(List<String> paths) {
  List<Op> ops = new ArrayList<>();
  for (String path : paths) {
    ops.addAll(getOpsForRecursiveDelete(path));
  }
  if (ops.isEmpty()) {
    // None of the paths existed when inspected; skip the empty multi round trip.
    return;
  }
  try {
    multi(ops);
  } catch (Exception e) {
    // The exception is passed as the trailing argument without its own placeholder so the logger
    // records it as a throwable (with stack trace) instead of formatting it into the message.
    LOG.error("zkclient {}, Failed to delete paths {}", _uid, paths, e);
    throw new ZkClientException("Failed to delete paths " + paths, e);
  }
}

/**
 * Get the list of operations to delete the given root and all its children. Walks the tree depth-first
 * with an explicit stack and emits a node's delete only after the deletes of all of its descendants
 * (post-order), so the operations can be applied in list order. Each node's children are listed once.
 * @param root the root node to delete
 * @return the list of ZK operations to delete the given root and all its children; empty if the root
 *         does not exist
 */
private List<Op> getOpsForRecursiveDelete(String root) {
  List<Op> ops = new ArrayList<>();
  // Return early if the root does not exist
  if (!exists(root)) {
    return ops;
  }

  // Nodes whose children have already been pushed onto the stack.
  Set<String> expanded = new HashSet<>();
  Stack<String> pending = new Stack<>();
  pending.push(root);

  while (!pending.isEmpty()) {
    String node = pending.peek();
    // add() returns true only on the first visit, so children are fetched once per node.
    if (expanded.add(node)) {
      List<String> children = getChildren(node, false);
      if (!children.isEmpty()) {
        // Leave the node on the stack; it is deleted after all of its children.
        for (String child : children) {
          pending.push(node + "/" + child);
        }
        continue;
      }
    }
    // Either a leaf, or a node whose whole subtree has already been emitted.
    pending.pop();
    ops.add(Op.delete(node, -1));
  }
  return ops;
}

private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
final String path = event.getPath();
final boolean pathExists = event.getType() != EventType.NodeDeleted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
Expand Down Expand Up @@ -1231,4 +1233,53 @@ public void testInvalidWriteSizeLimitConfig() {
}
}
}

/**
 * Exercises {@code deleteRecursivelyAtomic(List)}: a list containing the same path twice and a list
 * containing a path plus one of its descendants must both fail without deleting anything, while a
 * list of distinct paths (including one that does not exist) must succeed.
 */
@Test
void testDeleteRecursivelyAtomic() {
  System.out.println("Start test: " + TestHelper.getTestMethodName());
  String grandParent = "/testDeleteRecursively";
  String parent = grandParent + "/parent";
  String child1 = parent + "/child1";
  String child2 = parent + "/child2";
  _zkClient.createPersistent(grandParent);
  _zkClient.createPersistent(parent);
  _zkClient.createPersistent(child1);
  _zkClient.createPersistent(child2);
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test calling delete on same path twice
  try {
    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent));
    Assert.fail("Operation should not succeed when attempting to delete same path twice");
  } catch (ZkClientException expected) {
    // Caught expected exception
  }

  // The failed call must not have removed anything.
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test calling delete on path that is child of another path in the list
  try {
    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent));
    Assert.fail("Operation should not succeed when a path in the list is a descendant of another path in the list");
  } catch (ZkClientException expected) {
    // Caught expected exception
  }

  // The failed call must not have removed anything.
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path
  String newNode = "/newNode";
  _zkClient.createPersistent(newNode);
  Assert.assertTrue(_zkClient.exists(newNode));

  String nonexistentPath = grandParent + "/nonexistent";
  Assert.assertFalse(_zkClient.exists(nonexistentPath));

  _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath));
  Assert.assertFalse(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.exists(newNode));
}
}

0 comments on commit 03e1cd3

Please sign in to comment.