Skip to content

Commit

Permalink
Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided…
Browse files Browse the repository at this point in the history
… After Stoppable Instances are Shutdown (#2736)

Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After Stoppable Instances are offline
  • Loading branch information
MarkGaox authored Jan 31, 2024
1 parent 5455135 commit 56dd5d2
Show file tree
Hide file tree
Showing 4 changed files with 501 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,29 @@ public class StoppableInstancesSelector {
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
private final static String EXCEED_MAX_OFFLINE_INSTANCES =
"HELIX:EXCEED_MAX_OFFLINE_INSTANCES";
private final String _clusterId;
private List<String> _orderOfZone;
private final String _customizedInput;
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;
private final int _maxAdditionalOfflineInstances;
private final boolean _continueOnFailure;

/**
 * Creates a selector; private, so instances are obtained through
 * {@code StoppableInstancesSelectorBuilder#build()}.
 * Every argument is stored as-is in the matching field; no validation or copying happens here.
 *
 * NOTE(review): the tail of the parameter list appears twice below because this excerpt is a
 * diff view containing both the removed and the added signature lines - confirm against the
 * real file before relying on it.
 *
 * @param clusterId cluster id handed to the maintenance service when running stoppable checks
 * @param orderOfZone initial zone order; stored in the only non-final field of this class
 * @param customizedInput customized input forwarded to the maintenance service
 * @param maintenanceService service used to batch-run the stoppable checks
 * @param clusterTopology topology used to map instances to zones
 * @param dataAccessor ZooKeeper-backed data accessor
 * @param maxAdditionalOfflineInstances budget from which the number of already
 *        to-be-stopped instances is subtracted before each selection pass
 * @param continueOnFailure read when consolidating failure reasons to decide whether
 *        non-Helix (custom) failed checks are reported
 */
private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor,
int maxAdditionalOfflineInstances, boolean continueOnFailure) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
_maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
_continueOnFailure = continueOnFailure;
}

/**
Expand Down Expand Up @@ -92,7 +99,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
processNonexistentInstances(instances, failedStoppableInstances);

return result;
Expand Down Expand Up @@ -129,32 +136,60 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
if (instanceSet.isEmpty()) {
continue;
}
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet,
stoppableInstances, failedStoppableInstances,
_maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
}

/**
 * Runs the batched stoppable checks for {@code instances} and sorts each checked instance into
 * either {@code stoppableInstances} or {@code failedStoppableInstances}.
 *
 * An instance is accepted only when its check reports stoppable AND {@code allowedOfflineCount}
 * is still positive; each acceptance adds the instance to {@code toBeStoppedInstances} and
 * decrements the local budget. Every other instance gets an entry in
 * {@code failedStoppableInstances} whose reasons are filled in by {@code consolidateResult}.
 *
 * NOTE(review): this excerpt is a diff view - the old signature tail and the old
 * {@code if (!isStoppable) ... else} body are shown next to the lines that replaced them,
 * so the text below is not compilable as-is. Confirm against the real file.
 *
 * @param instances instances to check in this pass
 * @param toBeStoppedInstances instances already chosen to be stopped; passed to the checks and
 *        mutated here as new stoppable instances are found
 * @param stoppableInstances output array receiving the names of accepted instances
 * @param failedStoppableInstances output object mapping a rejected instance to its reasons
 * @param allowedOfflineCount how many more instances may be accepted in this pass
 * @throws IOException declared by this method; presumably propagated from the batched
 *         stoppable checks - TODO confirm
 */
private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances,
int allowedOfflineCount) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
if (!stoppableCheck.isStoppable()) {
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
for (String failedReason : stoppableCheck.getFailedChecks()) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
} else {
// Accept the instance only while the offline budget has room left.
if (stoppableCheck.isStoppable() && allowedOfflineCount > 0) {
stoppableInstances.add(instance);
// Update the toBeStoppedInstances set with the currently identified stoppable instance.
// This ensures that subsequent checks in other zones are aware of this instance's stoppable status.
toBeStoppedInstances.add(instance);
allowedOfflineCount--;
continue;
}
// TODO: If the maxOffline limit is reached, we should give previous non-stoppable instances a failed reason
// of "EXCEED_MAX_OFFLINE_INSTANCES"
// Reached for instances that failed their checks and for stoppable ones that no longer fit the budget.
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
consolidateResult(stoppableCheck, failedReasonsNode, allowedOfflineCount);
}
}

/**
 * Writes the reasons why an instance was not selected as stoppable into
 * {@code failedReasonsNode}.
 *
 * <ul>
 *   <li>When {@code allowedOfflineCount <= 0}, {@code EXCEED_MAX_OFFLINE_INSTANCES} is written
 *       first and is treated like a failed Helix own check.</li>
 *   <li>Failed checks carrying the {@code HELIX_OWN_CHECK} prefix are always written.</li>
 *   <li>Any other failed check is written only when {@code _continueOnFailure} is set and no
 *       Helix-level failure has been recorded up to that point of the iteration.</li>
 * </ul>
 *
 * NOTE(review): the last rule depends on the iteration order of
 * {@code stoppableCheck.getFailedChecks()} - a custom reason listed before a Helix own reason
 * is still written. Confirm that ordering is what callers expect.
 *
 * @param stoppableCheck check result of the instance being reported
 * @param failedReasonsNode JSON array that receives the reason strings
 * @param allowedOfflineCount remaining offline budget at the time the instance was rejected
 */
