Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkGaox committed Nov 25, 2024
1 parent 6f89e11 commit 855e7d6
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,15 @@ public Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
if (instances != null && !instances.isEmpty()) {
payLoads.put("instances", instances);
}
if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
// Before sending the request, make sure the toBeStoppedInstances has no overlap with instances
Set<String> remainingToBeStoppedInstances = toBeStoppedInstances;
if (instances != null && toBeStoppedInstances != null) {
remainingToBeStoppedInstances =
toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins))
.collect(Collectors.toSet());
}
if (remainingToBeStoppedInstances != null && !remainingToBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances);
}
if (clusterId != null) {
payLoads.put("cluster_id", clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
findToBeStoppedInstances(toBeStoppedInstancesSet);

List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
Expand Down Expand Up @@ -119,7 +119,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
findToBeStoppedInstances(toBeStoppedInstancesSet);

Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
Expand All @@ -137,7 +137,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
}

/**
* Evaluates and collects stoppable instances without respecting the zone order.
* Evaluates and collects stoppable instances zone order.
* The method iterates through instances, performing stoppable checks, and records reasons for
* non-stoppability.
*
Expand All @@ -149,15 +149,15 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
* a list of reasons for non-stoppability as the value.
* @throws IOException
*/
public ObjectNode getStoppableInstancesWithoutTopology(List<String> instances,
public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
List<String> toBeStoppedInstances) throws IOException {
ObjectNode result = JsonNodeFactory.instance.objectNode();
ArrayNode stoppableInstances =
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
findToBeStoppedInstances(toBeStoppedInstancesSet);

// Because zone order calculation is omitted, we must verify each instance's existence
// to ensure we only process valid instances before performing stoppable check.
Expand Down Expand Up @@ -293,18 +293,21 @@ private Map<String, Set<String>> getOrderedZoneToInstancesMap(
}

/**
* Collect instances marked for evacuation in the current topology and add them into the given set
* Collect instances marked for evacuation, swap, or unkown in the current topology and add
* them into the given set
*
* @param toBeStoppedInstances A set of instances we presume to be stopped.
*/
private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
private void findToBeStoppedInstances(Set<String> toBeStoppedInstances) {
Set<String> allInstances = _clusterTopology.getAllInstances();
for (String instance : allInstances) {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
if (InstanceConstants.InstanceOperation.EVACUATE.equals(
instanceConfig.getInstanceOperation().getOperation())) {
InstanceConstants.InstanceOperation operation = instanceConfig.getInstanceOperation().getOperation();
if (operation == InstanceConstants.InstanceOperation.EVACUATE
|| operation == InstanceConstants.InstanceOperation.SWAP_IN
|| operation == InstanceConstants.InstanceOperation.UNKNOWN) {
toBeStoppedInstances.add(instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public enum InstancesProperties {
}

public enum InstanceHealthSelectionBase {
non_topo_based,
non_zone_based,
zone_based,
cross_zone_based
}
Expand Down Expand Up @@ -224,15 +224,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
boolean random) throws IOException {
try {
// TODO: Process input data from the content
// TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
InstancesAccessor.InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null
? InstanceHealthSelectionBase.non_zone_based : InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());

List<String> instances = OBJECT_MAPPER.readValue(
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);

List<String> orderOfZone = null;
String customizedInput = null;
Expand All @@ -255,9 +257,9 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
_logger.error(message);
return badRequest(message);
}
if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_topo_based) {
if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) {
String message =
"'zone_order' is set but 'selection_base' is 'non_topo_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
"'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
_logger.error(message);
return badRequest(message);
}
Expand Down Expand Up @@ -294,12 +296,26 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

if (selectionBase != InstanceHealthSelectionBase.non_topo_based && clusterTopology.getZones()
.isEmpty()) {
String message = "Cluster " + clusterId
+ " does not have any zone information. Please set zone information in cluster config.";
_logger.error(message);
return badRequest(message);
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
if (!clusterService.isClusterTopologyAware(clusterId)) {
String message = "Cluster " + clusterId
+ " is not topology aware. Please enable the topology in cluster config or set "
+ "'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}

// Find instances that lack topology information
Set<String> topologyUnawareInstances =
findTopologyUnawareInstances(clusterTopology, instances);
if (!topologyUnawareInstances.isEmpty()) {
String message = "Instances " + topologyUnawareInstances
+ " do not have topology information. Please set topology information in instance config or"
+ " set 'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

String namespace = getNamespace();
Expand Down Expand Up @@ -336,8 +352,8 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
break;
case non_topo_based:
result = stoppableInstancesSelector.getStoppableInstancesWithoutTopology(instances, toBeStoppedInstances);
case non_zone_based:
result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, toBeStoppedInstances);
break;
default:
throw new UnsupportedOperationException("instance_based selection is not supported yet!");
Expand All @@ -354,4 +370,21 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
throw e;
}
}

/**
* Find the topology unaware instances in the given list of instances.
* @param topology The cluster topology
* @param instances The list of instances to check
* @return The set of instances that do not have topology information but are present in the cluster.
*/
private Set<String> findTopologyUnawareInstances(ClusterTopology topology,
List<String> instances) {
Set<String> instancesWithTopology =
topology.toZoneMapping().entrySet().stream().flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toSet());
Set<String> allInstances = topology.getAllInstances();
return new HashSet<>(instances).stream().filter(
instance -> !instancesWithTopology.contains(instance) && allInstances.contains(instance))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public interface ClusterService {
* @return
*/
ClusterInfo getClusterInfo(String clusterId);

/**
* Check if the cluster is topology aware
* @param clusterId
* @return
*/
boolean isClusterTopologyAware(String clusterId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.netty.util.internal.StringUtil;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.rest.server.json.cluster.ClusterInfo;
Expand Down Expand Up @@ -102,4 +104,11 @@ public ClusterInfo getClusterInfo(String clusterId) {
.instances(_dataAccessor.getChildNames(keyBuilder.instances()))
.liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build();
}

@Override
public boolean isClusterTopologyAware(String clusterId) {
ClusterConfig config = _configAccessor.getClusterConfig(clusterId);
return config.isTopologyAwareEnabled() && !StringUtil.isNullOrEmpty(config.getFaultZoneType())
&& !StringUtil.isNullOrEmpty(config.getTopology());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestCustomRestClient {
Expand Down Expand Up @@ -234,4 +238,39 @@ public void testGetAggregatedStoppableCheck() throws IOException {
Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> clusterHealth.get(instance).isEmpty()));
Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance -> clusterHealth.get(instance).isEmpty()));
}

@Test(description = "Test if the aggregated stoppable check request has the correct format when there"
+ "are duplicate instances in the instances list and the toBeStoppedInstances list.")
public void testAggregatedCheckRemoveDuplicateInstances()
throws IOException {
String clusterId = "cluster1";

MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
HttpResponse httpResponse = mock(HttpResponse.class);
StatusLine statusLine = mock(StatusLine.class);

when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
when(httpResponse.getStatusLine()).thenReturn(statusLine);
when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);

customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
ImmutableList.of("n1", "n2"),
ImmutableSet.of("n1"), clusterId, Collections.emptyMap());

// Make sure that the duplicate instances are removed from the toBeStoppedInstances list
ObjectMapper OBJECT_MAPPER = new ObjectMapper();
verify(_httpClient).execute(argThat(x -> {
String request = null;
try {
request = EntityUtils.toString(((HttpPost) x).getEntity());
JsonNode node = OBJECT_MAPPER.readTree(request);
String instancesInRequest = node.get("instances").toString();
Assert.assertEquals(instancesInRequest, "[\"n1\",\"n2\"]");
Assert.assertNull(node.get("to_be_stopped_instances"));
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName,
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/helixZoneId/instance");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
Expand Down Expand Up @@ -661,6 +663,8 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/helixZoneId/instance");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void testNonTopoAwareStoppableCheck() throws JsonProcessingException {
String content = String.format(
"{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.non_topo_based.name(),
InstancesAccessor.InstanceHealthSelectionBase.non_zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
"instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
"instance14", "invalidInstance",
Expand All @@ -657,7 +657,7 @@ public void testNonTopoAwareStoppableCheck() throws JsonProcessingException {
String instance1 = "instance1";
InstanceConfig instanceConfig1 =
_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance1);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1);

// It takes time to reflect the changes.
Expand Down Expand Up @@ -722,6 +722,49 @@ public void testNonTopoAwareStoppableCheckWithException() throws JsonProcessingE
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(description = "Test zone selection base with instance that don't have topology set in the config",
dependsOnMethods = "testNonTopoAwareStoppableCheckWithException")
public void testZoneSelectionBaseWithInstanceThatDontHaveTopologySet() {
System.out.println("Start test :" + TestHelper.getTestMethodName());

// STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster
String content = String.format(
"{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
"instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
"instance14", "invalidInstance",
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");

String instance1 = "instance1";
InstanceConfig instanceConfig1 =
_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1);
String domain = instanceConfig1.getDomainAsString();
instanceConfig1.setDomain("FALSE_DOMAIN");
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, instanceConfig1);

// It takes time to reflect the changes.
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

// Making the REST Call to cross zone stoppable check while the cluster has no topology aware
// setup. The call should return an error.
Response response = new JerseyUriRequestBuilder(
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER3)
.isBodyReturnExpected(true)
.expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
.post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));

// Restore the changes on instance 1
instanceConfig1.setDomain(domain);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1);

System.out.println("End test :" + TestHelper.getTestMethodName());
}

private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
Expand Down
Loading

0 comments on commit 855e7d6

Please sign in to comment.