Skip to content

Commit

Permalink
Add functionality to forcefully kill an instance (#2898)
Browse files Browse the repository at this point in the history
Add functionality to forcefully kill an instance
  • Loading branch information
GrantPSpencer authored Sep 5, 2024
1 parent d6e5315 commit 719b722
Show file tree
Hide file tree
Showing 10 changed files with 804 additions and 19 deletions.
19 changes: 19 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -844,4 +844,23 @@ default boolean isReadyForPreparingJoiningCluster(String clusterName, String ins
throw new UnsupportedOperationException(
"isReadyForPreparingJoiningCluster is not implemented.");
}

/**
* WARNING: THIS METHOD WILL FORCEFULLY DROP THE NODE FROM THE EXTERNAL VIEW WITHOUT WAITING FOR IT TO PROCESS
* ANY DOWNWARD STATE TRANSITIONS. USE WITH CAUTION AS THE NODE WILL KEEP ITS CURRENT STATES LOCALLY AND NOT DOWNWARD
* ST UNTIL IT RECONNECTS UNDER A NEW SESSION AND RESETS TO THE INITIALSTATE.
* Attempt to forcefully kill an instance in the cluster. This will remove the instance's
* LIVEINSTANCES znode, causing the controller to treat the instance as offline. The instance
* will not receive any state transitions until it rejoins the cluster. If the instance's
* current ZK session expires, then it will rejoin the cluster automatically.
*/
default boolean forceKillInstance(String clusterName, String instanceName) {
throw new UnsupportedOperationException("forceKillInstance is not implemented.");
}

default boolean forceKillInstance(String clusterName, String instanceName, String reason,
InstanceConstants.InstanceOperationSource operationSource) {
throw new UnsupportedOperationException("forceKillInstance is not implemented.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -27,6 +28,7 @@
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -106,6 +108,8 @@
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -731,6 +735,33 @@ public boolean isReadyForPreparingJoiningCluster(String clusterName, String inst
return false;
}

@Override
public boolean forceKillInstance(String clusterName, String instanceName) {
return forceKillInstance(clusterName, instanceName, "Force kill instance", null);
}

@Override
public boolean forceKillInstance(String clusterName, String instanceName, String reason,
InstanceConstants.InstanceOperationSource operationSource) {
logger.info("Force kill instance {} in cluster {}.", instanceName, clusterName);

InstanceConfig.InstanceOperation instanceOperationObj = new InstanceConfig.InstanceOperation.Builder()
.setOperation(InstanceConstants.InstanceOperation.UNKNOWN).setReason(reason)
.setSource(operationSource != null ? operationSource : InstanceConstants.InstanceOperationSource.USER).build();
InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
instanceConfig.setInstanceOperation(instanceOperationObj);

// Set instance operation to unknown and delete live instance in one operation
List<Op> operations = Arrays.asList(
Op.setData(PropertyPathBuilder.instanceConfig(clusterName, instanceName),
_zkClient.serialize(instanceConfig.getRecord(),
PropertyPathBuilder.instanceConfig(clusterName, instanceName)), -1),
Op.delete(PropertyPathBuilder.liveInstance(clusterName, instanceName), -1));

List< OpResult> opResults = _zkClient.multi(operations);
return opResults.stream().noneMatch(result -> result instanceof OpResult.ErrorResult);
}

/**
* Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline,
* instance has no active session, or if instance is online but has no current state or pending message.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@


public class TestInstanceOperation extends ZkTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(TestInstanceOperation.class);
public static final int TIMEOUT = 10000;
private final int ZONE_COUNT = 4;
protected final int START_NUM_NODE = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public enum Command {
completeSwapIfPossible,
onDemandRebalance,
isEvacuateFinished,
setPartitionsToError
setPartitionsToError,
forceKillInstance
}

@Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
Expand Down Expand Up @@ -509,6 +508,13 @@ public Response updateInstance(@PathParam("clusterId") String clusterId,
return serverError(e);
}
return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", evacuateFinished)));
case forceKillInstance:
boolean instanceForceKilled = admin.forceKillInstance(clusterId, instanceName, reason, instanceOperationSource);
if (!instanceForceKilled) {
return serverError("Failed to forcefully kill instance: " + instanceName +
". Possible that instance was already stopped.");
}
return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", instanceForceKilled)));
default:
LOG.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void setupZooKeepers() {
}

protected void setupHelixResources() throws Exception {
_clusters = createClusters(5);
_clusters = createClusters(6);
_gSetupTool.addCluster(_superCluster, true);
_gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
_clusters.add(_superCluster);
Expand All @@ -336,10 +336,9 @@ protected void setupHelixResources() throws Exception {
_configAccessor.setClusterConfig(cluster, clusterConfig);
createResourceConfigs(cluster, 8);
_workflowMap.put(cluster, createWorkflows(cluster, 3));
Set<String> resources = createResources(cluster, 8, MIN_ACTIVE_REPLICA, NUM_REPLICA);
createResources(cluster, 8, MIN_ACTIVE_REPLICA, NUM_REPLICA);
_instancesMap.put(cluster, instances);
_liveInstancesMap.put(cluster, liveInstances);
_resourcesMap.put(cluster, resources);
_clusterControllerManagers.add(startController(cluster));
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
Expand All @@ -356,20 +355,60 @@ protected Set<String> createInstances(String cluster, int numInstances) {
return instances;
}

protected Set<String> createResources(String cluster, int numResources, int minActiveReplica,
protected void addParticipant(String cluster, String instanceName) {
// Create instance
_gSetupTool.addInstanceToCluster(cluster, instanceName);
_instancesMap.get(cluster).add(instanceName);

// Start participant
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instanceName);
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
StateMachineEngine stateMachineEngine = participant.getStateMachineEngine();

stateMachineEngine.registerStateModelFactory("Task",
new TaskStateModelFactory(participant, taskFactoryReg));
participant.syncStart();
_mockParticipantManagers.add(participant);
_liveInstancesMap.get(cluster).add(instanceName);
}

protected void dropParticipant(String cluster, String instanceName) {
// find mock participant manager with instanceName and remove it from _mockParticipantManagers.
MockParticipantManager toRemoveManager = _mockParticipantManagers.stream()
.filter(manager -> manager.getInstanceName().equals(instanceName)).findFirst().orElse(null);
if (toRemoveManager != null) {
toRemoveManager.syncStop();
_mockParticipantManagers.remove(toRemoveManager);
}

InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName);
_gSetupTool.getClusterManagementTool().dropInstance(cluster, instanceConfig);
_instancesMap.get(cluster).remove(instanceName);
_liveInstancesMap.get(cluster).remove(instanceName);
}

protected void createResources(String cluster, int numResources, int minActiveReplica,
int replicationFactor) {
Set<String> resources = new HashSet<>();
for (int i = 0; i < numResources; i++) {
String resource = cluster + "_db_" + i;
_gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave");
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
idealState.setMinActiveReplicas(minActiveReplica);
_gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
_gSetupTool.rebalanceStorageCluster(cluster, resource, replicationFactor);
resources.add(resource);
addResource(cluster, resource, NUM_PARTITIONS, "MasterSlave", minActiveReplica,
replicationFactor);
}
return resources;
}

protected void addResource(String cluster, String resource, int numPartitions, String stateModel,
int minActiveReplica, int replicationFactor) {
_gSetupTool.addResourceToCluster(cluster, resource, numPartitions, stateModel);
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
idealState.setMinActiveReplicas(minActiveReplica);
_gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
_gSetupTool.rebalanceStorageCluster(cluster, resource, replicationFactor);
if (!_resourcesMap.containsKey(cluster)) {
_resourcesMap.put(cluster, new HashSet<>());
}
_resourcesMap.get(cluster).add(resource);
}

protected Set<String> createResourceConfigs(String cluster, int numResources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.testng.annotations.Test;

public class TestInstancesAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_4";
private final static String CLUSTER_NAME = "TestCluster_5";

@DataProvider
public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.testng.annotations.Test;

public class TestPartitionAssignmentAPI extends AbstractTestClass {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(TestPartitionAssignmentAPI.class);

private static final int REPLICAS = 3;
private static final int MIN_ACTIVE_REPLICAS = 2;
Expand Down
Loading

0 comments on commit 719b722

Please sign in to comment.