Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After Stoppable Instances are Shutdown #2736

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,29 @@ public class StoppableInstancesSelector {
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
private final static String EXCEED_MAX_OFFLINE_INSTANCES =
"HELIX:EXCEED_MAX_OFFLINE_INSTANCES";
private final String _clusterId;
private List<String> _orderOfZone;
private final String _customizedInput;
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;
private final int _maxAdditionalOfflineInstances;
private final boolean _continueOnFailure;

private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor,
int maxAdditionalOfflineInstances, boolean continueOnFailure) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
_maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
_continueOnFailure = continueOnFailure;
}

/**
Expand Down Expand Up @@ -92,7 +99,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
processNonexistentInstances(instances, failedStoppableInstances);

return result;
Expand Down Expand Up @@ -129,32 +136,60 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
if (instanceSet.isEmpty()) {
continue;
}
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet,
stoppableInstances, failedStoppableInstances,
_maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances,
int allowedOfflineCount) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
if (!stoppableCheck.isStoppable()) {
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
for (String failedReason : stoppableCheck.getFailedChecks()) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
} else {
if (stoppableCheck.isStoppable() && allowedOfflineCount > 0) {
stoppableInstances.add(instance);
// Update the toBeStoppedInstances set with the currently identified stoppable instance.
// This ensures that subsequent checks in other zones are aware of this instance's stoppable status.
toBeStoppedInstances.add(instance);
allowedOfflineCount--;
continue;
}
// TODO: If the maxOffline limit is reached, we should give previous non-stoppable instances a failed reason
// of "EXCEED_MAX_OFFLINE_INSTANCES"
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
consolidateResult(stoppableCheck, failedReasonsNode, allowedOfflineCount);
}
}

private void consolidateResult(StoppableCheck stoppableCheck,
ArrayNode failedReasonsNode, int allowedOfflineCount) {
boolean failedHelixOwnChecks = false;
if (allowedOfflineCount <= 0) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(EXCEED_MAX_OFFLINE_INSTANCES));
failedHelixOwnChecks = true;
}

if (!stoppableCheck.isStoppable()) {
for (String failedReason : stoppableCheck.getFailedChecks()) {
// HELIX_OWN_CHECK can always be added to the failedReasonsNode.
if (failedReason.startsWith(StoppableCheck.Category.HELIX_OWN_CHECK.getPrefix())) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
failedHelixOwnChecks = true;
continue;
}
// CUSTOM_INSTANCE_CHECK and CUSTOM_PARTITION_CHECK can only be added to the failedReasonsNode
// if continueOnFailure is true and there is no failed Helix_OWN_CHECKS.
if (_continueOnFailure && !failedHelixOwnChecks) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
}
}
}
Expand Down Expand Up @@ -282,6 +317,8 @@ public static class StoppableInstancesSelectorBuilder {
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;
private int _maxAdditionalOfflineInstances = Integer.MAX_VALUE;
private boolean _continueOnFailure;

public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
Expand Down Expand Up @@ -314,9 +351,20 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat
return this;
}

public StoppableInstancesSelectorBuilder setMaxAdditionalOfflineInstances(int maxAdditionalOfflineInstances) {
_maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
return this;
}

public StoppableInstancesSelectorBuilder setContinueOnFailure(boolean continueOnFailure) {
_continueOnFailure = continueOnFailure;
return this;
}

public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
_maintenanceService, _clusterTopology, _dataAccessor);
_maintenanceService, _clusterTopology, _dataAccessor, _maxAdditionalOfflineInstances,
_continueOnFailure);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -40,6 +41,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
Expand Down Expand Up @@ -158,7 +160,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories,
@DefaultValue("false") @QueryParam("random") boolean random, String content) {
@DefaultValue("false") @QueryParam("random") boolean random,
@DefaultValue("false") @QueryParam("notExceedMaxOfflineInstances") boolean notExceedMaxOfflineInstances,
String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
Expand Down Expand Up @@ -203,7 +207,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures,
skipHealthCheckCategorySet, random);
skipHealthCheckCategorySet, random, notExceedMaxOfflineInstances);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
Expand All @@ -221,7 +225,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,

private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
boolean random) throws IOException {
boolean random, boolean notExceedingMaxOfflineInstances) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
Expand All @@ -233,7 +237,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo

List<String> orderOfZone = null;
String customizedInput = null;
List<String> toBeStoppedInstances = Collections.emptyList();
List<String> toBeStoppedInstances = new ArrayList<>();
// By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
// backward compatibility with existing clients.
List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
Expand Down Expand Up @@ -302,16 +306,57 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
StoppableInstancesSelector.StoppableInstancesSelectorBuilder builder =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
.setOrderOfZone(orderOfZone)
.setCustomizedInput(customizedInput)
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
.setContinueOnFailure(continueOnFailures);

Set<String> currentDisabledOrOfflineInstances = Collections.emptySet();
if (notExceedingMaxOfflineInstances) {
// If the clusterConfig or maxOfflineInstancesAllowed is not set, this is an invalid request.
ClusterConfig clusterConfig = getConfigAccessor().getClusterConfig(clusterId);
if (clusterConfig == null) {
String message =
"Invalid cluster name: " + clusterId + ". Cluster config does not exist.";
_logger.error(message);
return badRequest(message);
}
int maxOfflineAllowed = clusterConfig.getMaxOfflineInstancesAllowed();
if (maxOfflineAllowed == -1) {
String message =
"Invalid cluster config: " + clusterId + ". maxOfflineInstancesAllowed is not set.";
_logger.error(message);
return badRequest(message);
}

HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
ConfigAccessor configAccessor = getConfigAccessor();
List<String> liveInstances =
dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
currentDisabledOrOfflineInstances =
clusterTopology.getAllInstances().stream().filter(instance -> {
// return instances that are disabled and not live.
return !configAccessor.getInstanceConfig(clusterId, instance).getInstanceEnabled()
|| !liveInstances.contains(instance);
}).collect(Collectors.toSet());
maxOfflineAllowed =
Math.max(0, maxOfflineAllowed - currentDisabledOrOfflineInstances.size());
builder.setMaxAdditionalOfflineInstances(maxOfflineAllowed);
}

StoppableInstancesSelector stoppableInstancesSelector = builder.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
Set<String> finalCurrentDisabledOfflineInstances = currentDisabledOrOfflineInstances;
// Since maxOfflineAllowed is set, we need to filter out the instances that are already offline.
toBeStoppedInstances = toBeStoppedInstances.stream().filter(
instance -> clusterTopology.getAllInstances().contains(instance)
&& !finalCurrentDisabledOfflineInstances.contains(instance))
.collect(Collectors.toList());
ObjectNode result;
switch (selectionBase) {
case zone_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
protected static final List<String> STOPPABLE_INSTANCES2 =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5",
"instance6", "instance7", "instance8", "instance9", "instance10", "instance11",
"instance12", "instance13", "instance14");
protected static final int STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES = 3;

protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
Expand Down Expand Up @@ -343,6 +345,8 @@ protected void setupHelixResources() throws Exception {
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2);
preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(STOPPABLE_CLUSTER3,
STOPPABLE_INSTANCES2);
}

protected Set<String> createInstances(String cluster, int numInstances) {
Expand Down Expand Up @@ -620,6 +624,7 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();
Expand Down Expand Up @@ -671,6 +676,66 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}

private void preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(
String clusterName, List<String> instances) throws Exception {
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();
int perZoneInstancesCount = 3;
int curZoneCount = 0, zoneId = 1;
for (int i = 0; i < instances.size(); i++) {
InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance" + i);
if (++curZoneCount >= perZoneInstancesCount) {
curZoneCount = 0;
zoneId++;
}
instanceConfigs.add(instanceConfig);
}

for (InstanceConfig instanceConfig : instanceConfigs) {
_gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
}

// Start participant and make two of them offline
startInstances(clusterName, new TreeSet<>(instances), instances.size() - 2);
createResources(clusterName, 1, 2, 3);
_clusterControllerManagers.add(startController(clusterName));

// Make sure that cluster config exists
boolean isClusterConfigExist = TestHelper.verify(() -> {
ClusterConfig stoppableClusterConfig;
try {
stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
} catch (Exception e) {
return false;
}
return (stoppableClusterConfig != null);
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(isClusterConfigExist);
// Make sure that instance config exists for the instance0 to instance5
for (String instance: instances) {
boolean isinstanceConfigExist = TestHelper.verify(() -> {
InstanceConfig instanceConfig;
try {
instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance);
} catch (Exception e) {
return false;
}
return (instanceConfig != null);
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(isinstanceConfigExist);
}
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}

/**
* Starts a HelixRestServer for the test suite.
* @return
Expand Down
Loading
Loading