private void consolidateResult(StoppableCheck stoppableCheck,
    ArrayNode failedReasonsNode, int allowedOfflineCount) {
  final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;

  // An exhausted budget is reported before anything else and counts as a Helix-level failure.
  final boolean budgetExhausted = allowedOfflineCount <= 0;
  if (budgetExhausted) {
    failedReasonsNode.add(nodeFactory.textNode(EXCEED_MAX_OFFLINE_INSTANCES));
  }

  if (stoppableCheck.isStoppable()) {
    // Nothing else to report: the instance was rejected only because of the budget.
    return;
  }

  boolean helixLevelFailureSeen = budgetExhausted;
  for (String reason : stoppableCheck.getFailedChecks()) {
    boolean helixOwnReason =
        reason.startsWith(StoppableCheck.Category.HELIX_OWN_CHECK.getPrefix());
    if (helixOwnReason) {
      helixLevelFailureSeen = true;
    }
    // Helix own reasons are unconditional; custom instance/partition reasons need
    // continueOnFailure and no Helix-level failure seen so far.
    if (helixOwnReason || (_continueOnFailure && !helixLevelFailureSeen)) {
      failedReasonsNode.add(nodeFactory.textNode(reason));
    }
  }
}
Expand Down Expand Up @@ -282,6 +317,8 @@ public static class StoppableInstancesSelectorBuilder {
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;
private int _maxAdditionalOfflineInstances = Integer.MAX_VALUE;
private boolean _continueOnFailure;

public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
Expand Down Expand Up @@ -314,9 +351,20 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat
return this;
}

/**
 * Sets the number of additional instances the built selector may pick as stoppable.
 * When this setter is never called the builder keeps its initial value.
 *
 * @param maxAdditionalOfflineInstances offline budget handed to the selector
 * @return this builder, for call chaining
 */
public StoppableInstancesSelectorBuilder setMaxAdditionalOfflineInstances(
    int maxAdditionalOfflineInstances) {
  this._maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
  return this;
}

/**
 * Sets the continue-on-failure flag that is passed to the built selector.
 *
 * @param continueOnFailure value stored for the selector's {@code _continueOnFailure} field
 * @return this builder, for call chaining
 */
public StoppableInstancesSelectorBuilder setContinueOnFailure(
    boolean continueOnFailure) {
  this._continueOnFailure = continueOnFailure;
  return this;
}

/**
 * Creates a {@code StoppableInstancesSelector} from the values currently held by this builder.
 * Values that were never set keep their field defaults.
 *
 * NOTE(review): the constructor call below carries both the removed and the added argument
 * lines of the diff this excerpt was taken from - confirm against the real file.
 *
 * @return a new selector instance
 */
public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
_maintenanceService, _clusterTopology, _dataAccessor);
_maintenanceService, _clusterTopology, _dataAccessor, _maxAdditionalOfflineInstances,
_continueOnFailure);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -40,6 +41,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
Expand Down Expand Up @@ -158,7 +160,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories,
@DefaultValue("false") @QueryParam("random") boolean random, String content) {
@DefaultValue("false") @QueryParam("random") boolean random,
@DefaultValue("false") @QueryParam("notExceedMaxOfflineInstances") boolean notExceedMaxOfflineInstances,
String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
Expand Down Expand Up @@ -203,7 +207,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures,
skipHealthCheckCategorySet, random);
skipHealthCheckCategorySet, random, notExceedMaxOfflineInstances);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
Expand All @@ -221,7 +225,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,

private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
boolean random) throws IOException {
boolean random, boolean notExceedingMaxOfflineInstances) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
Expand All @@ -233,7 +237,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo

List<String> orderOfZone = null;
String customizedInput = null;
List<String> toBeStoppedInstances = Collections.emptyList();
List<String> toBeStoppedInstances = new ArrayList<>();
// By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
// backward compatibility with existing clients.
List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
Expand Down Expand Up @@ -302,16 +306,57 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
StoppableInstancesSelector.StoppableInstancesSelectorBuilder builder =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
.setOrderOfZone(orderOfZone)
.setCustomizedInput(customizedInput)
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
.setContinueOnFailure(continueOnFailures);

