Skip to content

Commit

Permalink
Fix NPE in IntermedaiteStateCalcStage (apache#2974)
Browse files Browse the repository at this point in the history
Fix NPE in IntermediateStateCalcStage
  • Loading branch information
GrantPSpencer authored and zpinto committed Jan 22, 2025
1 parent 0da7bc9 commit 003e640
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ private void computeIntermediateMap(PartitionStateMap intermediateStateMap,
entry.getValue().forEach((key, value) -> {
if (!value.getToState().equals(HelixDefinedState.DROPPED.name())) {
intermediateStateMap.setState(entry.getKey(), value.getTgtName(), value.getToState());
} else {
} else if (intermediateStateMap.getStateMap().containsKey(entry.getKey())) {
intermediateStateMap.getStateMap().get(entry.getKey()).remove(value.getTgtName());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,84 @@ public void testPartitionMissing() {
}
}

@Test
public void testMessageAlreadyApplied() {
String resourcePrefix = "resource";
int nResource = 1;
int nPartition = 5;
int nReplica = 1;

String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
resources[i] = resourcePrefix + "_" + i;
}

preSetup(resources, nReplica, nReplica);
event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));

// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
MessageOutput messageSelectOutput = new MessageOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();

_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
setClusterConfig(_clusterConfig);

for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
setSingleIdealState(is);

Map<String, List<String>> partitionMap = new HashMap<>();
for (int p = 0; p < nPartition; p++) {
Partition partition = new Partition(resource + "_" + p);
for (int r = 0; r < nReplica; r++) {
String instanceName = HOSTNAME_PREFIX + r;

// PartitionMap is used as a preferenceList.
partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
if (p == 0) {
// Dont set current state. set best possible on another node
// Set a pending message to drop a replica that doesn't exist in currentState
currentStateOutput.setPendingMessage(resource, partition, instanceName,
generateMessage("OFFLINE", "DROPPED", instanceName));
// Set pending message to bootstrap elsewhere
String dummyInstance = "dummy_instance";
bestPossibleStateOutput.setState(resource, partition, dummyInstance, "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", dummyInstance));
expectedResult.setState(resource, partition, dummyInstance, "ONLINE");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
expectedResult.setState(resource, partition, instanceName, "ONLINE");
}
}
}
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
}

event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());

IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
Assert.assertTrue(output.getPartitionStateMap(resource)
.getStateMap()
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
}
}

private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
Expand Down

0 comments on commit 003e640

Please sign in to comment.