From 56dd5d25a4fc49f30531521910415982eb403060 Mon Sep 17 00:00:00 2001 From: Xiaxuan Gao Date: Wed, 31 Jan 2024 10:34:08 -0800 Subject: [PATCH] Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After Stoppable Instances are Shutdown (#2736) Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After Stoppable Instances are offline --- .../StoppableInstancesSelector.java | 72 +++- .../resources/helix/InstancesAccessor.java | 57 ++- .../helix/rest/server/AbstractTestClass.java | 65 ++++ .../rest/server/TestInstancesAccessor.java | 325 ++++++++++++++++++ 4 files changed, 501 insertions(+), 18 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 877aaa9c89..e366fa12f5 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -47,22 +47,29 @@ public class StoppableInstancesSelector { // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl // loops all the types to do corresponding checks. private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST"; + private final static String EXCEED_MAX_OFFLINE_INSTANCES = + "HELIX:EXCEED_MAX_OFFLINE_INSTANCES"; private final String _clusterId; private List _orderOfZone; private final String _customizedInput; private final MaintenanceManagementService _maintenanceService; private final ClusterTopology _clusterTopology; private final ZKHelixDataAccessor _dataAccessor; + private final int _maxAdditionalOfflineInstances; + private final boolean _continueOnFailure; private StoppableInstancesSelector(String clusterId, List orderOfZone, String customizedInput, MaintenanceManagementService maintenanceService, - ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) { + ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor, + int maxAdditionalOfflineInstances, boolean continueOnFailure) { _clusterId = clusterId; _orderOfZone = orderOfZone; _customizedInput = customizedInput; _maintenanceService = maintenanceService; _clusterTopology = clusterTopology; _dataAccessor = dataAccessor; + _maxAdditionalOfflineInstances = maxAdditionalOfflineInstances; + _continueOnFailure = continueOnFailure; } /** @@ -92,7 +99,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size()); processNonexistentInstances(instances, failedStoppableInstances); return result; @@ -129,15 +136,17 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, if (instanceSet.isEmpty()) { continue; } - populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, + stoppableInstances, failedStoppableInstances, + _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size()); } processNonexistentInstances(instances, failedStoppableInstances); return result; } private void populateStoppableInstances(List instances, Set toBeStoppedInstances, - ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { + ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, + int allowedOfflineCount) throws IOException { Map instancesStoppableChecks = _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, _customizedInput, toBeStoppedInstances); @@ -145,16 +154,42 @@ private void populateStoppableInstances(List instances, Set toBe for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); - if (!stoppableCheck.isStoppable()) { - ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); - for (String failedReason : stoppableCheck.getFailedChecks()) { - failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); - } - } else { + if (stoppableCheck.isStoppable() && allowedOfflineCount > 0) { stoppableInstances.add(instance); // Update the toBeStoppedInstances set with the currently identified stoppable instance. // This ensures that subsequent checks in other zones are aware of this instance's stoppable status. toBeStoppedInstances.add(instance); + allowedOfflineCount--; + continue; + } + // TODO: If the maxOffline limit is reached, we should give previous non-stoppable instances a failed reason + // of "EXCEED_MAX_OFFLINE_INSTANCES" + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); + consolidateResult(stoppableCheck, failedReasonsNode, allowedOfflineCount); + } + } + + private void consolidateResult(StoppableCheck stoppableCheck, + ArrayNode failedReasonsNode, int allowedOfflineCount) { + boolean failedHelixOwnChecks = false; + if (allowedOfflineCount <= 0) { + failedReasonsNode.add(JsonNodeFactory.instance.textNode(EXCEED_MAX_OFFLINE_INSTANCES)); + failedHelixOwnChecks = true; + } + + if (!stoppableCheck.isStoppable()) { + for (String failedReason : stoppableCheck.getFailedChecks()) { + // HELIX_OWN_CHECK can always be added to the failedReasonsNode. + if (failedReason.startsWith(StoppableCheck.Category.HELIX_OWN_CHECK.getPrefix())) { + failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); + failedHelixOwnChecks = true; + continue; + } + // CUSTOM_INSTANCE_CHECK and CUSTOM_PARTITION_CHECK can only be added to the failedReasonsNode + // if continueOnFailure is true and there is no failed Helix_OWN_CHECKS. + if (_continueOnFailure && !failedHelixOwnChecks) { + failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); + } } } } @@ -282,6 +317,8 @@ public static class StoppableInstancesSelectorBuilder { private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; private ZKHelixDataAccessor _dataAccessor; + private int _maxAdditionalOfflineInstances = Integer.MAX_VALUE; + private boolean _continueOnFailure; public StoppableInstancesSelectorBuilder setClusterId(String clusterId) { _clusterId = clusterId; @@ -314,9 +351,20 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat return this; } + public StoppableInstancesSelectorBuilder setMaxAdditionalOfflineInstances(int maxAdditionalOfflineInstances) { + _maxAdditionalOfflineInstances = maxAdditionalOfflineInstances; + return this; + } + + public StoppableInstancesSelectorBuilder setContinueOnFailure(boolean continueOnFailure) { + _continueOnFailure = continueOnFailure; + return this; + } + public StoppableInstancesSelector build() { return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput, - _maintenanceService, _clusterTopology, _dataAccessor); + _maintenanceService, _clusterTopology, _dataAccessor, _maxAdditionalOfflineInstances, + _continueOnFailure); } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index fcad387dce..a87784ce78 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -20,6 +20,7 @@ */ import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -40,6 +41,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; @@ -158,7 +160,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, @QueryParam("continueOnFailures") boolean continueOnFailures, @QueryParam("skipZKRead") boolean skipZKRead, @QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories, - @DefaultValue("false") @QueryParam("random") boolean random, String content) { + @DefaultValue("false") @QueryParam("random") boolean random, + @DefaultValue("false") @QueryParam("notExceedMaxOfflineInstances") boolean notExceedMaxOfflineInstances, + String content) { Command cmd; try { cmd = Command.valueOf(command); @@ -203,7 +207,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, break; case stoppable: return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures, - skipHealthCheckCategorySet, random); + skipHealthCheckCategorySet, random, notExceedMaxOfflineInstances); default: _logger.error("Unsupported command :" + command); return badRequest("Unsupported command :" + command); @@ -221,7 +225,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead, boolean continueOnFailures, Set skipHealthCheckCategories, - boolean random) throws IOException { + boolean random, boolean notExceedingMaxOfflineInstances) throws IOException { try { // TODO: Process input data from the content InstancesAccessor.InstanceHealthSelectionBase selectionBase = @@ -233,7 +237,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo List orderOfZone = null; String customizedInput = null; - List toBeStoppedInstances = Collections.emptyList(); + List toBeStoppedInstances = new ArrayList<>(); // By default, if skip_stoppable_check_list is unset, all checks are performed to maintain // backward compatibility with existing clients. List skipStoppableCheckList = Collections.emptyList(); @@ -302,7 +306,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); - StoppableInstancesSelector stoppableInstancesSelector = + StoppableInstancesSelector.StoppableInstancesSelectorBuilder builder = new StoppableInstancesSelector.StoppableInstancesSelectorBuilder() .setClusterId(clusterId) .setOrderOfZone(orderOfZone) @@ -310,8 +314,49 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId)) - .build(); + .setContinueOnFailure(continueOnFailures); + + Set currentDisabledOrOfflineInstances = Collections.emptySet(); + if (notExceedingMaxOfflineInstances) { + // If the clusterConfig or maxOfflineInstancesAllowed is not set, this is an invalid request. + ClusterConfig clusterConfig = getConfigAccessor().getClusterConfig(clusterId); + if (clusterConfig == null) { + String message = + "Invalid cluster name: " + clusterId + ". Cluster config does not exist."; + _logger.error(message); + return badRequest(message); + } + int maxOfflineAllowed = clusterConfig.getMaxOfflineInstancesAllowed(); + if (maxOfflineAllowed == -1) { + String message = + "Invalid cluster config: " + clusterId + ". maxOfflineInstancesAllowed is not set."; + _logger.error(message); + return badRequest(message); + } + + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + ConfigAccessor configAccessor = getConfigAccessor(); + List liveInstances = + dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()); + currentDisabledOrOfflineInstances = + clusterTopology.getAllInstances().stream().filter(instance -> { + // return instances that are disabled and not live. + return !configAccessor.getInstanceConfig(clusterId, instance).getInstanceEnabled() + || !liveInstances.contains(instance); + }).collect(Collectors.toSet()); + maxOfflineAllowed = + Math.max(0, maxOfflineAllowed - currentDisabledOrOfflineInstances.size()); + builder.setMaxAdditionalOfflineInstances(maxOfflineAllowed); + } + + StoppableInstancesSelector stoppableInstancesSelector = builder.build(); stoppableInstancesSelector.calculateOrderOfZone(instances, random); + Set finalCurrentDisabledOfflineInstances = currentDisabledOrOfflineInstances; + // Since maxOfflineAllowed is set, we need to filter out the instances that are already offline. + toBeStoppedInstances = toBeStoppedInstances.stream().filter( + instance -> clusterTopology.getAllInstances().contains(instance) + && !finalCurrentDisabledOfflineInstances.contains(instance)) + .collect(Collectors.toList()); ObjectNode result; switch (selectionBase) { case zone_based: diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index d0f0c57151..404f23d261 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -131,6 +131,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static BaseDataAccessor _baseAccessorTestNS; protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2"; + protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3"; protected static final String TASK_TEST_CLUSTER = "TaskTestCluster"; protected static final List STOPPABLE_INSTANCES = Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); @@ -138,6 +139,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5", "instance6", "instance7", "instance8", "instance9", "instance10", "instance11", "instance12", "instance13", "instance14"); + protected static final int STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES = 3; protected static Set _clusters; protected static String _superCluster = "superCluster"; @@ -343,6 +345,8 @@ protected void setupHelixResources() throws Exception { } preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2); + preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(STOPPABLE_CLUSTER3, + STOPPABLE_INSTANCES2); } protected Set createInstances(String cluster, int numInstances) { @@ -620,6 +624,7 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); clusterConfig.setFaultZoneType("helixZoneId"); clusterConfig.setPersistIntermediateAssignment(true); + clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES); _configAccessor.setClusterConfig(clusterName, clusterConfig); // Create instance configs List instanceConfigs = new ArrayList<>(); @@ -671,6 +676,66 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa _clusters.add(clusterName); _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); } + + private void preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances( + String clusterName, List instances) throws Exception { + _gSetupTool.addCluster(clusterName, true); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setPersistIntermediateAssignment(true); + clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + // Create instance configs + List instanceConfigs = new ArrayList<>(); + int perZoneInstancesCount = 3; + int curZoneCount = 0, zoneId = 1; + for (int i = 0; i < instances.size(); i++) { + InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); + instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance" + i); + if (++curZoneCount >= perZoneInstancesCount) { + curZoneCount = 0; + zoneId++; + } + instanceConfigs.add(instanceConfig); + } + + for (InstanceConfig instanceConfig : instanceConfigs) { + _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); + } + + // Start participant and make two of them offline + startInstances(clusterName, new TreeSet<>(instances), instances.size() - 2); + createResources(clusterName, 1, 2, 3); + _clusterControllerManagers.add(startController(clusterName)); + + // Make sure that cluster config exists + boolean isClusterConfigExist = TestHelper.verify(() -> { + ClusterConfig stoppableClusterConfig; + try { + stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName); + } catch (Exception e) { + return false; + } + return (stoppableClusterConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isClusterConfigExist); + // Make sure that instance config exists for the instance0 to instance5 + for (String instance: instances) { + boolean isinstanceConfigExist = TestHelper.verify(() -> { + InstanceConfig instanceConfig; + try { + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance); + } catch (Exception e) { + return false; + } + return (instanceConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isinstanceConfigExist); + } + _clusters.add(clusterName); + _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); + } + /** * Starts a HelixRestServer for the test suite. * @return diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 5cfab76a06..3e91b699e2 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.helix.ConfigAccessor; import org.apache.helix.TestHelper; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.model.ClusterConfig; @@ -81,6 +82,330 @@ public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() { } @Test + public void testInstanceStoppableWithDisabledAndOfflineInstances() throws Exception { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + String instanceName = STOPPABLE_INSTANCES2.get(STOPPABLE_INSTANCES2.size() - 1); + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instanceName); + instanceConfig.setInstanceEnabled(false); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instanceName, instanceConfig); + TestHelper.verify( + () -> !_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instanceName).getInstanceEnabled(), + 1000); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance14", "instance9", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER3).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + // Since the maxOfflineAllowed is 0, no node is stoppable. + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), 1); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + // restore the config + instanceConfig.setInstanceEnabled(true); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instanceName, instanceConfig); + TestHelper.verify( + () -> _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instanceName).getInstanceEnabled(), + 1000); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testInstanceStoppableWithDisabledAndOfflineInstances") + public void testInstancesStoppableWithOfflineInstancesInTopology() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance9", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER3).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES - 1); + // If not setting notExceedMaxOfflineInstances=true + Assert.assertTrue(stoppableSet.contains("instance3") && stoppableSet.contains("instance5")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // Before: "StoppableTestCluster3_db_0_8" : {"instance12" : "MASTER","instance4" : "SLAVE", "instance9" : "SLAVE"}, + // After: "StoppableTestCluster3_db_0_8" : {"instance12" : "MASTER","instance4" : "SLAVE"}, + // Since instance9 is not live, instance4 is no longer stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + + content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance9", "instance0", "invalidInstance1"); + + response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER3).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES - 2); + // If not setting notExceedMaxOfflineInstances=true + Assert.assertTrue(stoppableSet.contains("instance3")); + + nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // Before: "StoppableTestCluster3_db_0_8" : {"instance12" : "MASTER","instance4" : "SLAVE", "instance9" : "SLAVE"}, + // After: "StoppableTestCluster3_db_0_8" : {"instance12" : "MASTER","instance4" : "SLAVE"}, + // Since instance9 is not live, instance4 is no longer stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testInstancesStoppableWithOfflineInstancesInTopology") + public void testInstanceStoppableCrossZoneWithMaxOfflineCheckViolated() throws Exception { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(STOPPABLE_CLUSTER2); + clusterConfig.setMaxOfflineInstancesAllowed(0); + _configAccessor.setClusterConfig(STOPPABLE_CLUSTER2, clusterConfig); + TestHelper.verify(() -> { + return _configAccessor.getClusterConfig(STOPPABLE_CLUSTER2).getMaxOfflineInstancesAllowed() + == 0; + }, 1000); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + // Since the maxOfflineAllowed is 0, no node is stoppable. + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), 0); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + // restore the config + clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES); + _configAccessor.setClusterConfig(STOPPABLE_CLUSTER2, clusterConfig); + TestHelper.verify(() -> { + return _configAccessor.getClusterConfig(STOPPABLE_CLUSTER2).getMaxOfflineInstancesAllowed() + == STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES; + }, 1000); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testInstanceStoppableCrossZoneWithMaxOfflineCheckViolated") + public void testInstanceStoppableZoneBasedWithExceedingMaxOfflineInstances() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES); + // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in tbe result. + Assert.assertTrue(stoppableSet.contains("instance4") && stoppableSet.contains("instance3") && stoppableSet.contains("instance5")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + + content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1"); + + response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), 1); + // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in tbe result. + Assert.assertTrue(stoppableSet.contains("instance4")); + + nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testInstanceStoppableZoneBasedWithExceedingMaxOfflineInstances") + public void testCrossZoneStoppableWithExceedingMaxOfflineInstances() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), 1); + // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in tbe result. + Assert.assertTrue(stoppableSet.contains("instance4")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + + response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK" + + "¬ExceedMaxOfflineInstances=true&continueOnFailures=true").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertEquals(stoppableSet.size(), 1); + // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in tbe result. + Assert.assertTrue(stoppableSet.contains("instance4")); + + nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testCrossZoneStoppableWithExceedingMaxOfflineInstances") public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName());