Skip to content

Commit

Permalink
Enable computation of evenness score based on a preferred scoring key
Browse files Browse the repository at this point in the history
  • Loading branch information
csudharsanan committed Jan 24, 2024
1 parent d7b5b97 commit ca6e2bc
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected double getAssignmentScore(AssignableNode node, AssignableReplica repli
ClusterContext clusterContext) {
float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
float projectedHighestUtilization =
node.getGeneralProjectedHighestUtilization(replica.getCapacity());
node.getGeneralProjectedHighestUtilization(replica.getCapacity(), clusterContext.getPreferredScoringKey());
return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected double getAssignmentScore(AssignableNode node, AssignableReplica repli
}
float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
float projectedHighestUtilization =
node.getTopStateProjectedHighestUtilization(replica.getCapacity());
node.getTopStateProjectedHighestUtilization(replica.getCapacity(), clusterContext.getPreferredScoringKey());
return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,27 @@ public Map<String, Integer> getMaxCapacity() {
* @return The highest utilization number of the node among all the capacity category.
*/
public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
return getProjectedHighestUtilization(newUsage, _remainingCapacity);
return getProjectedHighestUtilization(newUsage, _remainingCapacity, null);
}

/**
* Return the most concerning capacity utilization number for even partition assignment,
* optionally restricted to the preferred scoring key(s).
* The method calculates the projected highest utilization number among the considered
* capacity categories, assuming the new capacity usage is added to the node. The projection
* is computed against this node's remaining capacity (_remainingCapacity).
*
* If the preferredScoringKey is specified then the utilization number is computed based on the
* specified capacity category (key) only. If it is null, empty, or its first entry is not one
* of this node's capacity categories, all capacity categories are considered instead.
*
* For example, if the projected node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and
* preferredScoringKey is [CPU], then this call returns 0.9.
*
* @param newUsage the proposed new additional capacity usage.
* @param preferredScoringKey if provided, the capacity utilization will be calculated based on
* the supplied key(s) only, else across all capacity categories.
* NOTE(review): only the first entry is checked against the node's
* capacity categories; any further entries are assumed to be valid
* capacity keys - confirm with callers.
* @return The highest utilization number of the node among the considered capacity categories.
*/
public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage, List<String> preferredScoringKey) {
return getProjectedHighestUtilization(newUsage, _remainingCapacity, preferredScoringKey);
}

/**
Expand All @@ -263,13 +283,39 @@ public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage
* @return The highest utilization number of the node among all the capacity category.
*/
public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity);
return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, null);
}

/**
* Return the most concerning top-state capacity utilization number for even partition
* assignment, optionally restricted to the preferred scoring key(s).
* The method calculates the projected highest utilization number among the considered
* capacity categories, assuming the new capacity usage is added to the node. The projection
* is computed against this node's remaining top state capacity (_remainingTopStateCapacity),
* so only top state partitions are taken into account.
*
* If the preferredScoringKey is specified then the utilization number is computed based on the
* specified capacity category (key) only. If it is null, empty, or its first entry is not one
* of this node's capacity categories, all capacity categories are considered instead.
*
* For example, if the projected top state usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and
* preferredScoringKey is [CPU], then this call returns 0.9.
*
* @param newUsage the proposed new additional capacity usage.
* @param preferredScoringKey if provided, the capacity utilization will be calculated based on
* the supplied key(s) only, else across all capacity categories.
* NOTE(review): only the first entry is checked against the node's
* capacity categories; any further entries are assumed to be valid
* capacity keys - confirm with callers.
* @return The highest top state utilization number of the node among the considered capacity
* categories.
*/
public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage, List<String> preferredScoringKey) {
return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, preferredScoringKey);
}

