Commit

address comment and add test
tengfmu committed Oct 15, 2024
1 parent 5c5f866 commit c5e0be7
Showing 3 changed files with 68 additions and 30 deletions.
@@ -581,7 +581,7 @@ private void buildSimpleCapacityMap() {
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
Map<String, InstanceConfig> instanceConfigMap = getAssignableInstanceConfigMap();
_simpleCapacitySet = new HashSet<>();
- for (String instanceName : getEnabledLiveInstances()) {
+ for (String instanceName : getAssignableInstances()) {
CapacityNode capacityNode =
new CapacityNode(instanceName, clusterConfig, clusterTopologyConfig,
instanceConfigMap.getOrDefault(instanceName, new InstanceConfig(instanceName)));
@@ -366,14 +366,6 @@ protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
configAccessor.setClusterConfig(clusterName, clusterConfig);
}

- protected void setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
- String clusterName, int maxPartitionAllowed) {
- ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
- ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
- clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
- configAccessor.setClusterConfig(clusterName, clusterConfig);
- }

protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
@@ -34,8 +34,10 @@
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -57,6 +59,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase {
protected ClusterControllerManager _controller;
protected List<MockParticipantManager> _participants = new ArrayList<>();
protected List<MockParticipantManager> _additionalParticipants = new ArrayList<>();
protected Map<String, String> _instanceNameZoneMap = new HashMap<>();
protected int _minActiveReplica = 0;
protected ZkHelixClusterVerifier _clusterVerifier;
protected List<String> _testDBs = new ArrayList<>();
@@ -67,27 +70,17 @@ public class TestStickyRebalanceStrategy extends ZkTestBase {
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
_configAccessor = new ConfigAccessor(_gZkClient);

_gSetupTool.addCluster(CLUSTER_NAME, true);

for (int i = 0; i < NUM_NODE; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-
- // start dummy participants
- MockParticipantManager participant =
- new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
- participant.syncStart();
- _participants.add(participant);
+ _participants.addAll(addInstance("" + START_PORT + i, "zone-" + i % REPLICAS, true));
}

for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-
- MockParticipantManager participant =
- new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
- _additionalParticipants.add(participant);
+ _additionalParticipants.addAll(
+ addInstance("" + START_PORT + i, "zone-" + i % REPLICAS, false));
}

// start controller
@@ -147,9 +140,27 @@ public void afterTest() throws InterruptedException {
_clusterVerifier.verifyByPolling();
}

@Test
public void testNoSameZoneAssignment() throws Exception {
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 1);
Map<String, ExternalView> externalViews = createTestDBs();
for (ExternalView ev : externalViews.values()) {
Map<String, Map<String, String>> assignments = ev.getRecord().getMapFields();
Assert.assertNotNull(assignments);
Assert.assertEquals(assignments.size(), PARTITIONS);
for (Map<String, String> assignmentMap : assignments.values()) {
Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS);
Set<String> zoneSet = new HashSet<>();
for (String instanceName : assignmentMap.keySet()) {
zoneSet.add(_instanceNameZoneMap.get(instanceName));
}
Assert.assertEquals(zoneSet.size(), REPLICAS);
}
}
}
@Test
public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 1);
// Shut down all the nodes
for (int i = 0; i < NUM_NODE; i++) {
_participants.get(i).syncStop();
@@ -175,7 +186,7 @@ public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception {

@Test
public void testNoPartitionMovementWithNewInstanceAdd() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 1);
Map<String, ExternalView> externalViewsBefore = createTestDBs();

// Start more new instances
@@ -197,7 +208,7 @@ public void testNoPartitionMovementWithNewInstanceAdd() throws Exception {

@Test
public void testNoPartitionMovementWithInstanceDown() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 1);
Map<String, ExternalView> externalViewsBefore = createTestDBs();

// Shut down 2 instances
@@ -220,7 +231,7 @@ public void testNoPartitionMovementWithInstanceDown() throws Exception {

@Test
public void testNoPartitionMovementWithInstanceRestart() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 1);
// Create resource
Map<String, ExternalView> externalViewsBefore = createTestDBs();
// Shut down half of the nodes
@@ -265,14 +276,14 @@ public void testNoPartitionMovementWithInstanceRestart() throws Exception {

@Test
public void testFirstTimeAssignmentWithStackingPlacement() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 2);
Map<String, ExternalView> externalViewsBefore = createTestDBs();
validateAllPartitionAssigned(externalViewsBefore);
}

@Test
public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 2);
Map<String, ExternalView> externalViewsBefore = createTestDBs();

// Start more new instances
@@ -294,7 +305,7 @@ public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement() throws Exception {

@Test
public void testNoPartitionMovementWithInstanceDownWithStackingPlacement() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2);
+ setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME, 2);
// Shut down half of the nodes given we allow stacking placement
for (int i = 0; i < NUM_NODE / 2; i++) {
_participants.get(i).syncStop();
@@ -395,4 +406,39 @@ private void validateAllPartitionAssigned(Map<String, ExternalView> externalView
}
}
}

private void setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(String clusterName,
int maxPartitionAllowed) {
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setTopology("/zone/host/logicalId");
clusterConfig.setFaultZoneType("zone");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
}

private List<MockParticipantManager> addInstance(String instanceNameSuffix, String zone,
boolean enabled) {
List<MockParticipantManager> participants = new ArrayList<>();
String storageNodeName = PARTICIPANT_PREFIX + "_" + instanceNameSuffix;
_instanceNameZoneMap.put(storageNodeName, zone);
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);

String domain =
String.format("zone=%s,host=%s,logicalId=%s", zone, storageNodeName, instanceNameSuffix);
InstanceConfig instanceConfig =
_configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
instanceConfig.setDomain(domain);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig);
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
if (enabled) {
// start dummy participant
participant.syncStart();
}
participants.add(participant);

return participants;
}
}
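
For illustration, the zone-spread assertion in testNoSameZoneAssignment can be read as the standalone check below; the class and method names are hypothetical and not part of this commit.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ZoneSpreadCheck {
  // Hypothetical helper mirroring the assertion in testNoSameZoneAssignment:
  // true when every replica of one partition lands in a distinct fault zone.
  // partitionAssignment maps instanceName -> state for a single partition;
  // instanceToZone mirrors the test's _instanceNameZoneMap.
  static boolean replicasSpanDistinctZones(Map<String, String> partitionAssignment,
      Map<String, String> instanceToZone) {
    Set<String> zones = new HashSet<>();
    for (String instance : partitionAssignment.keySet()) {
      zones.add(instanceToZone.get(instance));
    }
    // As many distinct zones as replicas means no two replicas share a zone.
    return zones.size() == partitionAssignment.size();
  }
}

The test applies this check to every partition, and also asserts that each of the PARTITIONS partitions carries exactly REPLICAS replicas.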
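
Similarly, a minimal sketch of how the topology path set in setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster pairs with the per-instance domain built in addInstance; the class name and literal values below are illustrative only, not part of the patch.

import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;

final class TopologyDomainSketch {
  // Illustrative pairing: each key in an instance's domain matches a segment of the
  // cluster topology path, and the fault zone type ("zone") names the segment that
  // replicas are spread across.
  static void apply(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
    clusterConfig.setTopology("/zone/host/logicalId");
    clusterConfig.setFaultZoneType("zone");
    clusterConfig.setTopologyAwareEnabled(true);
    instanceConfig.setDomain("zone=zone-0,host=host-0,logicalId=0");
  }
}

The domain format string here is the same one addInstance uses, with placeholder values.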
