Skip to content

Commit

Permalink
Enable computation of evenness score based on a Preferred Scoring Key
Browse files Browse the repository at this point in the history
  • Loading branch information
csudharsanan committed Dec 1, 2023
1 parent 7f2a88d commit 7fee710
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The constraint evaluates the score by checking the max used capacity key out of all the capacity
Expand All @@ -31,13 +33,17 @@
* It is a greedy approach since it evaluates only on the most used capacity key.
*/
class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
  // SLF4J convention: pass the Class object itself, not its name string, to getLogger.
  private static final Logger LOG =
      LoggerFactory.getLogger(MaxCapacityUsageInstanceConstraint.class);

  /**
   * Scores the proposed placement of {@code replica} on {@code node} by the node's projected
   * highest capacity utilization. When a preferred scoring key is configured on the cluster,
   * only that capacity category is considered; otherwise the max over all categories is used.
   *
   * @param node the candidate node for the assignment.
   * @param replica the replica being placed.
   * @param clusterContext rebalance-wide context (estimated max utilization, preferred key).
   * @return the utilization-based soft-constraint score.
   */
  @Override
  protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
      ClusterContext clusterContext) {
    float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
    float projectedHighestUtilization = node.getGeneralProjectedHighestUtilization(
        replica.getCapacity(), clusterContext.getPreferredScoringKey());
    double utilizationScore =
        computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
    // Debug level: this method runs once per candidate (replica, node) pair in the rebalance
    // hot path; INFO here would flood the controller log.
    LOG.debug(
        "[DEPEND-29018] clusterName: {}, estimatedMaxUtilization: {}, projectedHighestUtilization: {}, utilizationScore: {}, preferredScoringKey: {}",
        clusterContext.getClusterName(), estimatedMaxUtilization, projectedHighestUtilization,
        utilizationScore, clusterContext.getPreferredScoringKey());
    return utilizationScore;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -31,6 +33,7 @@
* It is a greedy approach since it evaluates only on the most used capacity key.
*/
class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
private static final Logger LOG = LoggerFactory.getLogger(TopStateMaxCapacityUsageInstanceConstraint.class.getName());
@Override
protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
Expand All @@ -41,7 +44,10 @@ protected double getAssignmentScore(AssignableNode node, AssignableReplica repli
}
float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
float projectedHighestUtilization =
node.getTopStateProjectedHighestUtilization(replica.getCapacity());
return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
node.getTopStateProjectedHighestUtilization(replica.getCapacity(), clusterContext.getPreferredScoringKey());
double utilizationScore = computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
LOG.info("[DEPEND-29018] clusterName: {}, estimatedTopStateMaxUtilization: {}, projectedHighestUtilization: {}, utilizationScore: {}, preferredScoringKey: {}",
clusterContext.getClusterName(), estimatedTopStateMaxUtilization, projectedHighestUtilization, utilizationScore, clusterContext.getPreferredScoringKey());
return utilizationScore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
private Map<String, Integer> _remainingCapacity;
private Map<String, Integer> _remainingTopStateCapacity;

private String _clusterName;

/**
* Update the node with a ClusterDataCache. This resets the current assignment and recalculates
* currentCapacity.
Expand All @@ -84,6 +86,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
_remainingTopStateCapacity = new HashMap<>(instanceCapacity);
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
_currentAssignedReplicaMap = new HashMap<>();
_clusterName = clusterConfig.getClusterName();
}

/**
Expand Down Expand Up @@ -195,7 +198,7 @@ public Set<String> getAssignedPartitionsByResource(String resource) {
Set<String> getAssignedTopStatePartitionsByResource(String resource) {
return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).entrySet()
.stream().filter(partitionEntry -> partitionEntry.getValue().isReplicaTopState())
.map(partitionEntry -> partitionEntry.getKey()).collect(Collectors.toSet());
.map(Map.Entry::getKey).collect(Collectors.toSet());
}

/**
Expand Down Expand Up @@ -239,7 +242,27 @@ public Map<String, Integer> getMaxCapacity() {
* @return The highest utilization number of the node among all the capacity category.
*/
public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
  // Delegate to the keyed overload with a null preferred key, i.e. consider every capacity
  // category (backward-compatible behavior for existing callers).
  return getProjectedHighestUtilization(newUsage, _remainingCapacity, null);
}

/**
* Return the most concerning capacity utilization number for evenly partition assignment.
* The method dynamically calculates the projected highest utilization number among all the
* capacity categories assuming the new capacity usage is added to the node.
*
* If the preferredScoringKey is specified, then the utilization number is computed based on the
* specified capacity category (key) only.
*
* For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and
* preferredScoringKey is CPU, then this call shall return 0.9.
*
* @param newUsage the proposed new additional capacity usage.
* @param preferredScoringKey if provided, the capacity utilization will be calculated based on
* the supplied key only, else across all capacity categories.
* @return The highest utilization number of the node among the specified capacity category.
*/
public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage, String preferredScoringKey) {
return getProjectedHighestUtilization(newUsage, _remainingCapacity, preferredScoringKey);
}

