Instance Evacuation prevent baseline re-computation after dropped (#2740)

Instance Evacuation logic is moved to the BaseControllerDataProvider cache, so the baseline is recalculated only once, when the evacuation is triggered, instead of evacuating the node and then recalculating the baseline again after the node is dropped, which caused extra shuffling. Also fixes getAllLiveInstances to exclude TimedOutForMaintenance nodes.
zpinto authored Jan 30, 2024
1 parent f193045 commit 054d77a
Showing 8 changed files with 70 additions and 68 deletions.
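
The core idea, before the per-file diffs: EVACUATE filtering now happens when the cache builds its assignable-instance view, not downstream in the rebalancers. The following is a minimal standalone Java sketch of that filtering step, using made-up class, enum, and method names for illustration; the real types (InstanceConfig, InstanceConstants.InstanceOperation, updateInstanceSets) appear in the BaseControllerDataProvider diff below.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch only -- these are NOT the real Helix classes; names here are illustrative.
// It shows the cache-level idea: instances marked EVACUATE are dropped from the "assignable"
// view when the cache refreshes, so the baseline never includes them and does not need to be
// recomputed again when the evacuated node is eventually dropped.
public class AssignableInstanceFilterSketch {

  enum InstanceOperation { NONE, EVACUATE, SWAP_IN }

  static final class InstanceConfig {
    final String instanceName;
    final InstanceOperation operation;

    InstanceConfig(String instanceName, InstanceOperation operation) {
      this.instanceName = instanceName;
      this.operation = operation;
    }
  }

  // Build the assignable-instance map, excluding EVACUATE instances up front
  // (analogous to the new check added to updateInstanceSets in the diff below).
  static Map<String, InstanceConfig> computeAssignableInstances(
      Map<String, InstanceConfig> allInstanceConfigs) {
    Map<String, InstanceConfig> assignable = new HashMap<>();
    for (Map.Entry<String, InstanceConfig> entry : allInstanceConfigs.entrySet()) {
      if (entry.getValue().operation != InstanceOperation.EVACUATE) {
        assignable.put(entry.getKey(), entry.getValue());
      }
    }
    return assignable;
  }

  public static void main(String[] args) {
    Map<String, InstanceConfig> all = new HashMap<>();
    all.put("host_1", new InstanceConfig("host_1", InstanceOperation.NONE));
    all.put("host_2", new InstanceConfig("host_2", InstanceOperation.EVACUATE));

    // host_2 disappears from the assignable view as soon as EVACUATE is set, which is the
    // single point where the baseline changes; dropping host_2 later changes nothing.
    System.out.println(computeAssignableInstances(all).keySet()); // prints [host_1]
  }
}
```

Because the filtering happens at cache-refresh time, the baseline computed from the assignable view already excludes the evacuated node; dropping that node later does not change the rebalancer's inputs, so no second baseline calculation is triggered.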
@@ -128,7 +128,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
private Map<String, LiveInstance> _allLiveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
private Map<String, LiveInstance> _assignableLiveInstanceExcludeTimedOutForMaintenance =
new HashMap<>();

public BaseControllerDataProvider() {
this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -345,7 +347,6 @@ private void refreshInstanceConfigs(final HelixDataAccessor accessor,
* Refreshes the assignable instances and SWAP related caches. This should be called after
* liveInstance and instanceConfig caches are refreshed. To determine what instances are
* assignable and live, it takes a combination of both the all instanceConfigs and liveInstances.
* TODO: Add EVACUATE InstanceOperation to be filtered out in assignable nodes.
*
* @param instanceConfigMap InstanceConfig map from instanceConfig cache
* @param liveInstancesMap LiveInstance map from liveInstance cache
@@ -406,7 +407,9 @@ private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
_assignableInstanceConfigMap.put(node, currentInstanceConfig);
filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
}
} else {
} else if (!currentInstanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
// EVACUATE instances are not considered to be assignable.
_assignableInstanceConfigMap.put(node, currentInstanceConfig);
filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
}
@@ -472,7 +475,8 @@ private void refreshManagementSignals(final HelixDataAccessor accessor) {
// If maintenance mode has exited, clear cached timed-out nodes
if (!_isMaintenanceModeEnabled) {
_timedOutInstanceDuringMaintenance.clear();
_liveInstanceExcludeTimedOutForMaintenance.clear();
_allLiveInstanceExcludeTimedOutForMaintenance.clear();
_assignableLiveInstanceExcludeTimedOutForMaintenance.clear();
}
}

@@ -484,21 +488,26 @@ private void timeoutNodesDuringMaintenance(final HelixDataAccessor accessor, Clu
timeOutWindow = clusterConfig.getOfflineNodeTimeOutForMaintenanceMode();
}
if (timeOutWindow >= 0 && isMaintenanceModeEnabled) {
for (String instance : _assignableLiveInstancesMap.keySet()) {
for (String instance : _allLiveInstanceCache.getPropertyMap().keySet()) {
// 1. Check timed-out cache and don't do repeated work;
// 2. Check for nodes that didn't exist in the last iteration, because it has been checked;
// 3. For all other nodes, check if it's timed-out.
// When maintenance mode is first entered, all nodes will be checked as a result.
if (!_timedOutInstanceDuringMaintenance.contains(instance)
&& !_liveInstanceExcludeTimedOutForMaintenance.containsKey(instance)
&& !_allLiveInstanceExcludeTimedOutForMaintenance.containsKey(instance)
&& isInstanceTimedOutDuringMaintenance(accessor, instance, timeOutWindow)) {
_timedOutInstanceDuringMaintenance.add(instance);
}
}
}
if (isMaintenanceModeEnabled) {
_liveInstanceExcludeTimedOutForMaintenance = _assignableLiveInstancesMap.entrySet().stream()
.filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
_allLiveInstanceExcludeTimedOutForMaintenance =
_allLiveInstanceCache.getPropertyMap().entrySet().stream()
.filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
_assignableLiveInstanceExcludeTimedOutForMaintenance =
_assignableLiveInstancesMap.entrySet().stream()
.filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
@@ -649,7 +658,7 @@ public Map<String, Map<String, String>> getIdealStateRules() {
*/
public Map<String, LiveInstance> getAssignableLiveInstances() {
if (isMaintenanceModeEnabled()) {
return Collections.unmodifiableMap(_liveInstanceExcludeTimedOutForMaintenance);
return Collections.unmodifiableMap(_assignableLiveInstanceExcludeTimedOutForMaintenance);
}

return Collections.unmodifiableMap(_assignableLiveInstancesMap);
@@ -663,6 +672,10 @@ public Map<String, LiveInstance> getAssignableLiveInstances() {
* @return A map of LiveInstances to their instance names
*/
public Map<String, LiveInstance> getLiveInstances() {
if (isMaintenanceModeEnabled()) {
return Collections.unmodifiableMap(_allLiveInstanceExcludeTimedOutForMaintenance);
}

return _allLiveInstanceCache.getPropertyMap();
}

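To summarize the maintenance-mode bookkeeping added above: the cache now keeps two filtered views, one over all live instances and one over assignable live instances, both excluding nodes that timed out during maintenance, so getLiveInstances() and getAssignableLiveInstances() stay consistent with each other. A minimal standalone sketch of that filtering, with made-up names rather than the Helix types:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch, not the Helix classes: both live-instance views are produced by the
// same filter over the timed-out set, mirroring the two stream pipelines in the diff above.
public class MaintenanceLiveInstanceViewsSketch {

  // Drop any instance that has timed out during maintenance from the given live-instance map.
  static <V> Map<String, V> excludeTimedOut(Map<String, V> liveInstances,
      Set<String> timedOutDuringMaintenance) {
    return liveInstances.entrySet().stream()
        .filter(e -> !timedOutDuringMaintenance.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, String> allLive = Map.of("host_1", "session_1", "host_2", "session_2");
    Map<String, String> assignableLive = Map.of("host_1", "session_1"); // host_2 not assignable
    Set<String> timedOutDuringMaintenance = Set.of("host_2");

    System.out.println(excludeTimedOut(allLive, timedOutDuringMaintenance));        // {host_1=session_1}
    System.out.println(excludeTimedOut(assignableLive, timedOutDuringMaintenance)); // {host_1=session_1}
  }
}
```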
@@ -104,7 +104,7 @@ public ResourceAssignment computeBestPossiblePartitionState(T cache, IdealState
List<String> preferenceList = getPreferenceList(partition, idealState,
Collections.unmodifiableSet(cache.getAssignableLiveInstances().keySet()));
Map<String, String> bestStateForPartition =
computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(),
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(),
stateModelDef,
preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
cache.getClusterConfig(), partition,
@@ -19,7 +19,6 @@
* under the License.
*/

import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -33,7 +32,6 @@

import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
@@ -55,8 +53,6 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
public static ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());

@Override
public IdealState computeNewIdealState(String resourceName,
@@ -160,14 +156,8 @@ public IdealState computeNewIdealState(String resourceName,
stateCountMap, maxPartition);

List<String> allNodeList = new ArrayList<>(assignableNodes);
List<String> liveEnabledAssignableNodeList = new ArrayList<>(assignableLiveEnabledNodes);

// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = new ArrayList<>(
// We will not assign partitions to instances with EVACUATE InstanceOperation.
DelayedRebalanceUtil.filterOutEvacuatingInstances(
clusterData.getAssignableInstanceConfigMap(),
assignableLiveEnabledNodes));
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);
@@ -276,7 +266,9 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
List<String> preferenceList = getPreferenceList(partition, idealState, activeNodes);
Map<String, String> bestStateForPartition =
computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
// We use cache.getLiveInstances().keySet() to make sure we gracefully handle n -> n + 1 replicas if possible
// when one of the current nodes holding the replica is no longer considered assignable (ex: EVACUATE).
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef, preferenceList,
currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
partition, cache.getAbnormalStateResolver(stateModelDefName), cache);

@@ -404,7 +396,7 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l

// If the load-balance finishes (all replica are migrated to new instances),
// we should drop all partitions from previous assigned instances.
if (!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name())
if (!currentMapWithPreferenceList.containsValue(HelixDefinedState.ERROR.name())
&& bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap,
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
@@ -85,28 +85,28 @@ public static long getRebalanceDelay(IdealState idealState, ClusterConfig cluste
}

/**
* @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
* @return all active nodes (live nodes plus offline-yet-active nodes)
* while considering cluster delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(clusterConfig)) {
return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
return liveEnabledNodes;
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
}

/**
* @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
* @return all active nodes (live nodes plus offline-yet-active nodes)
* while considering cluster delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
return liveEnabledNodes;
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, delay, clusterConfig);
@@ -128,17 +128,7 @@ private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> live
activeNodes.add(ins);
}
}
// TODO: change this after merging operation and helix-enable field.
return filterOutEvacuatingInstances(instanceConfigMap, activeNodes);
}

public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(instance -> (instanceConfigMap.get(instance) != null && !instanceConfigMap.get(instance)
.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE.name())))
.collect(Collectors.toSet());
return activeNodes;
}

/**
@@ -401,10 +401,7 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
RebalanceAlgorithm algorithm) throws HelixRebalanceException {

// the "real" live nodes at the time
// TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
final Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
clusterData.getAssignableInstanceConfigMap(),
clusterData.getAssignableEnabledLiveInstances());
final Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();

if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
@@ -626,10 +623,7 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);

// TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
clusterData.getAssignableInstanceConfigMap(),
clusterData.getAssignableEnabledLiveInstances());
Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();

int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
@@ -121,6 +121,8 @@ public class ZKHelixAdmin implements HelixAdmin {
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
private static final ImmutableSet<String> ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE =
ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name());
private static final ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());

private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
@@ -840,7 +842,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName) {
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
config.getInstanceOperation());
}
return false;
@@ -262,7 +262,17 @@ public void testEvacuate() throws Exception {
// Drop semi-auto DBs
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, semiAutoDB);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}

// Disable, stop, and drop the instance from the cluster.
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToEvacuate, false);
_participants.get(0).syncStop();
removeOfflineOrDisabledOrSwapInInstances();

// Compare the current EV with the previous one; it should be exactly the same since the baseline should not change
// after the instance is dropped.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertEquals(getEVs(), assignment);
}

@Test(dependsOnMethods = "testEvacuate")
public void testRevertEvacuation() throws Exception {
@@ -1248,7 +1258,7 @@ public void testMarkEvacuationAfterEMM() throws Exception {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
}

Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());

// exit MM
_gSetupTool.getClusterManagementTool()