Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SWAP_IN InstanceOperation to respect HELIX_ENABLED false #2741

Merged
merged 2 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>();
private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Set<String> _liveSwapInInstanceNames = new HashSet<>();
private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
Expand Down Expand Up @@ -365,7 +366,8 @@ private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
_assignableInstanceConfigMap.clear();
_assignableLiveInstancesMap.clear();
_swapOutInstanceNameToSwapInInstanceName.clear();
_enabledLiveSwapInInstanceNames.clear();
_liveSwapInInstanceNames.clear();
_enabledSwapInInstanceNames.clear();

Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
Expand Down Expand Up @@ -433,10 +435,12 @@ private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
String swapInInstanceName = swapInInstancesByLogicalId.get(value);
if (swapInInstanceName != null) {
_swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
if (liveInstancesMap.containsKey(swapInInstanceName)
&& InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
if (liveInstancesMap.containsKey(swapInInstanceName)) {
_liveSwapInInstanceNames.add(swapInInstanceName);
}
if (InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
clusterConfig)) {
_enabledLiveSwapInInstanceNames.add(swapInInstanceName);
_enabledSwapInInstanceNames.add(swapInInstanceName);
}
}
});
Expand Down Expand Up @@ -825,12 +829,21 @@ public Map<String, String> getSwapOutToSwapInInstancePairs() {
}

/**
* Get all the enabled and live SWAP_IN instances.
* Get all the live SWAP_IN instances.
*
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
*/
public Set<String> getLiveSwapInInstanceNames() {
return Collections.unmodifiableSet(_liveSwapInInstanceNames);
}

