diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 7d74c26807..9ff589579d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -22,6 +22,8 @@
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The constraint evaluates the score by checking the max used capacity key out of all the capacity
@@ -31,13 +33,17 @@
  * It is a greedy approach since it evaluates only on the most used capacity key.
  */
 class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
+  private static final Logger LOG = LoggerFactory.getLogger(MaxCapacityUsageInstanceConstraint.class.getName());
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
     float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
     float projectedHighestUtilization =
-        node.getGeneralProjectedHighestUtilization(replica.getCapacity());
-    return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
+        node.getGeneralProjectedHighestUtilization(replica.getCapacity(), clusterContext.getPreferredScoringKey());
+    double utilizationScore = computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
+    LOG.info("[DEPEND-29018] clusterName: {}, estimatedMaxUtilization: {}, projectedHighestUtilization: {}, utilizationScore: {}, preferredScoringKey: {}",
+        clusterContext.getClusterName(), estimatedMaxUtilization, projectedHighestUtilization, utilizationScore, clusterContext.getPreferredScoringKey());
+    return utilizationScore;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 145425398a..c3a4d80eff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -22,6 +22,8 @@
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
@@ -31,6 +33,7 @@
  * It is a greedy approach since it evaluates only on the most used capacity key.
  */
 class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
+  private static final Logger LOG = LoggerFactory.getLogger(TopStateMaxCapacityUsageInstanceConstraint.class.getName());
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
@@ -41,7 +44,10 @@ protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
     }
     float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
     float projectedHighestUtilization =
-        node.getTopStateProjectedHighestUtilization(replica.getCapacity());
-    return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
+        node.getTopStateProjectedHighestUtilization(replica.getCapacity(), clusterContext.getPreferredScoringKey());
+    double utilizationScore = computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
+    LOG.info("[DEPEND-29018] clusterName: {}, estimatedTopStateMaxUtilization: {}, projectedHighestUtilization: {}, utilizationScore: {}, preferredScoringKey: {}",
+        clusterContext.getClusterName(), estimatedTopStateMaxUtilization, projectedHighestUtilization, utilizationScore, clusterContext.getPreferredScoringKey());
+    return utilizationScore;
   }
 }
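Both constraints now forward `ClusterContext.getPreferredScoringKey()` into the node-level utilization call. When no key is configured the getter returns null and the node falls back to scanning every capacity category, so existing clusters keep the old behavior. The snippet below is an editor's standalone sketch of that key-resolution rule (it is not Helix code; `resolveScoringKeys` is a hypothetical helper name), mirroring the narrowing logic that appears later in this diff in `AssignableNode.getProjectedHighestUtilization` and `ClusterContext.estimateMaxUtilization`.

```java
import java.util.Set;

public class PreferredKeyResolutionSketch {
  /**
   * A configured key that exists in the capacity key set restricts scoring to that single
   * category; a null or unknown key keeps the original "scan every capacity key" behavior.
   */
  static Set<String> resolveScoringKeys(Set<String> allCapacityKeys, String preferredScoringKey) {
    if (preferredScoringKey != null && allCapacityKeys.contains(preferredScoringKey)) {
      return Set.of(preferredScoringKey);
    }
    return allCapacityKeys;
  }

  public static void main(String[] args) {
    Set<String> keys = Set.of("CPU", "MEM", "DISK");
    System.out.println(resolveScoringKeys(keys, null));      // all keys -> old behavior
    System.out.println(resolveScoringKeys(keys, "CPU"));     // [CPU] -> score on CPU only
    System.out.println(resolveScoringKeys(keys, "UNKNOWN")); // all keys -> fallback
  }
}
```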
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index e29052a0dd..ad6adff5c1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -63,6 +63,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Integer> _remainingCapacity;
   private Map<String, Integer> _remainingTopStateCapacity;
 
+  private String _clusterName;
+
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
    * currentCapacity.
@@ -84,6 +86,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
+    _clusterName = clusterConfig.getClusterName();
   }
 
   /**
@@ -195,7 +198,7 @@ public Set<String> getAssignedPartitionsByResource(String resource) {
   Set<String> getAssignedTopStatePartitionsByResource(String resource) {
     return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).entrySet()
         .stream().filter(partitionEntry -> partitionEntry.getValue().isReplicaTopState())
-        .map(partitionEntry -> partitionEntry.getKey()).collect(Collectors.toSet());
+        .map(Map.Entry::getKey).collect(Collectors.toSet());
   }
 
   /**
@@ -239,7 +242,27 @@ public Map<String, Integer> getMaxCapacity() {
    * @return The highest utilization number of the node among all the capacity category.
    */
   public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
-    return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity, null);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for even partition assignment.
+   * The method dynamically calculates the projected highest utilization number among all the
+   * capacity categories, assuming the new capacity usage is added to the node.
+   *
+   * If the preferredScoringKey is specified, the utilization number is computed based on the
+   * specified capacity category (key) only.
+   *
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and the
+   * preferredScoringKey is CPU, then this call returns 0.9.
+   *
+   * @param newUsage the proposed new additional capacity usage.
+   * @param preferredScoringKey if provided, the capacity utilization will be calculated based on
+   *          the supplied key only, else across all capacity categories.
+   * @return The highest utilization number of the node among the specified capacity category.
+   */
+  public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage, String preferredScoringKey) {
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity, preferredScoringKey);
   }
 
   /**
@@ -253,18 +276,46 @@ public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
    * @return The highest utilization number of the node among all the capacity category.
    */
   public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
-    return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity);
+    return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, null);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for even partition assignment.
+   * The method dynamically calculates the projected highest utilization number among all the
+   * capacity categories, assuming the new capacity usage is added to the node.
+   *
+   * If the preferredScoringKey is specified, the utilization number is computed based on the
+   * specified capacity category (key) only.
+   *
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and the
+   * preferredScoringKey is CPU, then this call returns 0.9.
+   *
+   * This method considers only top-state partitions when projecting the highest utilization.
+   *
+   * @param newUsage the proposed new additional capacity usage.
+   * @param preferredScoringKey if provided, the capacity utilization will be calculated based on
+   *          the supplied key only, else across all capacity categories.
+   * @return The highest utilization number of the node among the specified capacity category.
+   */
+  public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage, String preferredScoringKey) {
+    return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, preferredScoringKey);
   }
 
   private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
-      Map<String, Integer> remainingCapacity) {
+      Map<String, Integer> remainingCapacity, String preferredScoringKey) {
+    Set<String> capacityKeySet = _maxAllowedCapacity.keySet();
+    if (preferredScoringKey != null && capacityKeySet.contains(preferredScoringKey)) {
+      capacityKeySet = ImmutableSet.of(preferredScoringKey);
+    }
     float highestCapacityUtilization = 0;
-    for (String capacityKey : _maxAllowedCapacity.keySet()) {
+    for (String capacityKey : capacityKeySet) {
       float capacityValue = _maxAllowedCapacity.get(capacityKey);
       float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
           .getOrDefault(capacityKey, 0)) / capacityValue;
       highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
     }
+    LOG.info("[DEPEND-29018] clusterName: {}, newUsage: {}, remainingCapacity: {}, preferredScoringKey: {}, highestCapacityUtilization: {}",
+        _clusterName, newUsage, remainingCapacity, preferredScoringKey, highestCapacityUtilization);
     return highestCapacityUtilization;
   }
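To make the new overloads concrete, here is a hedged, standalone sketch of the projected-utilization arithmetic that `getProjectedHighestUtilization` performs: per-key utilization is `(maxAllowed - remaining + newUsage) / maxAllowed`, and a valid preferred key narrows the scan to that single category. The class, method, and the capacity numbers below are hypothetical illustrations, not Helix code.

```java
import java.util.Map;
import java.util.Set;

public class ProjectedUtilizationSketch {
  // Mirrors the arithmetic of AssignableNode.getProjectedHighestUtilization in this diff.
  static float projectedHighestUtilization(Map<String, Integer> maxAllowed,
      Map<String, Integer> remaining, Map<String, Integer> newUsage, String preferredScoringKey) {
    Set<String> keys = maxAllowed.keySet();
    if (preferredScoringKey != null && keys.contains(preferredScoringKey)) {
      keys = Set.of(preferredScoringKey);
    }
    float highest = 0;
    for (String key : keys) {
      float capacity = maxAllowed.get(key);
      float utilization = (capacity - remaining.get(key) + newUsage.getOrDefault(key, 0)) / capacity;
      highest = Math.max(highest, utilization);
    }
    return highest;
  }

  public static void main(String[] args) {
    // Hypothetical node whose current usage works out to {CPU: 0.9, MEM: 0.4, DISK: 0.6},
    // matching the javadoc example above.
    Map<String, Integer> maxAllowed = Map.of("CPU", 10, "MEM", 10, "DISK", 10);
    Map<String, Integer> remaining = Map.of("CPU", 1, "MEM", 6, "DISK", 4);
    Map<String, Integer> newUsage = Map.of();
    System.out.println(projectedHighestUtilization(maxAllowed, remaining, newUsage, null));  // 0.9, driven by CPU
    System.out.println(projectedHighestUtilization(maxAllowed, remaining, newUsage, "MEM")); // 0.4, MEM only
  }
}
```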
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 07e33df162..3f58094a13 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -24,17 +24,24 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tracks the rebalance-related global cluster status.
  */
 public class ClusterContext {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterContext.class.getName());
+
   // This estimation helps to ensure global partition count evenness
   private final int _estimatedMaxPartitionCount;
   // This estimation helps to ensure global top state replica count evenness
@@ -57,6 +64,8 @@ public class ClusterContext {
   private final Map<String, Float> _estimateUtilizationMap;
   // Cluster total capacity. Used to compute score when sorting replicas.
   private final Map<String, Integer> _clusterCapacityMap;
+  private final String _preferredScoringKey;
+  private final String _clusterName;
 
   /**
    * Construct the cluster context based on the current instance status.
@@ -64,17 +73,25 @@ public class ClusterContext {
    * @param nodeSet All the active nodes that are managed by the rebalancer
    */
   ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
-      Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
+      Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
+    this(replicaSet, nodeSet, baselineAssignment, bestPossibleAssignment, null);
+  }
+
+  ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
+      Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment,
+      ClusterConfig clusterConfig) {
     int instanceCount = nodeSet.size();
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
     Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
+    _preferredScoringKey = Optional.ofNullable(clusterConfig).map(ClusterConfig::getPreferredScoringKey).orElse(null);
+    _clusterName = Optional.ofNullable(clusterConfig).map(ClusterConfig::getClusterName).orElse(null);
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
-        .collect(Collectors.groupingBy(AssignableReplica::getResourceName))
-        .entrySet()) {
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName))
+        .entrySet()) {
       int replicas = entry.getValue().size();
       totalReplicas += replicas;
@@ -85,14 +102,14 @@ public class ClusterContext {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
           replica.getCapacity().forEach(
-              (key, value) -> totalTopStateUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
+              (key, value) -> totalTopStateUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
         }
         replica.getCapacity().forEach(
-            (key, value) -> totalUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
+            (key, value) -> totalUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
       }
     }
     nodeSet.forEach(node -> node.getMaxCapacity().forEach(
-        (key, value) -> totalCapacity.compute(key, (k, v) -> (v == null) ? value : (v + value))));
+        (key, value) -> totalCapacity.compute(key, (k, v) -> (v == null) ? value : (v + value))));
 
     // TODO: these variables correspond to one constraint each, and may become unnecessary if the
     // constraints are not used. A better design is to make them pluggable.
@@ -103,8 +120,8 @@
       _estimateUtilizationMap = Collections.emptyMap();
       _clusterCapacityMap = Collections.emptyMap();
     } else {
-      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
-      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
+      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage, _preferredScoringKey, _clusterName);
+      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage, _preferredScoringKey, _clusterName);
       _estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
       _clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
     }
@@ -114,6 +131,23 @@
     _bestPossibleAssignment = bestPossibleAssignment;
   }
+
+  /**
+   * Get preferred scoring key if set.
+   *
+   * @return PreferredScoringKey which is used in computation of evenness score
+   */
+  public String getPreferredScoringKey() {
+    return _preferredScoringKey;
+  }
+
+  /**
+   * Get cluster name if set.
+   */
+  public String getClusterName() {
+    return _clusterName;
+  }
+
   public Map<String, ResourceAssignment> getBaselineAssignment() {
     return _baselineAssignment == null || _baselineAssignment.isEmpty() ?
         Collections.emptyMap() : _baselineAssignment;
   }
 
@@ -190,16 +224,39 @@ private static int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
 
+  /**
+   * Estimates the max utilization number from all capacity categories and their usages.
+   * If the preferredScoringKey is specified, the max utilization number is computed based on the
+   * specified capacity category (key) only.
+   *
+   * For example, if totalCapacity is {CPU: 0.6, MEM: 0.7, DISK: 0.9}, totalUsage is {CPU: 0.1, MEM: 0.2, DISK: 0.3},
+   * and preferredScoringKey is CPU, then this call returns roughly 0.17 (0.1 / 0.6). If preferredScoringKey
+   * is not specified, this call returns roughly 0.33, which is the max utilization and comes from DISK.
+   *
+   * @param totalCapacity Sum total of the max capacity of all active nodes managed by a rebalancer
+   * @param totalUsage Sum total of the capacity usage of all partition replicas that are managed by the rebalancer
+   * @param preferredScoringKey if provided, the max utilization will be calculated based on
+   *          the supplied key only, else across all capacity categories.
+   * @param clusterName used for logging purposes only.
+   * @return The max utilization number from the specified capacity categories.
+   */
+
   private static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
-      Map<String, Integer> totalUsage) {
+      Map<String, Integer> totalUsage,
+      String preferredScoringKey, String clusterName) {
     float estimatedMaxUsage = 0;
-    for (String capacityKey : totalCapacity.keySet()) {
+    Set<String> capacityKeySet = totalCapacity.keySet();
+    if (preferredScoringKey != null && capacityKeySet.contains(preferredScoringKey)) {
+      capacityKeySet = ImmutableSet.of(preferredScoringKey);
+    }
+    for (String capacityKey : capacityKeySet) {
       int maxCapacity = totalCapacity.get(capacityKey);
       int usage = totalUsage.getOrDefault(capacityKey, 0);
       float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
       estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
     }
-
+    LOG.info("[DEPEND-29018] clusterName: {}, totalCapacity: {}, totalUsage: {}, preferredScoringKey: {}, estimatedMaxUsage: {}",
+        clusterName, totalCapacity, totalUsage, preferredScoringKey, estimatedMaxUsage);
     return estimatedMaxUsage;
   }
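Worked numbers for the javadoc example above, as a standalone sketch. The class name and the integer totals are hypothetical (a scaled-up integer version of the {CPU: 0.6, MEM: 0.7, DISK: 0.9} / {CPU: 0.1, MEM: 0.2, DISK: 0.3} example, since the real method takes integer maps); the logic mirrors `estimateMaxUtilization` from this diff without the logging.

```java
import java.util.Map;
import java.util.Set;

public class EstimateMaxUtilizationSketch {
  static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
      Map<String, Integer> totalUsage, String preferredScoringKey) {
    Set<String> keys = totalCapacity.keySet();
    if (preferredScoringKey != null && keys.contains(preferredScoringKey)) {
      keys = Set.of(preferredScoringKey);
    }
    float estimatedMaxUsage = 0;
    for (String key : keys) {
      int maxCapacity = totalCapacity.get(key);
      int usage = totalUsage.getOrDefault(key, 0);
      // A zero total capacity is treated as fully utilized, as in the method above.
      float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
      estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
    }
    return estimatedMaxUsage;
  }

  public static void main(String[] args) {
    Map<String, Integer> capacity = Map.of("CPU", 60, "MEM", 70, "DISK", 90);
    Map<String, Integer> usage = Map.of("CPU", 10, "MEM", 20, "DISK", 30);
    System.out.println(estimateMaxUtilization(capacity, usage, "CPU")); // ~0.167
    System.out.println(estimateMaxUtilization(capacity, usage, null));  // ~0.333, DISK dominates
  }
}
```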
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index dffaec3e04..99f3f92a0c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -235,7 +235,7 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
         replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
-        assignableNodes, idealAssignment, currentAssignment);
+        assignableNodes, idealAssignment, currentAssignment, dataProvider.getClusterConfig());
     // Initial the cluster context with the allocated assignments.
     context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index e33b902049..22ffc56128 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -154,7 +154,10 @@ public enum ClusterConfigProperty {
     HELIX_DISABLED_TYPE,
 
     // The last time when the on-demand rebalance is triggered.
-    LAST_ON_DEMAND_REBALANCE_TIMESTAMP
+    LAST_ON_DEMAND_REBALANCE_TIMESTAMP,
+
+    // Preferred scoring key used in evenness score computation
+    PREFERRED_SCORING_KEY
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -1198,4 +1201,23 @@ public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) {
     _record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
         rebalanceTimestamp);
   }
+
+  /**
+   * Get preferred scoring key if set.
+   *
+   * @return PreferredScoringKey that is used in computation of evenness score
+   */
+  public String getPreferredScoringKey() {
+    return _record.getSimpleField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name());
+  }
+
+  /**
+   * Set preferred scoring key for cluster.
+   *
+   * @param preferredScoringKey value used in evenness score computation
+   */
+  public void setPreferredScoringKey(String preferredScoringKey) {
+    _record.setSimpleField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name(),
+        preferredScoringKey);
+  }
 }
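With the new ClusterConfig field in place, an operator opts in per cluster by setting the simple field and writing the config back. A minimal sketch follows; the cluster name, ZooKeeper address, and the "CPU" key are placeholders, and obtaining a `ConfigAccessor` from a ZooKeeper address is just one common way to get a handle on the cluster config.

```java
import org.apache.helix.ConfigAccessor;
import org.apache.helix.model.ClusterConfig;

public class EnablePreferredScoringKey {
  public static void main(String[] args) {
    String clusterName = "MY_CLUSTER";                                 // placeholder cluster name
    ConfigAccessor configAccessor = new ConfigAccessor("localhost:2181"); // placeholder ZK address

    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    // Score evenness on a single capacity dimension, e.g. "CPU". The key should match one of the
    // instance capacity keys; otherwise the rebalancer falls back to scanning all keys.
    clusterConfig.setPreferredScoringKey("CPU");
    configAccessor.setClusterConfig(clusterName, clusterConfig);

    System.out.println("PREFERRED_SCORING_KEY = " + clusterConfig.getPreferredScoringKey());
  }
}
```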
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index f08371ad59..4ff54f7b44 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -29,6 +29,8 @@
 import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.any;
 
 public class TestMaxCapacityUsageInstanceConstraint {
   private AssignableReplica _testReplica;
@@ -45,7 +47,7 @@ public void setUp() {
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap(), any())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
@@ -54,4 +56,18 @@ public void testGetNormalizedScore() {
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
   }
+
+  @Test
+  public void testGetNormalizedScoreWithPreferredScoringKey() {
+    String preferredScoringKey = "CU";
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap(), eq(preferredScoringKey))).thenReturn(0.5f);
+    when(_clusterContext.getPreferredScoringKey()).thenReturn(preferredScoringKey);
+    when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+    double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    // Convert to float so as to compare with equal.
+    Assert.assertEquals((float) score, 0.5f);
+    double normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertTrue(normalizedScore > 0.99);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 947d0a18aa..308ad8ecc3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -26,6 +26,8 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -47,7 +49,7 @@ public void setUp() {
   @Test
   public void testGetNormalizedScore() {
     when(_testReplica.isReplicaTopState()).thenReturn(true);
-    when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap(), any())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
@@ -56,4 +58,20 @@ public void testGetNormalizedScore() {
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
   }
+
+
+  @Test
+  public void testGetNormalizedScoreWithPreferredScoringKey() {
+    String preferredScoringKey = "CU";
+    when(_testReplica.isReplicaTopState()).thenReturn(true);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap(), eq(preferredScoringKey))).thenReturn(0.5f);
+    when(_clusterContext.getPreferredScoringKey()).thenReturn(preferredScoringKey);
+    when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
+    double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    // Convert to float so as to compare with equal.
+    Assert.assertEquals((float) score, 0.5f);
+    double normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertTrue(normalizedScore > 0.99);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 7171755e97..92d9a2d6d2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -26,10 +26,13 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestClusterContext extends AbstractTestClusterModel {
@@ -94,4 +97,46 @@ public void testDuplicateAssign() throws IOException {
     context
         .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
   }
+
+  @DataProvider(name = "preferredScoringKey")
+  public static Object[][] preferredScoringKey() {
+    return new Object[][]{
+        {"item1"},  // valid key
+        {"item3"},  // valid key
+        {"item-x"}, // invalid key
+        {null}
+    };
+  }
+
+  @Test(dataProvider = "preferredScoringKey")
+  public void testEstimateMaxUtilization(String preferredScoringKey) throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+    ClusterConfig clusterConfig = testCache.getClusterConfig();
+    clusterConfig.setPreferredScoringKey(preferredScoringKey);
+    ClusterContext context =
+        new ClusterContext(assignmentSet, generateNodes(testCache), new HashMap<>(),
+            new HashMap<>(), clusterConfig);
+    /*
+     * Total Capacity and Total Usage values calculated from the nodeSet and replicaSet above are as follows:
+     * TotalCapacity : {"item1", 20, "item2", 40, "item3", 30}
+     * TotalUsage    : {"item1", 16, "item2", 32, "item3", 0}
+     * Using these values to validate the results of estimateMaxUtilization.
+     */
+
+    validateResult(ImmutableMap.of("item1", 20, "item2", 40, "item3", 30),
+        ImmutableMap.of("item1", 16, "item2", 32, "item3", 0),
+        preferredScoringKey, context.getEstimatedMaxUtilization());
+  }
+
+  private void validateResult(Map<String, Integer> totalCapacity, Map<String, Integer> totalUsage,
+      String preferredScoringKey, float actualEstimatedMaxUtilization) {
+    if (preferredScoringKey == null || !totalCapacity.keySet().contains(preferredScoringKey)) {
+      // estimatedMaxUtilization calculated from all capacity keys
+      Assert.assertEquals(actualEstimatedMaxUtilization, 0.8f);
+      return;
+    }
+    // estimatedMaxUtilization calculated using the preferredScoringKey only.
+    Assert.assertEquals(actualEstimatedMaxUtilization,
+        (float) totalUsage.get(preferredScoringKey) / totalCapacity.get(preferredScoringKey));
+  }
 }
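For reference, the expectations baked into `validateResult` follow directly from the totals listed in the test comment. The snippet below is a plain-Java editor's sketch (not part of the test suite) that reproduces the arithmetic for each data-provider case.

```java
import java.util.Map;

public class TestExpectationCheck {
  public static void main(String[] args) {
    Map<String, Integer> totalCapacity = Map.of("item1", 20, "item2", 40, "item3", 30);
    Map<String, Integer> totalUsage = Map.of("item1", 16, "item2", 32, "item3", 0);

    // All-key scan (null or unknown preferred key): max of 16/20, 32/40, 0/30 = 0.8.
    System.out.println(Math.max(16f / 20, Math.max(32f / 40, 0f / 30))); // 0.8

    // Preferred key "item1": 16/20 = 0.8; preferred key "item3": 0/30 = 0.0.
    System.out.println((float) totalUsage.get("item1") / totalCapacity.get("item1")); // 0.8
    System.out.println((float) totalUsage.get("item3") / totalCapacity.get("item3")); // 0.0
  }
}
```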