Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After Stoppable Instances are Shutdown #2736

Merged
@@ -47,22 +47,29 @@ public class StoppableInstancesSelector {
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
private final static String EXCEED_MAX_OFFLINE_INSTANCES =
"HELIX:EXCEED_MAX_OFFLINE_INSTANCES";
private final String _clusterId;
private List<String> _orderOfZone;
private final String _customizedInput;
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;
private final int _maxAdditionalOfflineInstances;
private final boolean _continueOnFailure;

private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor,
int maxAdditionalOfflineInstances, boolean continueOnFailure) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
_maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
_continueOnFailure = continueOnFailure;
}

/**
@@ -92,7 +99,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
processNonexistentInstances(instances, failedStoppableInstances);

return result;
@@ -129,32 +136,53 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
if (instanceSet.isEmpty()) {
continue;
}
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet,
stoppableInstances, failedStoppableInstances,
_maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances,
int allowedOfflineInstances) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
if (!stoppableCheck.isStoppable()) {
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
for (String failedReason : stoppableCheck.getFailedChecks()) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
} else {
if (stoppableCheck.isStoppable() && allowedOfflineInstances > 0) {
stoppableInstances.add(instance);
// Update the toBeStoppedInstances set with the currently identified stoppable instance.
// This ensures that subsequent checks in other zones are aware of this instance's stoppable status.
toBeStoppedInstances.add(instance);
allowedOfflineInstances--;
continue;
}
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
boolean failedHelixOwnChecks = false;
if (allowedOfflineInstances <= 0) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(EXCEED_MAX_OFFLINE_INSTANCES));
failedHelixOwnChecks = true;
}

if (!stoppableCheck.isStoppable()) {
for (String failedReason : stoppableCheck.getFailedChecks()) {
// HELIX_OWN_CHECK can always be added to the failedReasonsNode.
if (failedReason.startsWith(StoppableCheck.Category.HELIX_OWN_CHECK.getPrefix())) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
failedHelixOwnChecks = true;
continue;
}
// CUSTOM_INSTANCE_CHECK and CUSTOM_PARTITION_CHECK failures can only be added to the failedReasonsNode
// if continueOnFailure is true and there are no failed HELIX_OWN_CHECK results.
if (_continueOnFailure && !failedHelixOwnChecks) {
Contributor:
I feel like we should skip the check when continueOnFailure is false, as a perf improvement, instead of filtering it out when consolidating the result?

Contributor Author:
Say our maxAllowedOffline = 3, and there are 5 instances. The problem is that when we do batchGetStoppable for these 5 instances, we process all 5 in parallel. But if we process them in parallel, there is no easy way for an individual instance's check to know that stopping it would violate the maxAllowedOffline = 3 constraint, unless we put a lock on everything.

Contributor Author:
Another way to handle this is to process instances in batches of maxAllowedOffline. Say our maxAllowedOffline = 3, and there are 10 instances in the same zone. In the first iteration, we process instances 1-3. If the cumulative stoppable instance count doesn't exceed maxAllowedOffline, we do the next iteration with instances 4-6, and so on (see the sketch after this hunk). But I'm worried about the performance of this design, because now our API can only batch-process maxAllowedOffline instances in parallel. If the instance list is very long, the check could take many iterations to finish.

Contributor:
My 2 cents: first comes correctness, and then comes performance optimization. I am not completely plugged into your work, but correctness is very important to focus on.

failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
}
}
}
}
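For illustration only, here is a minimal sketch of the iterative-batch alternative described in the thread above: instances are checked in rounds of at most maxAllowedOffline, so the running total can never exceed the budget. The class name and the checker function are hypothetical stand-ins, not code added by this PR.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public final class BatchedStoppableSelectionSketch {
  // checker stands in for a parallel batch stoppable check, e.g. something shaped like
  // MaintenanceManagementService#batchGetInstancesStoppableChecks, returning instance -> stoppable.
  public static List<String> select(List<String> instances, int maxAllowedOffline,
      Function<List<String>, Map<String, Boolean>> checker) {
    List<String> stoppable = new ArrayList<>();
    for (int start = 0; start < instances.size() && stoppable.size() < maxAllowedOffline;
        start += maxAllowedOffline) {
      // Each round submits at most maxAllowedOffline instances, trading parallelism for the
      // guarantee that the cumulative stoppable count stays within the budget.
      List<String> batch =
          instances.subList(start, Math.min(start + maxAllowedOffline, instances.size()));
      for (Map.Entry<String, Boolean> check : checker.apply(batch).entrySet()) {
        if (check.getValue() && stoppable.size() < maxAllowedOffline) {
          stoppable.add(check.getKey());
        }
      }
    }
    return stoppable;
  }
}

As the thread notes, the downside is throughput: a long instance list needs roughly instances.size() / maxAllowedOffline sequential rounds.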
@@ -282,6 +310,8 @@ public static class StoppableInstancesSelectorBuilder {
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;
private int _maxAdditionalOfflineInstances = Integer.MAX_VALUE;
private boolean _continueOnFailure;

public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
@@ -314,9 +344,20 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat
return this;
}

public StoppableInstancesSelectorBuilder setMaxAdditionalOfflineInstances(int maxAdditionalOfflineInstances) {
_maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
return this;
}

public StoppableInstancesSelectorBuilder setContinueOnFailure(boolean continueOnFailure) {
_continueOnFailure = continueOnFailure;
return this;
}

public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
_maintenanceService, _clusterTopology, _dataAccessor);
_maintenanceService, _clusterTopology, _dataAccessor, _maxAdditionalOfflineInstances,
_continueOnFailure);
}
}
}
@@ -20,6 +20,7 @@
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -158,7 +159,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories,
@DefaultValue("false") @QueryParam("random") boolean random, String content) {
@DefaultValue("false") @QueryParam("random") boolean random,
@DefaultValue("false") @QueryParam("notExceedMaxOfflineInstances") boolean notExceedMaxOfflineInstances,
String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
@@ -203,7 +206,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures,
skipHealthCheckCategorySet, random);
skipHealthCheckCategorySet, random, notExceedMaxOfflineInstances);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
@@ -221,7 +224,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,

private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
boolean random) throws IOException {
boolean random, boolean notExceedingMaxOfflineInstances) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
@@ -233,7 +236,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo

List<String> orderOfZone = null;
String customizedInput = null;
List<String> toBeStoppedInstances = Collections.emptyList();
List<String> toBeStoppedInstances = new ArrayList<>();
// By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
// backward compatibility with existing clients.
List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
@@ -302,16 +305,36 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
StoppableInstancesSelector.StoppableInstancesSelectorBuilder builder =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
.setOrderOfZone(orderOfZone)
.setCustomizedInput(customizedInput)
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
.setContinueOnFailure(continueOnFailures);

if (notExceedingMaxOfflineInstances) {
ClusterConfig clusterConfig = getConfigAccessor().getClusterConfig(clusterId);
if (clusterConfig == null) {
String message =
"Invalid cluster name: " + clusterId + ". Cluster config does not exist.";
_logger.error(message);
return badRequest(message);
}
// If maxOfflineInstancesAllowed is not set in the ClusterConfig, there is no limit on the number of offline instances.
// In that case the builder keeps maxAdditionalOfflineInstances at its default value, Integer.MAX_VALUE.
if (clusterConfig.getMaxOfflineInstancesAllowed() != -1) {
builder.setMaxAdditionalOfflineInstances(clusterConfig.getMaxOfflineInstancesAllowed());
Contributor:
Can we keep -1 and have special handling when MaxOfflineInstances <0?

Contributor Author:
I think an even more reasonable solution is to not allow users to run the stoppable check at all if they didn't provide maxOfflineInstancesAllowed in their cluster config. What do you think?

}
}

StoppableInstancesSelector stoppableInstancesSelector = builder.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
Set<String> invalidInstances = new HashSet<>(toBeStoppedInstances);
invalidInstances.removeAll(clusterTopology.getAllInstances());
toBeStoppedInstances.removeAll(invalidInstances);
ObjectNode result;
switch (selectionBase) {
case zone_based:
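For context, a hedged example of how a client might exercise the new flag. The host, port, endpoint path, and payload field names below follow the general shape of the existing stoppable-check REST call and are assumptions for illustration, not something this diff defines.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StoppableCheckRequestSketch {
  public static void main(String[] args) throws Exception {
    // Assumed endpoint shape: POST /clusters/{clusterId}/instances?command=stoppable
    // with the new notExceedMaxOfflineInstances query parameter turned on.
    String url = "http://localhost:8100/admin/v2/clusters/TestCluster/instances"
        + "?command=stoppable&notExceedMaxOfflineInstances=true&continueOnFailures=true";
    // Payload field names are assumptions here; only the instance names matter for the example.
    String payload = "{\"selection_base\":\"zone_based\","
        + "\"instances\":[\"instance0\",\"instance1\",\"instance2\",\"instance3\"]}";
    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Instances beyond the ClusterConfig maxOfflineInstancesAllowed budget are expected to show up
    // in the failed list with the HELIX:EXCEED_MAX_OFFLINE_INSTANCES reason.
    System.out.println(response.body());
  }
}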
@@ -138,6 +138,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5",
"instance6", "instance7", "instance8", "instance9", "instance10", "instance11",
"instance12", "instance13", "instance14");
protected static final int STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES = 3;

protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
@@ -620,6 +621,7 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();