Set<String> currentDisabledOrOfflineInstances = Collections.emptySet();
if (notExceedingMaxOfflineInstances) {
// If the clusterConfig or maxOfflineInstancesAllowed is not set, this is an invalid request.
ClusterConfig clusterConfig = getConfigAccessor().getClusterConfig(clusterId);
if (clusterConfig == null) {
String message =
"Invalid cluster name: " + clusterId + ". Cluster config does not exist.";
_logger.error(message);
return badRequest(message);
}
int maxOfflineAllowed = clusterConfig.getMaxOfflineInstancesAllowed();
if (maxOfflineAllowed == -1) {
String message =
"Invalid cluster config: " + clusterId + ". maxOfflineInstancesAllowed is not set.";
_logger.error(message);
return badRequest(message);
}

HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
ConfigAccessor configAccessor = getConfigAccessor();
List<String> liveInstances =
dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
currentDisabledOrOfflineInstances =
clusterTopology.getAllInstances().stream().filter(instance -> {
// return instances that are disabled or not live.
return !configAccessor.getInstanceConfig(clusterId, instance).getInstanceEnabled()
|| !liveInstances.contains(instance);
}).collect(Collectors.toSet());
maxOfflineAllowed =
Math.max(0, maxOfflineAllowed - currentDisabledOrOfflineInstances.size());
builder.setMaxAdditionalOfflineInstances(maxOfflineAllowed);
}

StoppableInstancesSelector stoppableInstancesSelector = builder.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
Set<String> finalCurrentDisabledOfflineInstances = currentDisabledOrOfflineInstances;
// Keep only to-be-stopped instances that exist in the cluster topology and are not already
// counted as disabled or offline (that set is empty unless the max-offline check was requested).
toBeStoppedInstances = toBeStoppedInstances.stream().filter(
instance -> clusterTopology.getAllInstances().contains(instance)
&& !finalCurrentDisabledOfflineInstances.contains(instance))
.collect(Collectors.toList());
ObjectNode result;
switch (selectionBase) {
case zone_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
protected static final List<String> STOPPABLE_INSTANCES2 =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5",
"instance6", "instance7", "instance8", "instance9", "instance10", "instance11",
"instance12", "instance13", "instance14");
protected static final int STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES = 3;

protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
Expand Down Expand Up @@ -343,6 +345,8 @@ protected void setupHelixResources() throws Exception {
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2);
preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(STOPPABLE_CLUSTER3,
STOPPABLE_INSTANCES2);
}

protected Set<String> createInstances(String cluster, int numInstances) {
Expand Down Expand Up @@ -620,6 +624,7 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();
Expand Down Expand Up @@ -671,6 +676,66 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}

/**
 * Prepares a cross-zone cluster for the stoppable-instance tests in which not every
 * participant is started.
 *
 * The cluster uses "helixZoneId" as its fault zone type, persists intermediate assignment and
 * caps offline instances at {@code STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES}. Instances are
 * placed three per zone, zones numbered from 1. {@code startInstances} is asked to start
 * {@code instances.size() - 2} participants, so two are presumably left offline - TODO confirm
 * against {@code startInstances}.
 *
 * @param clusterName name of the cluster to create
 * @param instances names of the instances to register in the cluster
 * @throws Exception if any setup step or verification fails
 */
private void preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(
    String clusterName, List<String> instances) throws Exception {
  _gSetupTool.addCluster(clusterName, true);

  ClusterConfig config = _configAccessor.getClusterConfig(clusterName);
  config.setFaultZoneType("helixZoneId");
  config.setPersistIntermediateAssignment(true);
  config.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
  _configAccessor.setClusterConfig(clusterName, config);

  // Build one instance config per instance, three consecutive instances per zone.
  final int instancesPerZone = 3;
  List<InstanceConfig> participantConfigs = new ArrayList<>();
  for (int idx = 0; idx < instances.size(); idx++) {
    int zone = idx / instancesPerZone + 1;
    InstanceConfig participantConfig = new InstanceConfig(instances.get(idx));
    participantConfig.setDomain("helixZoneId=zone" + zone + ",host=instance" + idx);
    participantConfigs.add(participantConfig);
  }

  // Register every instance with the cluster.
  for (InstanceConfig participantConfig : participantConfigs) {
    _gSetupTool.getClusterManagementTool().addInstance(clusterName, participantConfig);
  }

  // Start all but two participants, then create resources and bring up the controller.
  startInstances(clusterName, new TreeSet<>(instances), instances.size() - 2);
  createResources(clusterName, 1, 2, 3);
  _clusterControllerManagers.add(startController(clusterName));

  // Wait until the cluster config is readable; a read failure just means "not yet".
  boolean clusterConfigReadable = TestHelper.verify(() -> {
    try {
      return _configAccessor.getClusterConfig(clusterName) != null;
    } catch (Exception e) {
      return false;
    }
  }, TestHelper.WAIT_DURATION);
  Assert.assertTrue(clusterConfigReadable);

  // Wait until the instance config of every given instance is readable.
  for (String instanceName : instances) {
    boolean instanceConfigReadable = TestHelper.verify(() -> {
      try {
        return _configAccessor.getInstanceConfig(clusterName, instanceName) != null;
      } catch (Exception e) {
        return false;
      }
    }, TestHelper.WAIT_DURATION);
    Assert.assertTrue(instanceConfigReadable);
  }

  _clusters.add(clusterName);
  _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}

/**
* Starts a HelixRestServer for the test suite.
* @return
Expand Down
Loading

0 comments on commit 56dd5d2

Please sign in to comment.