/**
Expand All @@ -253,18 +276,46 @@ public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage
* @return The highest utilization number of the node among all the capacity category.
*/
public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
  // Top-state variant: same computation as the general overload, but measured against the
  // remaining top-state capacity. Null preferred key => consider every capacity category.
  return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, null);
}

/**
* Return the most concerning capacity utilization number for evenly partition assignment.
* The method dynamically calculates the projected highest utilization number among all the
* capacity categories assuming the new capacity usage is added to the node.
*
* If the preferredScoringKey is specified, then the utilization number is computed based on the
* specified capacity category (key) only.
*
* For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6} and
* preferredScoringKey is CPU, then this call shall return 0.9.
*
* This function returns the projected highest utilization for only top state partitions.
*
* @param newUsage the proposed new additional capacity usage.
* @param preferredScoringKey if provided, the capacity utilization will be calculated based on
* the supplied key only, else across all capacity categories.
* @return The highest utilization number of the node among the specified capacity categories.
*/
public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage, String preferredScoringKey) {
return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity, preferredScoringKey);
}

private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
Map<String, Integer> remainingCapacity) {
Map<String, Integer> remainingCapacity, String preferredScoringKey) {
Set<String> capacityKeySet = _maxAllowedCapacity.keySet();
if (preferredScoringKey != null && capacityKeySet.contains(preferredScoringKey)) {
capacityKeySet = ImmutableSet.of(preferredScoringKey);
}
float highestCapacityUtilization = 0;
for (String capacityKey : _maxAllowedCapacity.keySet()) {
for (String capacityKey : capacityKeySet) {
float capacityValue = _maxAllowedCapacity.get(capacityKey);
float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
.getOrDefault(capacityKey, 0)) / capacityValue;
highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
}
LOG.info("[DEPEND-29018] clusterName: {}, newUsage: {}, remainingCapacity: {}, preferredScoringKey: {}, highestCapacityUtilization: {}",
_clusterName, newUsage, remainingCapacity, preferredScoringKey, highestCapacityUtilization);
return highestCapacityUtilization;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class tracks the rebalance-related global cluster status.
*/
public class ClusterContext {
private static final Logger LOG = LoggerFactory.getLogger(ClusterContext.class.getName());

// This estimation helps to ensure global partition count evenness
private final int _estimatedMaxPartitionCount;
// This estimation helps to ensure global top state replica count evenness
Expand All @@ -57,24 +64,34 @@ public class ClusterContext {
private final Map<String, Integer> _estimateUtilizationMap;
// Cluster total capacity. Used to compute score when sorting replicas.
private final Map<String, Integer> _clusterCapacityMap;
private final String _preferredScoringKey;
private final String _clusterName;

/**
* Construct the cluster context based on the current instance status.
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param nodeSet All the active nodes that are managed by the rebalancer
*/
ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
this(replicaSet, nodeSet, baselineAssignment, bestPossibleAssignment, null);
}

ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> nodeSet,
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment,
ClusterConfig clusterConfig) {
int instanceCount = nodeSet.size();
int totalReplicas = 0;
int totalTopStateReplicas = 0;
Map<String, Integer> totalUsage = new HashMap<>();
Map<String, Integer> totalTopStateUsage = new HashMap<>();
Map<String, Integer> totalCapacity = new HashMap<>();
_preferredScoringKey = Optional.ofNullable(clusterConfig).map(ClusterConfig::getPreferredScoringKey).orElse(null);
_clusterName = Optional.ofNullable(clusterConfig).map(ClusterConfig::getClusterName).orElse(null);

for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
.collect(Collectors.groupingBy(AssignableReplica::getResourceName))
.entrySet()) {
.collect(Collectors.groupingBy(AssignableReplica::getResourceName))
.entrySet()) {
int replicas = entry.getValue().size();
totalReplicas += replicas;

Expand All @@ -85,14 +102,14 @@ public class ClusterContext {
if (replica.isReplicaTopState()) {
totalTopStateReplicas += 1;
replica.getCapacity().forEach(
(key, value) -> totalTopStateUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
(key, value) -> totalTopStateUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
}
replica.getCapacity().forEach(
(key, value) -> totalUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
(key, value) -> totalUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
}
}
nodeSet.forEach(node -> node.getMaxCapacity().forEach(
(key, value) -> totalCapacity.compute(key, (k, v) -> (v == null) ? value : (v + value))));
(key, value) -> totalCapacity.compute(key, (k, v) -> (v == null) ? value : (v + value))));

// TODO: these variables correspond to one constraint each, and may become unnecessary if the
// constraints are not used. A better design is to make them pluggable.
Expand All @@ -103,8 +120,8 @@ public class ClusterContext {
_estimateUtilizationMap = Collections.emptyMap();
_clusterCapacityMap = Collections.emptyMap();
} else {
_estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
_estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
_estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage, _preferredScoringKey, _clusterName);
_estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage, _preferredScoringKey, _clusterName);
_estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
_clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
}
Expand All @@ -114,6 +131,23 @@ public class ClusterContext {
_bestPossibleAssignment = bestPossibleAssignment;
}


