diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 925b2d2169..8d5d025b3c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -45,6 +45,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixException; +import org.apache.helix.PropertyKey; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; @@ -98,6 +99,8 @@ public class MaintenanceManagementService { // maintain the backward compatibility with users who don't use MaintenanceManagementServiceBuilder // to create the MaintenanceManagementService object. 
private List _skipStoppableHealthCheckList = Collections.emptyList(); + // default value false to maintain backward compatibility + private boolean _skipCustomChecksIfNoLiveness = false; public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor, boolean skipZKRead, String namespace) { @@ -152,7 +155,7 @@ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead, Set nonBlockingHealthChecks, Set skipHealthCheckCategories, - List skipStoppableHealthCheckList, String namespace) { + List skipStoppableHealthCheckList, String namespace, boolean skipCustomChecksIfNoLiveness) { _dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace); @@ -166,6 +169,7 @@ private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, _skipStoppableHealthCheckList = skipStoppableHealthCheckList == null ? Collections.emptyList() : skipStoppableHealthCheckList; _namespace = namespace; + _skipCustomChecksIfNoLiveness = skipCustomChecksIfNoLiveness; } /** @@ -502,15 +506,20 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List instanceIdsForCustomCheck = filterOutDeadInstancesIfNeeded(instances); + // If the config has exactUrl and the CLUSTER level customer check is not skipped, we will // perform the custom check at cluster level. 
if (restConfig.getCompleteConfiguredHealthUrl().isPresent()) { - if (_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)) { + if (_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK) + || instanceIdsForCustomCheck.isEmpty()) { return instances; } Map clusterLevelCustomCheckResult = - performAggregatedCustomCheck(clusterId, instances, + performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck, restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads, toBeStoppedInstances); List instancesForNextCheck = new ArrayList<>(); @@ -526,7 +535,7 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List instancesForCustomPartitionLevelChecks = instances; + List instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck; if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) { Map> customInstanceLevelChecks = instances.stream().collect( Collectors.toMap(Function.identity(), instance -> POOL.submit( @@ -560,6 +569,42 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List + * If skipCustomChecksIfNoLiveness is set to true, filter out dead instances + * to avoid running custom checks on them. + * + * @param instanceIds the list of instances + * @return either the original list or a filtered list of only live instances + */ + private List filterOutDeadInstancesIfNeeded(List instanceIds) { + if (!_skipCustomChecksIfNoLiveness) { + // Liveness filtering is disabled, so custom checks run on all instances. 
+ return instanceIds; + } + + // Retrieve the set of currently live instances + PropertyKey.Builder keyBuilder = _dataAccessor.keyBuilder(); + List liveNodes = _dataAccessor.getChildNames(keyBuilder.liveInstances()); + + // Filter out instances that are not in the live list + List filtered = new ArrayList<>(); + List skipped = new ArrayList<>(); + for (String instanceId : instanceIds) { + if (liveNodes.contains(instanceId)) { + filtered.add(instanceId); + } else { + skipped.add(instanceId); + } + } + + if (!skipped.isEmpty()) { + LOG.info("Skipping any custom checks for instances due to liveness: {}", skipped); + } + return filtered; + } + private Map batchInstanceHealthCheck(String clusterId, List instances, List healthChecks, Map healthCheckConfig) { List instancesForNext = new ArrayList<>(instances); @@ -890,6 +935,7 @@ private void addMinActiveReplicaChecks(String clusterId, Map stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance4")); + Assert.assertTrue(stoppableSet.contains("instance5")); + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + // After the test finishes, remove the dummy custom checks REST config + configAccessor.deleteRESTConfig(STOPPABLE_CLUSTER); + Assert.assertNull(configAccessor.getRESTConfig(STOPPABLE_CLUSTER)); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testSkipCustomChecksIfInstanceNotAlive") public void testInstanceStoppableCrossZoneBasedWithEvacuatingInstances() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); String content = String.format(