Skip to content

Commit

Permalink
Exclude Custom Checks When Instance Is Not Alive (#2993)
Browse files Browse the repository at this point in the history
Exclude Custom Checks When Instance Is Not Alive
  • Loading branch information
MarkGaox authored Jan 21, 2025
1 parent 0f95ef3 commit e6adf5b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
Expand Down Expand Up @@ -98,6 +99,8 @@ public class MaintenanceManagementService {
// maintain the backward compatibility with users who don't use MaintenanceManagementServiceBuilder
// to create the MaintenanceManagementService object.
private List<HealthCheck> _skipStoppableHealthCheckList = Collections.emptyList();
// default value false to maintain backward compatibility
private boolean _skipCustomChecksIfNoLiveness = false;

public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
Expand Down Expand Up @@ -152,7 +155,7 @@ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
Set<String> nonBlockingHealthChecks, Set<StoppableCheck.Category> skipHealthCheckCategories,
List<HealthCheck> skipStoppableHealthCheckList, String namespace) {
List<HealthCheck> skipStoppableHealthCheckList, String namespace, boolean skipCustomChecksIfNoLiveness) {
_dataAccessor =
new HelixDataAccessorWrapper(dataAccessor, customRestClient,
namespace);
Expand All @@ -166,6 +169,7 @@ private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
_skipStoppableHealthCheckList = skipStoppableHealthCheckList == null ? Collections.emptyList()
: skipStoppableHealthCheckList;
_namespace = namespace;
_skipCustomChecksIfNoLiveness = skipCustomChecksIfNoLiveness;
}

/**
Expand Down Expand Up @@ -502,15 +506,20 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
return instances;
}

// Skip performing a custom check on any dead instance if the user set _skipCustomCheckIfInstanceNotAlive
// to true.
List<String> instanceIdsForCustomCheck = filterOutDeadInstancesIfNeeded(instances);

// If the config has exactUrl and the CLUSTER level customer check is not skipped, we will
// perform the custom check at cluster level.
if (restConfig.getCompleteConfiguredHealthUrl().isPresent()) {
if (_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)) {
if (_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)
|| instanceIdsForCustomCheck.isEmpty()) {
return instances;
}

Map<String, StoppableCheck> clusterLevelCustomCheckResult =
performAggregatedCustomCheck(clusterId, instances,
performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck,
restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads,
toBeStoppedInstances);
List<String> instancesForNextCheck = new ArrayList<>();
Expand All @@ -526,7 +535,7 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St

// Reaching here means the rest config requires instances/partition level checks. We will
// perform the custom check at instance/partition level if they are not skipped.
List<String> instancesForCustomPartitionLevelChecks = instances;
List<String> instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck;
if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) {
Map<String, Future<StoppableCheck>> customInstanceLevelChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
Expand Down Expand Up @@ -560,6 +569,42 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
return instancesForCustomPartitionLevelChecks;
}

/**
* Helper Methods
* <p>
* If users set skipCustomCheckIfInstanceNotAlive to true, filter out dead instances
* to avoid running custom checks on them.
*
* @param instanceIds the list of instances
* @return either the original list or a filtered list of only live instances
*/
private List<String> filterOutDeadInstancesIfNeeded(List<String> instanceIds) {
if (!_skipCustomChecksIfNoLiveness) {
// We are not skipping the not-alive check, so just return all instances.
return instanceIds;
}

// Retrieve the set of currently live instances
PropertyKey.Builder keyBuilder = _dataAccessor.keyBuilder();
List<String> liveNodes = _dataAccessor.getChildNames(keyBuilder.liveInstances());

// Filter out instances that are not in the live list
List<String> filtered = new ArrayList<>();
List<String> skipped = new ArrayList<>();
for (String instanceId : instanceIds) {
if (liveNodes.contains(instanceId)) {
filtered.add(instanceId);
} else {
skipped.add(instanceId);
}
}

if (!skipped.isEmpty()) {
LOG.info("Skipping any custom checks for instances due to liveness: {}", skipped);
}
return filtered;
}

private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(String clusterId,
List<String> instances, List<String> healthChecks, Map<String, String> healthCheckConfig) {
List<String> instancesForNext = new ArrayList<>(instances);
Expand Down Expand Up @@ -890,6 +935,7 @@ private void addMinActiveReplicaChecks(String clusterId, Map<String, Future<Stop
public static class MaintenanceManagementServiceBuilder {
private ConfigAccessor _configAccessor;
private boolean _skipZKRead;
private boolean _skipCustomChecksIfNoLiveness = false;
private String _namespace;
private ZKHelixDataAccessor _dataAccessor;
private CustomRestClient _customRestClient;
Expand Down Expand Up @@ -942,11 +988,17 @@ public MaintenanceManagementServiceBuilder setSkipStoppableHealthCheckList(
return this;
}

public MaintenanceManagementServiceBuilder setSkipCustomChecksIfNoLiveness(
boolean skipCustomChecksIfNoLiveness) {
_skipCustomChecksIfNoLiveness = skipCustomChecksIfNoLiveness;
return this;
}

public MaintenanceManagementService build() {
validate();
return new MaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient,
_skipZKRead, _nonBlockingHealthChecks, _skipHealthCheckCategories,
_skipStoppableHealthCheckList, _namespace);
_skipStoppableHealthCheckList, _namespace, _skipCustomChecksIfNoLiveness);
}

private void validate() throws IllegalArgumentException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public enum InstancesProperties {
online,
disabled,
selection_base,
skip_custom_check_if_instance_not_alive,
zone_order,
to_be_stopped_instances,
skip_stoppable_check_list,
Expand Down Expand Up @@ -296,6 +297,13 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

boolean skipCustomChecksIfNoLiveness = false;
if (node.get(InstancesProperties.skip_custom_check_if_instance_not_alive.name()) != null) {
skipCustomChecksIfNoLiveness = node.get(
InstancesAccessor.InstancesProperties.skip_custom_check_if_instance_not_alive.name())
.asBoolean();
}

ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
if (!clusterService.isClusterTopologyAware(clusterId)) {
Expand Down Expand Up @@ -335,6 +343,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setSkipHealthCheckCategories(skipHealthCheckCategories)
.setNamespace(namespace)
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.setSkipCustomChecksIfNoLiveness(skipCustomChecksIfNoLiveness)
.build();

StoppableInstancesSelector stoppableInstancesSelector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
Expand Down Expand Up @@ -248,7 +249,51 @@ public void testInstanceStoppableCrossZoneBasedWithSelectedCheckList() throws IO
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testInstanceStoppableCrossZoneBasedWithSelectedCheckList")
@Test(dependsOnMethods = "testCrossZoneStoppableWithoutZoneOrder")
public void testSkipCustomChecksIfInstanceNotAlive() throws JsonProcessingException {
System.out.println("Start test :" + TestHelper.getTestMethodName());

// Instance 4 and 5 in stoppable cluster 1 are not alive
String content = String.format(
"{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"], \"%s"
+ "\": \"%b\"}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance4", "instance5", "invalidInstance",
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ALIVE", "EMPTY_RESOURCE_ASSIGNMENT", "INSTANCE_NOT_STABLE",
InstancesAccessor.InstancesProperties.skip_custom_check_if_instance_not_alive.name(), true);

// Set the dummy custom checks for the cluster. The custom checks should be skipped.
ConfigAccessor configAccessor = new ConfigAccessor(ZK_ADDR);
Assert.assertNull(configAccessor.getRESTConfig(STOPPABLE_CLUSTER));
RESTConfig restConfig = new RESTConfig(STOPPABLE_CLUSTER);
restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, "TEST_URL");
configAccessor.setRESTConfig(STOPPABLE_CLUSTER, restConfig);
Assert.assertEquals(restConfig, configAccessor.getRESTConfig(STOPPABLE_CLUSTER));

// Even if we don't skip custom stoppable checks, the instance is not alive so it should be stoppable
Response response = new JerseyUriRequestBuilder(
"clusters/{}/instances?command=stoppable").format(
STOPPABLE_CLUSTER).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
Set<String> stoppableSet = getStringSet(jsonNode,
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Assert.assertTrue(stoppableSet.contains("instance4"));
Assert.assertTrue(stoppableSet.contains("instance5"));
JsonNode nonStoppableInstances = jsonNode.get(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());

Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));

// After the test finishes, remove the dummy custom checks REST config
configAccessor.deleteRESTConfig(STOPPABLE_CLUSTER);
Assert.assertNull(configAccessor.getRESTConfig(STOPPABLE_CLUSTER));

System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testSkipCustomChecksIfInstanceNotAlive")
public void testInstanceStoppableCrossZoneBasedWithEvacuatingInstances() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String content = String.format(
Expand Down

0 comments on commit e6adf5b

Please sign in to comment.