private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
Map<String, Integer> remainingCapacity) {
Map<String, Integer> remainingCapacity, List<String> preferredScoringKey) {
Set<String> capacityKeySet = _maxAllowedCapacity.keySet();
if (preferredScoringKey != null && preferredScoringKey.size() != 0 && capacityKeySet.contains(preferredScoringKey.get(0))) {
capacityKeySet = preferredScoringKey.stream().collect(Collectors.toSet());
}
float highestCapacityUtilization = 0;
for (String capacityKey : _maxAllowedCapacity.keySet()) {
for (String capacityKey : capacityKeySet) {
float capacityValue = _maxAllowedCapacity.get(capacityKey);
float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
.getOrDefault(capacityKey, 0)) / capacityValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class tracks the rebalance-related global cluster status.
*/
public class ClusterContext {
private static final Logger LOG = LoggerFactory.getLogger(ClusterContext.class.getName());
// This estimation helps to ensure global partition count evenness
private final int _estimatedMaxPartitionCount;
// This estimation helps to ensure global top state replica count evenness
Expand All @@ -57,20 +63,29 @@ public class ClusterContext {
private final Map<String, Integer> _estimateUtilizationMap;
// Cluster total capacity. Used to compute score when sorting replicas.
private final Map<String, Integer> _clusterCapacityMap;

private final List<String> _preferredScoringKey;
private final String _clusterName;
/**
* Construct the cluster context based on the current instance status.
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param nodeSet All the active nodes that are managed by the rebalancer
*/
ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
this(replicaSet, nodeSet, baselineAssignment, bestPossibleAssignment, null);
}

ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment,
ClusterConfig clusterConfig) {
int instanceCount = nodeSet.size();
int totalReplicas = 0;
int totalTopStateReplicas = 0;
Map<String, Integer> totalUsage = new HashMap<>();
Map<String, Integer> totalTopStateUsage = new HashMap<>();
Map<String, Integer> totalCapacity = new HashMap<>();
_preferredScoringKey = Optional.ofNullable(clusterConfig).map(ClusterConfig::getPreferredScoringKey).orElse(null);
_clusterName = Optional.ofNullable(clusterConfig).map(ClusterConfig::getClusterName).orElse(null);

for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
.collect(Collectors.groupingBy(AssignableReplica::getResourceName))
Expand Down Expand Up @@ -103,17 +118,31 @@ public class ClusterContext {
_estimateUtilizationMap = Collections.emptyMap();
_clusterCapacityMap = Collections.emptyMap();
} else {
_estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
_estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
_estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage, _preferredScoringKey);
_estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage, _preferredScoringKey);
_estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
_clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
}
LOG.info(
"clusterName: {}, preferredScoringKey: {}, estimatedMaxUtilization: {}, estimatedTopStateMaxUtilization: {}",
_clusterName, _preferredScoringKey, _estimatedMaxUtilization,
_estimatedTopStateMaxUtilization);
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
_baselineAssignment = baselineAssignment;
_bestPossibleAssignment = bestPossibleAssignment;
}


/**
* Get the preferred scoring key(s) captured from the cluster config when this context was
* constructed.
*
* @return the preferred scoring key list used in computation of the evenness score, or null
* when the context was built without a cluster config or the config supplied no
* preferred scoring key.
*/
public List<String> getPreferredScoringKey() {
return _preferredScoringKey;
}

public Map<String, ResourceAssignment> getBaselineAssignment() {
return _baselineAssignment == null || _baselineAssignment.isEmpty() ? Collections.emptyMap() : _baselineAssignment;
}
Expand Down Expand Up @@ -190,10 +219,31 @@ private static int estimateAvgReplicaCount(int replicaCount, int instanceCount)
return (int) Math.floor((float) replicaCount / instanceCount);
}

/**
* Estimates the max utilization number from all capacity categories and their usages.
* If the preferredScoringKey is specified then the max utilization number is computed based on the
* specified capacity category (key) only.
*
* For example, if totalCapacity is {CPU: 0.6, MEM: 0.7, DISK: 0.9}, totalUsage is {CPU: 0.1, MEM: 0.2, DISK: 0.3},
* preferredScoringKey: CPU. Then this call shall return approximately 0.17 (0.1 / 0.6). If preferredScoringKey
* is not specified, this call returns approximately 0.33 (0.3 / 0.9), which is the max utilization, for DISK.
*
* @param totalCapacity Sum total of max capacity of all active nodes managed by a rebalancer
* @param totalUsage Sum total of capacity usage of all partition replicas that are managed by the rebalancer
* @param preferredScoringKey if provided, the max utilization will be calculated based on
* the supplied key only, else across all capacity categories.
* @return The max utilization number from the specified capacity categories.
*/

private static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
Map<String, Integer> totalUsage) {
Map<String, Integer> totalUsage,
List<String> preferredScoringKey) {
float estimatedMaxUsage = 0;
for (String capacityKey : totalCapacity.keySet()) {
Set<String> capacityKeySet = totalCapacity.keySet();
if (preferredScoringKey != null && preferredScoringKey.size() != 0 && capacityKeySet.contains(preferredScoringKey.get(0))) {
capacityKeySet = preferredScoringKey.stream().collect(Collectors.toSet());
}
for (String capacityKey : capacityKeySet) {
int maxCapacity = totalCapacity.get(capacityKey);
int usage = totalUsage.getOrDefault(capacityKey, 0);
float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment);
assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment, dataProvider.getClusterConfig());

// Initial the cluster context with the allocated assignments.
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ public enum ClusterConfigProperty {
HELIX_DISABLED_TYPE,

// The last time when the on-demand rebalance is triggered.
LAST_ON_DEMAND_REBALANCE_TIMESTAMP
LAST_ON_DEMAND_REBALANCE_TIMESTAMP,

// Preferred scoring key used in evenness score computation.
PREFERRED_SCORING_KEY
}

public enum GlobalRebalancePreferenceKey {
Expand Down Expand Up @@ -1198,4 +1201,25 @@ public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) {
_record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
rebalanceTimestamp);
}

