Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Nov 21, 2024
1 parent 14766e7 commit 88df65a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat
CurrentStateOutput currentStateOutput) {
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (skipCapacityCalculation(cache, resourceMap, event)) {
// Ensure instance capacity is null if there are no resources. This prevents using a stale map when all resources
// are removed and then a new resource is added.
if (resourceMap == null || resourceMap.isEmpty()) {
cache.setWagedCapacityProviders(null, null);
}
return;
}

Expand All @@ -364,6 +369,12 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat
.filter(entry -> WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Ensure instance capacity is null if there are no WAGED enabled resources
if (wagedEnabledResourceMap.isEmpty()) {
cache.setWagedCapacityProviders(null, null);
return;
}

// Phase 1: Rebuild Always
WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache);
WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(cache);
Expand All @@ -381,7 +392,7 @@ void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDat
*/
static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, Map<String, Resource> resourceMap,
ClusterEvent event) {
if (resourceMap == null) {
if (resourceMap == null || resourceMap.isEmpty()) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
Expand All @@ -22,7 +24,7 @@
public class TestWagedNPE extends ZkTestBase {

public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
public static int PARTICIPANT_COUNT = 10;
public static int PARTICIPANT_COUNT = 3;
public static List<MockParticipantManager> _participants = new ArrayList<>();
public static ClusterControllerManager _controller;
public static ConfigAccessor _configAccessor;
Expand All @@ -48,39 +50,42 @@ public void beforeClass() {
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
}

// This test was constructed to capture the bug described in issue 2891
// https://github.com/apache/helix/issues/2891
@Test
public void testNPE() throws Exception {
int numPartition = 3;
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();

// Create 1 WAGED Resource
String firstDB = "firstDB";
int numPartition = 10;
_gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
IdealState.RebalanceMode.FULL_AUTO.name(), null);

IdealState idealStateOne =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
idealStateOne.setMinActiveReplicas(2);
idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);

BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setResources(Collections.singleton(firstDB))
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();

// Wait for cluster to converge
Assert.assertTrue(verifier.verifyByPolling());

// drop resource
// Drop resource
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB);

// Wait for cluster to converge
Assert.assertTrue(verifier.verifyByPolling());

// add instance
MockParticipantManager participantToAdd = addParticipant("instance_to_add");
verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setResources(new HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
addParticipant("instance_to_add");

// Wait for cluster to converge
Assert.assertTrue(verifier.verifyByPolling());

// add resource again
// Add a new resource
String secondDb = "secondDB";
_configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new ResourceConfig(secondDb));
_gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, "LeaderStandby",
Expand All @@ -92,11 +97,7 @@ public void testNPE() throws Exception {
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, secondDb, idealStateTwo);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3);


verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setResources(Collections.singleton(secondDb))
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
// Confirm cluster can converge. Cluster will not converge if NPE occurs during pipeline run
Assert.assertTrue(verifier.verifyByPolling());
}

Expand All @@ -107,18 +108,4 @@ public MockParticipantManager addParticipant(String instanceName) {
_participants.add(participant);
return participant;
}

/**
 * Drops the participant tracked under the given instance name.
 *
 * <p>Looks the participant up in {@code _participants} by instance name and delegates to
 * {@link #dropParticipant(MockParticipantManager)}.
 *
 * @param instanceName name of the instance whose participant should be dropped
 * @throws java.util.NoSuchElementException if no tracked participant has that instance name
 */
public void dropParticipant(String instanceName) {
  MockParticipantManager participantToDrop = _participants.stream()
      .filter(p -> p.getInstanceName().equals(instanceName))
      .findFirst()
      // Same exception type a bare Optional.get() would throw, but the message names the
      // instance that could not be found, which makes test failures easier to diagnose.
      .orElseThrow(() -> new java.util.NoSuchElementException(
          "No participant found with instance name: " + instanceName));
  dropParticipant(participantToDrop);
}

/**
 * Stops the given participant, asks the cluster management tool to drop its instance from
 * {@code CLUSTER_NAME}, and removes it from {@code _participants}.
 *
 * @param participantToDrop participant to stop and remove
 */
public void dropParticipant(MockParticipantManager participantToDrop) {
  // Stop the participant before its instance is dropped.
  participantToDrop.syncStop();

  String droppedInstance = participantToDrop.getInstanceName();
  _gSetupTool.getClusterManagementTool().dropInstance(
      CLUSTER_NAME,
      _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, droppedInstance));

  _participants.remove(participantToDrop);
}
}

0 comments on commit 88df65a

Please sign in to comment.