Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Stoppable Check for Non-Topology-Aware Clusters #2961

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,15 @@ public Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
if (instances != null && !instances.isEmpty()) {
payLoads.put("instances", instances);
}
if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
// Before sending the request, remove from toBeStoppedInstances any instance that also appears in instances.
Set<String> remainingToBeStoppedInstances = toBeStoppedInstances;
if (instances != null && toBeStoppedInstances != null) {
remainingToBeStoppedInstances =
toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins))
.collect(Collectors.toSet());
}
if (remainingToBeStoppedInstances != null && !remainingToBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances);
}
if (clusterId != null) {
payLoads.put("cluster_id", clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
findToBeStoppedInstances(toBeStoppedInstancesSet);

List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
Expand Down Expand Up @@ -119,7 +119,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
findToBeStoppedInstances(toBeStoppedInstancesSet);

Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
Expand All @@ -136,6 +136,40 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
return result;
}

/**
 * Runs the stoppable check on the given instances without applying any zone ordering.
 * Instances reported as nonexistent are recorded as non-stoppable and are left out of the
 * stoppable check itself.
 *
 * @param instances            the instances to evaluate
 * @param toBeStoppedInstances instances presumed to be already stopped
 * @return an ObjectNode with two fields:
 *         - 'instance_stoppable_parallel': array filled in by the stoppable check.
 *         - 'instance_not_stoppable_with_reasons': object keyed by instance name, each value
 *           being the list of reasons that instance cannot be stopped.
 * @throws IOException propagated from the stoppable check
 */
public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
    List<String> toBeStoppedInstances) throws IOException {
  ObjectNode response = JsonNodeFactory.instance.objectNode();
  ArrayNode stoppableNode = response.putArray(
      InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
  ObjectNode notStoppableNode = response.putObject(
      InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());

  // Work on a private copy: findToBeStoppedInstances adds to the set it is given.
  Set<String> presumedStopped = new HashSet<>(toBeStoppedInstances);
  findToBeStoppedInstances(presumedStopped);

  // No zone order is calculated on this path, so instance existence has to be verified here
  // to make sure only valid instances reach the stoppable check.
  Set<String> missingInstances = processNonexistentInstances(instances, notStoppableNode);
  List<String> existingInstances = new ArrayList<>();
  for (String instance : instances) {
    if (!missingInstances.contains(instance)) {
      existingInstances.add(instance);
    }
  }

  populateStoppableInstances(existingInstances, presumedStopped, stoppableNode,
      notStoppableNode);
  return response;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
Expand All @@ -159,7 +193,7 @@ private void populateStoppableInstances(List<String> instances, Set<String> toBe
}
}

private void processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
private Set<String> processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
// The following logic checks whether the requested instances exist. An instance is considered
// nonexistent in the following scenarios:
// 1. Instance got dropped. (InstanceConfig is gone.)
Expand All @@ -174,6 +208,7 @@ private void processNonexistentInstances(List<String> instances, ObjectNode fail
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
return nonSelectedInstances;
}

/**
Expand Down Expand Up @@ -258,18 +293,21 @@ private Map<String, Set<String>> getOrderedZoneToInstancesMap(
}

/**
* Collect instances marked for evacuation in the current topology and add them into the given set
* Collects the instances within the cluster whose instance operation is set to EVACUATE, SWAP_IN,
* or UNKNOWN, and adds them to the given toBeStoppedInstances set.
*
* @param toBeStoppedInstances A set of instances we presume to be stopped.
*/
private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
private void findToBeStoppedInstances(Set<String> toBeStoppedInstances) {
Set<String> allInstances = _clusterTopology.getAllInstances();
for (String instance : allInstances) {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
if (InstanceConstants.InstanceOperation.EVACUATE.equals(
instanceConfig.getInstanceOperation().getOperation())) {
InstanceConstants.InstanceOperation operation = instanceConfig.getInstanceOperation().getOperation();
if (operation == InstanceConstants.InstanceOperation.EVACUATE
|| operation == InstanceConstants.InstanceOperation.SWAP_IN
|| operation == InstanceConstants.InstanceOperation.UNKNOWN) {
toBeStoppedInstances.add(instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public enum InstancesProperties {
}

public enum InstanceHealthSelectionBase {
instance_based,
non_zone_based,
zone_based,
cross_zone_based
}
Expand Down Expand Up @@ -224,12 +224,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
boolean random) throws IOException {
try {
// TODO: Process input data from the content
// TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
InstancesAccessor.InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null
? InstanceHealthSelectionBase.non_zone_based : InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());

List<String> instances = OBJECT_MAPPER.readValue(
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());

List<String> orderOfZone = null;
String customizedInput = null;
Expand All @@ -252,6 +257,12 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
_logger.error(message);
return badRequest(message);
}
if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) {
String message =
"'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) {
Expand Down Expand Up @@ -285,6 +296,33 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
if (!clusterService.isClusterTopologyAware(clusterId)) {
String message = "Cluster " + clusterId
+ " is not topology aware. Please enable the topology in cluster config or set "
+ "'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}

// Find instances that lack topology information
Set<String> instancesWithTopology =
clusterTopology.toZoneMapping().entrySet().stream().flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toSet());
Set<String> allInstances = clusterTopology.getAllInstances();
Set<String> topologyUnawareInstances = new HashSet<>(instances).stream().filter(
instance -> !instancesWithTopology.contains(instance) && allInstances.contains(instance))
.collect(Collectors.toSet());
if (!topologyUnawareInstances.isEmpty()) {
String message = "Instances " + topologyUnawareInstances
+ " do not have topology information. Please set topology information in instance config or"
+ " set 'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

String namespace = getNamespace();
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
Expand All @@ -299,9 +337,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.build();

ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
Expand All @@ -311,18 +346,20 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
ObjectNode result;
// TODO: Add support for clusters that do not have topology set up.
// Issue #2893: https://github.com/apache/helix/issues/2893

switch (selectionBase) {
case zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
break;
case cross_zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
break;
case instance_based:
case non_zone_based:
result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, toBeStoppedInstances);
break;
default:
throw new UnsupportedOperationException("instance_based selection is not supported yet!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public interface ClusterService {
* @return
*/
ClusterInfo getClusterInfo(String clusterId);

/**
 * Check whether the given cluster is topology aware.
 *
 * @param clusterId the id of the cluster to check
 * @return {@code true} if the cluster is topology aware, {@code false} otherwise
 */
boolean isClusterTopologyAware(String clusterId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.netty.util.internal.StringUtil;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.rest.server.json.cluster.ClusterInfo;
Expand Down Expand Up @@ -102,4 +104,11 @@ public ClusterInfo getClusterInfo(String clusterId) {
.instances(_dataAccessor.getChildNames(keyBuilder.instances()))
.liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build();
}

/**
 * {@inheritDoc}
 *
 * <p>A cluster is reported as topology aware only when its cluster config has topology
 * awareness enabled and both the fault zone type and the topology are non-empty.
 */
@Override
public boolean isClusterTopologyAware(String clusterId) {
  ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterId);
  // NOTE(review): guards against a missing cluster config instead of failing with a
  // NullPointerException; presumably the accessor can return null here - confirm.
  if (clusterConfig == null || !clusterConfig.isTopologyAwareEnabled()) {
    return false;
  }

  // Plain null/empty checks avoid depending on Netty's internal-only StringUtil helper.
  String faultZoneType = clusterConfig.getFaultZoneType();
  if (faultZoneType == null || faultZoneType.isEmpty()) {
    return false;
  }
  String topology = clusterConfig.getTopology();
  return topology != null && !topology.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestCustomRestClient {
Expand Down Expand Up @@ -234,4 +238,39 @@ public void testGetAggregatedStoppableCheck() throws IOException {
Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> clusterHealth.get(instance).isEmpty()));
Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance -> clusterHealth.get(instance).isEmpty()));
}

/**
 * Sends an aggregated stoppable check in which "n1" appears in both the instances list and the
 * toBeStoppedInstances set, then inspects the JSON body of the outgoing POST request.
 */
@Test(description = "Test if the aggregated stoppable check request has the correct format when there "
    + "are duplicate instances in the instances list and the toBeStoppedInstances list.")
public void testAggregatedCheckRemoveDuplicateInstances() throws IOException {
  String clusterId = "cluster1";

  MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
  HttpResponse httpResponse = mock(HttpResponse.class);
  StatusLine statusLine = mock(StatusLine.class);

  when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
  when(httpResponse.getStatusLine()).thenReturn(statusLine);
  when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);

  customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
      ImmutableList.of("n1", "n2"),
      ImmutableSet.of("n1"), clusterId, Collections.emptyMap());

  // Make sure that the duplicate instances are removed from the toBeStoppedInstances list.
  // "n1" was its only entry, so the field is expected to be absent from the request.
  ObjectMapper objectMapper = new ObjectMapper();
  verify(_httpClient).execute(argThat(x -> {
    try {
      String request = EntityUtils.toString(((HttpPost) x).getEntity());
      JsonNode node = objectMapper.readTree(request);
      // org.junit.Assert takes the expected value first.
      Assert.assertEquals("[\"n1\",\"n2\"]", node.get("instances").toString());
      Assert.assertNull(node.get("to_be_stopped_instances"));
      return true;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }));
}
}
Loading
Loading