/**
* Get the list of preferred scoring keys if set.
* The value is read from the list field stored under
* ClusterConfigProperty.PREFERRED_SCORING_KEY in the underlying record.
*
* @return the preferred scoring key list that is used in computation of the evenness score.
* NOTE(review): presumably null when the field has never been set - confirm against
* the record's getListField contract.
*/
public List<String> getPreferredScoringKey() {
return _record.getListField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name());
}

/**
* Set the preferred scoring key for the cluster.
* The value is stored as the list field named ClusterConfigProperty.PREFERRED_SCORING_KEY in
* the underlying record.
* preferredScoringKey is a List to keep the setting generic and accommodate future use cases;
* for the current use case it is expected to be a singleton list.
*
* @param preferredScoringKey capacity key(s) used in the evenness score computation
*/
public void setPreferredScoringKey(List<String> preferredScoringKey) {
_record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name(),
preferredScoringKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* under the License.
*/

import java.util.Collections;
import java.util.List;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
Expand All @@ -29,6 +32,8 @@
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;

public class TestMaxCapacityUsageInstanceConstraint {
private AssignableReplica _testReplica;
Expand All @@ -45,7 +50,7 @@ public void setUp() {

@Test
public void testGetNormalizedScore() {
when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
when(_testNode.getGeneralProjectedHighestUtilization(anyMap(), any())).thenReturn(0.8f);
when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
// Convert to float so as to compare with equal.
Expand All @@ -54,4 +59,19 @@ public void testGetNormalizedScore() {
_constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
Assert.assertTrue(normalizedScore > 0.99);
}

@Test
public void testGetNormalizedScoreWithPreferredScoringKey() {
  // Cluster-level stubs: one preferred scoring key and an estimated max utilization of 1.
  final List<String> scoringKeys = Collections.singletonList("CU");
  when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
  when(_clusterContext.getPreferredScoringKey()).thenReturn(scoringKeys);
  // Node-level stub: matches only when the constraint forwards exactly the preferred key list.
  when(_testNode.getGeneralProjectedHighestUtilization(anyMap(), eq(scoringKeys)))
      .thenReturn(0.5f);

  double rawScore = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
  // Narrow to float so the value can be compared for equality with the float literal.
  Assert.assertEquals((float) rawScore, 0.5f);

  double normalized =
      _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
  Assert.assertTrue(normalized > 0.99);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
* under the License.
*/

import java.util.Collections;
import java.util.List;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -47,7 +52,7 @@ public void setUp() {
@Test
public void testGetNormalizedScore() {
when(_testReplica.isReplicaTopState()).thenReturn(true);
when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
when(_testNode.getTopStateProjectedHighestUtilization(anyMap(), any())).thenReturn(0.8f);
when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
// Convert to float so as to compare with equal.
Expand All @@ -56,4 +61,21 @@ public void testGetNormalizedScore() {
_constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
Assert.assertTrue(normalizedScore > 0.99);
}


@Test
public void testGetNormalizedScoreWithPreferredScoringKey() {
  // Cluster-level stubs: one preferred scoring key and an estimated top state max utilization of 1.
  final List<String> scoringKeys = Collections.singletonList("CU");
  when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
  when(_clusterContext.getPreferredScoringKey()).thenReturn(scoringKeys);
  // Replica/node stubs: the replica is a top state replica, and the node stub matches only
  // when the constraint forwards exactly the preferred key list.
  when(_testReplica.isReplicaTopState()).thenReturn(true);
  when(_testNode.getTopStateProjectedHighestUtilization(anyMap(), eq(scoringKeys)))
      .thenReturn(0.5f);

  double rawScore = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
  // Narrow to float so the value can be compared for equality with the float literal.
  Assert.assertEquals((float) rawScore, 0.5f);

  double normalized =
      _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
  Assert.assertTrue(normalized > 0.99);
}
}
Loading

0 comments on commit ca6e2bc

Please sign in to comment.