/**
* Get all the enabled SWAP_IN instances.
*
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
*/
public Set<String> getEnabledLiveSwapInInstanceNames() {
return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
public Set<String> getEnabledSwapInInstanceNames() {
return Collections.unmodifiableSet(_enabledSwapInInstanceNames);
}

public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,7 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
// We do not want to drop a SWAP_IN node if it is at the end of the preferenceList,
// because partitions are actively being added on this node to prepare for SWAP completion.
if (cache == null || !cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
}
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourc
// 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
Map<String, String> swapOutToSwapInInstancePairs = cache.getSwapOutToSwapInInstancePairs();
// 2. Get the live SWAP_IN instances and the enabled SWAP_IN instances in the cluster (two separate sets).
Set<String> enabledLiveSwapInInstances = cache.getEnabledLiveSwapInInstanceNames();
Set<String> liveSwapInInstances = cache.getLiveSwapInInstanceNames();
Set<String> enabledSwapInInstances = cache.getEnabledSwapInInstanceNames();
// 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
// Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
// swap occurring.
if (!enabledLiveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
// Skipping this when there are no live SWAP_IN instances reduces computation time.
if (!liveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
resourceMap.forEach((resourceName, resource) -> {
StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
Expand All @@ -148,6 +148,27 @@ private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourc
commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());

commonInstances.forEach(swapOutInstance -> {
// If the corresponding swap-in instance is not live, skip assigning to it.
if (!liveSwapInInstances.contains(
swapOutToSwapInInstancePairs.get(swapOutInstance))) {
return;
}

// If the corresponding swap-in instance is not enabled, assign replicas with
// initial state.
if (!enabledSwapInInstances.contains(
swapOutToSwapInInstancePairs.get(swapOutInstance))) {
stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
stateModelDef.getInitialState());
return;
}

// If the swap-in node is live and enabled, do assignment with the following logic:
// 1. If the swap-out instance's replica is a topState, set the swap-in instance's replica
// to the topState if the StateModel allows for another replica with the topState to be added.
// Otherwise, set the swap-in instance's replica to the secondTopState.
// 2. If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
// to the same secondTopState.
if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {

String topStateCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,97 @@ public void testNodeSwap() throws Exception {
}

/**
 * Verifies that a SWAP_IN instance can be disabled and re-enabled in the middle of a swap.
 *
 * <p>Flow exercised by this test:
 * <ol>
 *   <li>Mark the first participant as SWAP_OUT and check that the external views are unchanged.</li>
 *   <li>Add a new participant as SWAP_IN with the same logical id and zone, and check that the
 *       swap can be completed.</li>
 *   <li>Disable the SWAP_IN instance and check that it holds the same partitions as the SWAP_OUT
 *       instance, all of them in the OFFLINE state.</li>
 *   <li>Re-enable the SWAP_IN instance, complete the swap, and check that the SWAP_IN instance
 *       takes over and the SWAP_OUT instance ends up disabled.</li>
 * </ol>
 *
 * @throws Exception if any cluster operation or verification helper fails
 */
@Test(dependsOnMethods = "testNodeSwap")
public void testNodeSwapDisableAndReenable() throws Exception {
  // Log this test's own name (the message previously named testNodeSwap()).
  System.out.println(
      "START TestInstanceOperation.testNodeSwapDisableAndReenable() at " + new Date(
          System.currentTimeMillis()));
  removeOfflineOrDisabledOrSwapInInstances();

  // Store original EV
  Map<String, ExternalView> originalEVs = getEVs();

  Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();

  // Set instance's InstanceOperation to SWAP_OUT
  String instanceToSwapOutName = _participants.get(0).getInstanceName();
  InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
      .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
  _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
      InstanceConstants.InstanceOperation.SWAP_OUT);

  // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
  Assert.assertTrue(_clusterVerifier.verifyByPolling());
  validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
      Collections.emptySet(), Collections.emptySet());

  // Add instance with InstanceOperation set to SWAP_IN
  String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
  swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
  addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
      instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
      InstanceConstants.InstanceOperation.SWAP_IN, true, -1);

  // Validate that the partitions on the SWAP_OUT instance do not change after setting the
  // InstanceOperation to SWAP_OUT and adding the SWAP_IN instance to the cluster.
  // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
  // but none of them are in a top state.
  Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
  validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
      Set.of(instanceToSwapInName), Collections.emptySet());

  // Assert canCompleteSwap is true
  Assert.assertTrue(_gSetupTool.getClusterManagementTool()
      .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));

  // Disable the SWAP_IN instance
  _gSetupTool.getClusterManagementTool()
      .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);

  // Check that the SWAP_IN instance's replicas match the SWAP_OUT instance's replicas
  // but all of them are OFFLINE
  Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
  // Read the external views once so both instances are compared against the same snapshot.
  Map<String, ExternalView> evsAfterDisable = getEVs();
  Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance =
      getResourcePartitionStateOnInstance(evsAfterDisable, instanceToSwapOutName);
  Map<String, Map<String, String>> resourcePartitionStateOnSwapInInstance =
      getResourcePartitionStateOnInstance(evsAfterDisable, instanceToSwapInName);
  Assert.assertEquals(
      resourcePartitionStateOnSwapInInstance.values().stream().flatMap(p -> p.keySet().stream())
          .collect(Collectors.toSet()),
      resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p -> p.keySet().stream())
          .collect(Collectors.toSet()));
  Set<String> swapInInstancePartitionStates =
      resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e -> e.values().stream())
          .collect(Collectors.toSet());
  Assert.assertEquals(swapInInstancePartitionStates.size(), 1);
  Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE"));

  // Re-enable the SWAP_IN instance
  _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
  Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());

  // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
  validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
  validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);

  // Assert completeSwapIfPossible is true
  Assert.assertTrue(_gSetupTool.getClusterManagementTool()
      .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));

  Assert.assertTrue(_clusterVerifier.verifyByPolling());

  // Validate that the SWAP_IN instance is now in the routing tables.
  validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);

  // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
  Assert.assertFalse(_gSetupTool.getClusterManagementTool()
      .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());

  // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
  // swap was completed.
  verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
      Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
}

@Test(dependsOnMethods = "testNodeSwapDisableAndReenable")
public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
Expand Down Expand Up @@ -685,6 +776,8 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception {
// Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT InstanceOperation from SWAP_OUT instance.
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
// Stop the participant
_participants.get(_participants.size() - 1).syncStop();

// Wait for cluster to converge.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Expand Down
Loading