/**
* Get preferred scoring key if set.
*
* @return PreferredScoringKey which is used in computation of evenness score
*/
public String getPreferredScoringKey() {
return _preferredScoringKey;
}

/**
* Get the cluster name if set.
*
* @return the cluster name, or {@code null} if no ClusterConfig was supplied at construction.
*/
public String getClusterName() {
return _clusterName;
}

public Map<String, ResourceAssignment> getBaselineAssignment() {
  // Normalize "no baseline" (null or empty) to an immutable empty map for callers.
  if (_baselineAssignment == null || _baselineAssignment.isEmpty()) {
    return Collections.emptyMap();
  }
  return _baselineAssignment;
}
Expand Down Expand Up @@ -190,16 +224,39 @@ private static int estimateAvgReplicaCount(int replicaCount, int instanceCount)
return (int) Math.floor((float) replicaCount / instanceCount);
}

/**
* Estimates the max utilization number from all capacity categories and their usages.
* If the preferredScoringKey is specified then max utilization number is computed based op the
* specified capacity category (key) only.
*
* For example, if totalCapacity is {CPU: 0.6, MEM: 0.7, DISK: 0.9}, totalUsage is {CPU: 0.1, MEM: 0.2, DISK: 0.3},
* preferredScoringKey: CPU. Then this call shall return 0.16. If preferredScoringKey
* is not specified, this call returns 0.33 which would be the max utilization for the DISK.
*
* @param totalCapacity Sum total of max capacity of all active nodes managed by a rebalancer
* @param totalUsage Sum total of capacity usage of all partition replicas that are managed by the rebalancer
* @param preferredScoringKey if provided, the max utilization will be calculated based on
* the supplied key only, else across all capacity categories.
* @param clusterName clusterName is used for logging purposes only.
* @return The max utilization number from the specified capacity categories.
*/

private static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
Map<String, Integer> totalUsage) {
Map<String, Integer> totalUsage,
String preferredScoringKey, String clusterName) {
float estimatedMaxUsage = 0;
for (String capacityKey : totalCapacity.keySet()) {
Set<String> capacityKeySet = totalCapacity.keySet();
if (preferredScoringKey != null && capacityKeySet.contains(preferredScoringKey)) {
capacityKeySet = ImmutableSet.of(preferredScoringKey);
}
for (String capacityKey : capacityKeySet) {
int maxCapacity = totalCapacity.get(capacityKey);
int usage = totalUsage.getOrDefault(capacityKey, 0);
float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
}

LOG.info("[DEPEND-29018] clusterName: {}, totalCapacity: {}, totalUsage: {}, preferredScoringKey: {}, estimatedMaxUsage: {}",
clusterName, totalCapacity, totalUsage, preferredScoringKey, estimatedMaxUsage);
return estimatedMaxUsage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
assignableNodes, idealAssignment, currentAssignment);
assignableNodes, idealAssignment, currentAssignment, dataProvider.getClusterConfig());
// Initial the cluster context with the allocated assignments.
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ public enum ClusterConfigProperty {
HELIX_DISABLED_TYPE,

// The last time when the on-demand rebalance is triggered.
LAST_ON_DEMAND_REBALANCE_TIMESTAMP
LAST_ON_DEMAND_REBALANCE_TIMESTAMP,

//Preferred scoring key used in evenness score computation
PREFERRED_SCORING_KEY
}

public enum GlobalRebalancePreferenceKey {
Expand Down Expand Up @@ -1198,4 +1201,23 @@ public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) {
_record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
rebalanceTimestamp);
}

/**
* Get the preferred scoring key if set.
*
* @return the preferred scoring key used in the evenness score computation, or {@code null}
* if the PREFERRED_SCORING_KEY simple field is not set on this cluster config.
*/
public String getPreferredScoringKey() {
return _record.getSimpleField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name());
}

/**
* Set the preferred scoring key for the cluster.
*
* @param preferredScoringKey the capacity key to use in the evenness score computation;
* stored as the PREFERRED_SCORING_KEY simple field of the cluster config record.
*/
public void setPreferredScoringKey(String preferredScoringKey) {
_record.setSimpleField(ClusterConfigProperty.PREFERRED_SCORING_KEY.name(),
preferredScoringKey);
}
}
Loading

0 comments on commit 7fee710

Please sign in to comment.