Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Stoppable Check for Non-Topology-Aware Clusters #2961

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,40 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
return result;
}

/**
* Evaluates and collects stoppable instances without respecting the zone order.
* The method iterates through instances, performing stoppable checks, and records reasons for
* non-stoppability.
*
* @param instances A list of instance to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be already stopped
* @return An ObjectNode containing:
* - 'stoppableNode': List of instances that can be stopped.
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
* a list of reasons for non-stoppability as the value.
* @throws IOException
*/
public ObjectNode getStoppableInstancesWithoutTopology(List<String> instances,
List<String> toBeStoppedInstances) throws IOException {
ObjectNode result = JsonNodeFactory.instance.objectNode();
ArrayNode stoppableInstances =
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);

// Because zone order calculation is omitted, we must verify each instance's existence
// to ensure we only process valid instances before performing stoppable check.
Set<String> nonExistingInstances = processNonexistentInstances(instances, failedStoppableInstances);
List<String> instancesToCheck = new ArrayList<>(instances);
instancesToCheck.removeAll(nonExistingInstances);
populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);

return result;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
Expand All @@ -159,7 +193,7 @@ private void populateStoppableInstances(List<String> instances, Set<String> toBe
}
}

private void processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
private Set<String> processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
// Adding following logic to check whether instances exist or not. An instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
Expand All @@ -174,6 +208,7 @@ private void processNonexistentInstances(List<String> instances, ObjectNode fail
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
return nonSelectedInstances;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public enum InstancesProperties {
}

public enum InstanceHealthSelectionBase {
instance_based,
non_topo_based,
zone_based,
cross_zone_based
}
Expand Down Expand Up @@ -230,6 +230,9 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
List<String> instances = OBJECT_MAPPER.readValue(
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);

List<String> orderOfZone = null;
String customizedInput = null;
Expand All @@ -252,6 +255,12 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
_logger.error(message);
return badRequest(message);
}
if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_topo_based) {
String message =
"'zone_order' is set but 'selection_base' is 'non_topo_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) {
Expand Down Expand Up @@ -285,6 +294,14 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

if (selectionBase != InstanceHealthSelectionBase.non_topo_based && clusterTopology.getZones()
.isEmpty()) {
String message = "Cluster " + clusterId
+ " does not have any zone information. Please set zone information in cluster config.";
_logger.error(message);
return badRequest(message);
}

String namespace = getNamespace();
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
Expand All @@ -299,9 +316,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.build();

ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
Expand All @@ -311,18 +325,20 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
ObjectNode result;
// TODO: Add support for clusters that do not have topology set up.
// Issue #2893: https://github.com/apache/helix/issues/2893

switch (selectionBase) {
case zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
break;
case cross_zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
break;
case instance_based:
case non_topo_based:
result = stoppableInstancesSelector.getStoppableInstancesWithoutTopology(instances, toBeStoppedInstances);
break;
default:
throw new UnsupportedOperationException("instance_based selection is not supported yet!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
Expand Down Expand Up @@ -343,6 +344,7 @@ protected void setupHelixResources() throws Exception {
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2);
preSetupForNonTopoAwareInstancesStoppableTest(STOPPABLE_CLUSTER3, STOPPABLE_INSTANCES2);
}

protected Set<String> createInstances(String cluster, int numInstances) {
Expand Down Expand Up @@ -711,6 +713,62 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}

private void preSetupForNonTopoAwareInstancesStoppableTest(String clusterName,
List<String> instances) throws Exception {
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs that do not include the domain field
List<InstanceConfig> instanceConfigs = new ArrayList<>();
int perZoneInstancesCount = 3;
int curZoneCount = 0;
for (int i = 0; i < instances.size(); i++) {
InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
if (++curZoneCount >= perZoneInstancesCount) {
curZoneCount = 0;
}
instanceConfigs.add(instanceConfig);
}

for (InstanceConfig instanceConfig : instanceConfigs) {
_gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
}

// Start participant
startInstances(clusterName, new TreeSet<>(instances), instances.size());
createResources(clusterName, 1, 2, 3);
_clusterControllerManagers.add(startController(clusterName));

// Make sure that cluster config exists
boolean isClusterConfigExist = TestHelper.verify(() -> {
ClusterConfig stoppableClusterConfig;
try {
stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
} catch (Exception e) {
return false;
}
return (stoppableClusterConfig != null);
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(isClusterConfigExist);
// Make sure that instance config exists for the instance0 to instance5
for (String instance: instances) {
boolean isinstanceConfigExist = TestHelper.verify(() -> {
InstanceConfig instanceConfig;
try {
instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance);
} catch (Exception e) {
return false;
}
return (instanceConfig != null);
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(isinstanceConfigExist);
}
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}
/**
* Starts a HelixRestServer for the test suite.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -633,6 +634,94 @@ public void testSkipClusterLevelHealthCheck() throws IOException {
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
public void testNonTopoAwareStoppableCheck() throws JsonProcessingException {
System.out.println("Start test :" + TestHelper.getTestMethodName());

// STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster
String content = String.format(
"{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.non_topo_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
"instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
"instance14", "invalidInstance",
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");

// Change instance config of instance1 & instance0 to be evacuating
String instance0 = "instance0";
InstanceConfig instanceConfig =
_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance0);
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, instanceConfig);
String instance1 = "instance1";
InstanceConfig instanceConfig1 =
_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance1);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1);

// It takes time to reflect the changes.
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

Response response = new JerseyUriRequestBuilder(
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER3).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));

Set<String> stoppableSet = getStringSet(jsonNode,
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Assert.assertTrue(stoppableSet.contains("instance12") && stoppableSet.contains("instance3")
&& stoppableSet.contains("instance10"));

JsonNode nonStoppableInstances = jsonNode.get(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"),
ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, instanceConfig);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1);

System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
public void testNonTopoAwareStoppableCheckWithException() throws JsonProcessingException {
System.out.println("Start test :" + TestHelper.getTestMethodName());

// STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster
String content = String.format(
"{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
"instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
"instance14", "invalidInstance",
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");

// It takes time to reflect the changes.
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

// Making the REST Call to cross zone stoppable check while the cluster has no topology aware
// setup. The call should return an error.
Response response = new JerseyUriRequestBuilder(
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER3)
.isBodyReturnExpected(true)
.expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
.post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));

System.out.println("End test :" + TestHelper.getTestMethodName());
}

private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
Expand Down
Loading