Make StickyRebalanceStrategy topology aware #2944
Changes from 4 commits (27080a5, ab1c572, 5c5f866, c5e0be7, bb5b883)
@@ -21,23 +21,57 @@ | |
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.LinkedHashMap; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import org.apache.helix.controller.rebalancer.topology.Topology; | ||
import org.apache.helix.model.ClusterConfig; | ||
import org.apache.helix.model.ClusterTopologyConfig; | ||
import org.apache.helix.model.InstanceConfig; | ||
|
||
/** | ||
* A Node is an entity that can serve capacity recording purpose. It has a capacity and knowledge | ||
* of partitions assigned to it, so it can decide if it can receive additional partitions. | ||
*/ | ||
public class CapacityNode { | ||
public class CapacityNode implements Comparable<CapacityNode> { | ||
private int _currentlyAssigned; | ||
private int _capacity; | ||
private final String _id; | ||
private final String _instanceName; | ||
private final String _logicaId; | ||
private final String _faultZone; | ||
private final Map<String, Set<String>> _partitionMap; | ||
|
||
public CapacityNode(String id) { | ||
_partitionMap = new HashMap<>(); | ||
_currentlyAssigned = 0; | ||
this._id = id; | ||
/** | ||
* Constructor used for non-topology-aware use case | ||
* @param instanceName The instance name of this node | ||
* @param capacity The capacity of this node | ||
*/ | ||
public CapacityNode(String instanceName, int capacity) { | ||
this._instanceName = instanceName; | ||
this._logicaId = null; | ||
this._faultZone = null; | ||
this._partitionMap = new HashMap<>(); | ||
this._capacity = capacity; | ||
this._currentlyAssigned = 0; | ||
} | ||
|
||
/** | ||
* Constructor used for non-topology-aware use case | ||
* @param instanceName The instance name of this node | ||
* @param clusterConfig The cluster config for current helix cluster | ||
* @param clusterTopologyConfig The cluster topology config for current helix cluster | ||
* @param instanceConfig The instance config for current instance | ||
*/ | ||
public CapacityNode(String instanceName, ClusterConfig clusterConfig, | ||
ClusterTopologyConfig clusterTopologyConfig, InstanceConfig instanceConfig) { | ||
this._instanceName = instanceName; | ||
this._logicaId = clusterTopologyConfig != null ? instanceConfig.getLogicalId( | ||
clusterTopologyConfig.getEndNodeType()) : instanceName; | ||
this._faultZone = computeFaultZone(clusterConfig, instanceConfig); | ||
this._partitionMap = new HashMap<>(); | ||
this._capacity = clusterConfig.getGlobalMaxPartitionAllowedPerInstance(); | ||
this._currentlyAssigned = 0; | ||
} | ||
|
||
/** | ||
|
@@ -80,11 +114,27 @@ public void setCapacity(int capacity) { | |
} | ||
|
||
/** | ||
* Get the ID of this node | ||
* @return The ID of this node | ||
* Get the instance name of this node | ||
* @return The instance name of this node | ||
*/ | ||
public String getId() { | ||
return _id; | ||
public String getInstanceName() { | ||
return _instanceName; | ||
} | ||
|
||
/** | ||
* Get the logical id of this node | ||
* @return The logical id of this node | ||
*/ | ||
public String getLogicalId() { | ||
return _logicaId; | ||
} | ||
|
||
/** | ||
* Get the fault zone of this node | ||
* @return The fault zone of this node | ||
*/ | ||
public String getFaultZone() { | ||
return _faultZone; | ||
} | ||
|
||
/** | ||
|
@@ -98,8 +148,40 @@ public int getCurrentlyAssigned() { | |
@Override | ||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned) | ||
.append("\ncapacity:").append(_capacity); | ||
sb.append("##########\nname=").append(_instanceName).append("\nassigned:") | ||
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:") | ||
.append(_logicaId).append("\nfaultZone:").append(_faultZone); | ||
return sb.toString(); | ||
} | ||
|
||
@Override | ||
public int compareTo(CapacityNode o) { | ||
if (_logicaId != null) { | ||
return _logicaId.compareTo(o.getLogicalId()); | ||
} | ||
return _instanceName.compareTo(o.getInstanceName()); | ||
} | ||
|
||
/** | ||
* Computes the fault zone id based on the domain and fault zone type when topology is enabled. | ||
* For example, when | ||
* the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function | ||
* returns "2". | ||
* If the fault zone type cannot be found, this function leaves the fault zone id as the instance name. |
Review thread: Should we use the logical ID as the default fault zone if we can't find the fault zone type?
Reply: This function is copied from the waged rebalancer: https://github.com/apache/helix/blob/master/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java#L363-L383. Any pointers on why the logical ID is a better candidate here?
Reply: In terms of swap, using the logical id would keep the swap-in/swap-out instances in the same fault zone. But I also agree that this should align with the other rebalancers' behavior.
||
* TODO: change the return value to logical id when no fault zone type found. Also do the same for | ||
* waged rebalancer in helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java | ||
*/ | ||
private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { | ||
Review thread: Can we try to create a util class with this logic instead of copying it in two places and having to maintain both methods?
Reply: It may be worthwhile to create a Node base class (different from the one under the topology package) that extracts the common logic from AssignableNode and CapacityNode.
Reply: This is something I prefer to do in later iterations: consolidate …
||
LinkedHashMap<String, String> instanceTopologyMap = | ||
Topology.computeInstanceTopologyMap(clusterConfig, instanceConfig.getInstanceName(), | ||
instanceConfig, true /*earlyQuitTillFaultZone*/); | ||
|
||
StringBuilder faultZoneStringBuilder = new StringBuilder(); | ||
for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) { | ||
faultZoneStringBuilder.append(entry.getValue()); | ||
faultZoneStringBuilder.append('/'); | ||
} | ||
faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1); | ||
return faultZoneStringBuilder.toString(); | ||
} | ||
} |
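
To make the new topology-aware path concrete, here is a minimal usage sketch of CapacityNode based on the javadoc example above (domain "zone=2, instance=testInstance", fault zone type "zone"). The cluster name, instance name, and capacity value are illustrative assumptions, not values taken from this PR, and the import for CapacityNode is omitted because its package is not shown in the diff.

```java
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;

public class CapacityNodeExample {
  public static void main(String[] args) {
    // Illustrative configuration; all values here are assumptions for this sketch.
    ClusterConfig clusterConfig = new ClusterConfig("testCluster");
    clusterConfig.setTopologyAwareEnabled(true);
    clusterConfig.setTopology("/zone/instance");
    clusterConfig.setFaultZoneType("zone");
    clusterConfig.setGlobalMaxPartitionAllowedPerInstance(10);

    InstanceConfig instanceConfig = new InstanceConfig("testInstance");
    instanceConfig.setDomain("zone=2,instance=testInstance"); // domain from the javadoc example

    ClusterTopologyConfig clusterTopologyConfig =
        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);

    // Topology-aware constructor: derives logical id and fault zone from the configs.
    CapacityNode topologyAwareNode =
        new CapacityNode("testInstance", clusterConfig, clusterTopologyConfig, instanceConfig);
    System.out.println(topologyAwareNode.getFaultZone());  // "2" per the javadoc example
    System.out.println(topologyAwareNode.getLogicalId());  // logical id for the topology's end node type

    // Simple constructor: no topology information, just an instance name and a capacity.
    CapacityNode simpleNode = new CapacityNode("testInstance", 10);
    System.out.println(simpleNode.getInstanceName());      // "testInstance"
  }
}
```

Note that compareTo orders nodes by logical id when it is set and by instance name otherwise, which suggests nodes created by the two different constructors are not meant to be mixed in the same sorted collection.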
@@ -46,6 +46,8 @@ | |
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider; | ||
import org.apache.helix.controller.stages.CurrentStateOutput; | ||
import org.apache.helix.controller.stages.MissingTopStateRecord; | ||
import org.apache.helix.model.ClusterConfig; | ||
import org.apache.helix.model.ClusterTopologyConfig; | ||
import org.apache.helix.model.CustomizedState; | ||
import org.apache.helix.model.CustomizedStateConfig; | ||
import org.apache.helix.model.CustomizedView; | ||
|
@@ -190,7 +192,7 @@ public synchronized void refresh(HelixDataAccessor accessor) { | |
|
||
if (getClusterConfig() != null | ||
&& getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) { | ||
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance()); | ||
buildSimpleCapacityMap(); | ||
// Remove all cached IdealState because it is a global computation cannot partially be | ||
// performed for some resources. The computation is simple as well not taking too much resource | ||
// to recompute the assignments. | ||
|
@@ -573,11 +575,16 @@ public WagedInstanceCapacity getWagedInstanceCapacity() { | |
return _wagedInstanceCapacity; | ||
} | ||
|
||
private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) { | ||
private void buildSimpleCapacityMap() { | ||
Review thread: This is just a general comment that we don't need to address right away, but we may want to address it before merging this to master. Should we be building the capacity nodes in the resource controller data provider? It seems like an implementation detail for one specific rebalance strategy; if no resources in the cluster use StickyRebalanceStrategy, then this is not needed. Why don't we move the creation of simpleCapacitySet to the rebalance strategy? If this is here because StickyRebalanceStrategy relies on global node capacity, maybe we should be implementing the StatefulRebalancer interface instead of using single-resource rebalance. That has method …
Reply: Yeah, this is exactly the reason: we need to populate the node usage globally before the rebalance stage. |
||
ClusterConfig clusterConfig = getClusterConfig(); | ||
ClusterTopologyConfig clusterTopologyConfig = | ||
ClusterTopologyConfig.createFromClusterConfig(clusterConfig); | ||
Map<String, InstanceConfig> instanceConfigMap = getAssignableInstanceConfigMap(); | ||
_simpleCapacitySet = new HashSet<>(); | ||
for (String instance : getEnabledLiveInstances()) { | ||
CapacityNode capacityNode = new CapacityNode(instance); | ||
capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance); | ||
for (String instanceName : getAssignableInstances()) { | ||
CapacityNode capacityNode = | ||
new CapacityNode(instanceName, clusterConfig, clusterTopologyConfig, | ||
instanceConfigMap.getOrDefault(instanceName, new InstanceConfig(instanceName))); | ||
_simpleCapacitySet.add(capacityNode); | ||
} | ||
} | ||
|
@@ -591,7 +598,7 @@ public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet, | |
// Convert the assignableNodes to map for quick lookup | ||
Map<String, CapacityNode> simpleCapacityMap = new HashMap<>(); | ||
for (CapacityNode node : _simpleCapacitySet) { | ||
simpleCapacityMap.put(node.getId(), node); | ||
simpleCapacityMap.put(node.getInstanceName(), node); | ||
} | ||
for (String resourceName : resourceNameSet) { | ||
// Process current state mapping | ||
|
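
Picking up the review discussion above about computeFaultZone being copied between AssignableNode and CapacityNode: a shared helper along the following lines is one possible shape for the util class the reviewers suggest. The class name and placement are hypothetical; the body mirrors the logic already shown in this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;

// Hypothetical utility class; not part of this PR.
public final class FaultZoneUtil {
  private FaultZoneUtil() {
  }

  /**
   * Computes the fault zone id from the instance domain and the cluster's fault zone type,
   * mirroring the logic currently duplicated in AssignableNode and CapacityNode.
   */
  public static String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
    LinkedHashMap<String, String> instanceTopologyMap =
        Topology.computeInstanceTopologyMap(clusterConfig, instanceConfig.getInstanceName(),
            instanceConfig, true /*earlyQuitTillFaultZone*/);

    StringBuilder faultZoneStringBuilder = new StringBuilder();
    for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) {
      faultZoneStringBuilder.append(entry.getValue());
      faultZoneStringBuilder.append('/');
    }
    faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1);
    return faultZoneStringBuilder.toString();
  }
}
```

Both node classes could then delegate to this helper, leaving the larger Node base class consolidation discussed above for a later iteration.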
Review thread: I think this is for the topology-aware case? Also, can we have one constructor call another?
Reply: Yes, will make this change.
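
For reference, the "one constructor call another" suggestion could be addressed with a private all-args constructor that both public constructors delegate to; plain this(...) chaining between the two public constructors does not work directly because the final logical id and fault zone fields differ between the two cases. This is only a sketch of the idea, assuming computeFaultZone is made static (or moved to a shared util as discussed above), not the change that actually landed.

```java
// Sketch only: consolidate shared field initialization behind one private constructor.
private CapacityNode(String instanceName, String logicalId, String faultZone, int capacity) {
  this._instanceName = instanceName;
  this._logicaId = logicalId;
  this._faultZone = faultZone;
  this._partitionMap = new HashMap<>();
  this._capacity = capacity;
  this._currentlyAssigned = 0;
}

// Non-topology-aware case: no logical id or fault zone.
public CapacityNode(String instanceName, int capacity) {
  this(instanceName, null, null, capacity);
}

// Topology-aware case: derive logical id and fault zone from the configs.
// Assumes computeFaultZone is static so it can be called in the this(...) arguments.
public CapacityNode(String instanceName, ClusterConfig clusterConfig,
    ClusterTopologyConfig clusterTopologyConfig, InstanceConfig instanceConfig) {
  this(instanceName,
      clusterTopologyConfig != null
          ? instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType())
          : instanceName,
      computeFaultZone(clusterConfig, instanceConfig),
      clusterConfig.getGlobalMaxPartitionAllowedPerInstance());
}
```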