Skip to content

Commit

Permalink
Add support for ALL_RESOURCES key to disabled partitions (#2848)
Browse files Browse the repository at this point in the history
Add support for ALL_RESOURCES key to disabled partitions
  • Loading branch information
GrantPSpencer authored Jul 25, 2024
1 parent efd114f commit eda45fc
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,6 @@ public enum InstanceOperation {
*/
UNKNOWN
}

public static final String ALL_RESOURCES_DISABLED_PARTITION_KEY = "ALL_RESOURCES";
}
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ public Set<String> getInstancesWithTag(String instanceTag) {
*/
public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);

if (_disabledInstanceForPartitionMap.containsKey(resource)
&& _disabledInstanceForPartitionMap
.get(resource).containsKey(partition)) {
Expand Down Expand Up @@ -1080,8 +1081,8 @@ private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfi
_disabledInstanceSet.clear();
for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (config.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.DISABLE)) {
// Treat instance as disabled if it has "DISABLE" operation or "ALL_RESOURCES" in the disabled partition map
if (config.getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -36,6 +37,7 @@
import org.apache.helix.common.caches.CustomizedStateCache;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
Expand All @@ -48,6 +50,7 @@
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,6 +88,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
private Set<String> _aggregationEnabledTypes = new HashSet<>();
private Set<CapacityNode> _simpleCapacitySet;
private final Set<String> _disabledInstancesForAllPartitionsSet = new HashSet<>();


// CrushEd strategy needs to have a stable partition list input. So this cached list persist the
Expand Down Expand Up @@ -176,6 +180,7 @@ public synchronized void refresh(HelixDataAccessor accessor) {
// TODO: remove the workaround once we are able to apply the simple fix without majorly
// TODO: impacting user's clusters.
refreshStablePartitionList(getIdealStates());
refreshDisabledInstancesForAllPartitionsSet();

if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
Expand Down Expand Up @@ -551,4 +556,22 @@ private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
public Set<CapacityNode> getSimpleCapacitySet() {
return _simpleCapacitySet;
}

/**
 * Rebuilds the cached set of instances whose disabled-partitions map contains the
 * ALL_RESOURCES key, i.e. instances that have every partition of every resource
 * disabled. Invoked from {@code refresh} so readers always see a current snapshot.
 */
private void refreshDisabledInstancesForAllPartitionsSet() {
  _disabledInstancesForAllPartitionsSet.clear();
  for (InstanceConfig instanceConfig : getInstanceConfigMap().values()) {
    boolean disabledForAllResources = instanceConfig.getDisabledPartitionsMap()
        .containsKey(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY);
    if (disabledForAllResources) {
      _disabledInstancesForAllPartitionsSet.add(instanceConfig.getInstanceName());
    }
  }
}

/**
 * Returns the set of instances on which the given partition of the given resource is
 * disabled, additionally including every instance that disabled all partitions via the
 * ALL_RESOURCES key (see {@code _disabledInstancesForAllPartitionsSet}).
 */
@Override
public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
  Set<String> result = new HashSet<>(_disabledInstancesForAllPartitionsSet);
  result.addAll(super.getDisabledInstancesForPartition(resource, partition));
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;

import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
Expand All @@ -36,8 +37,10 @@ class ReplicaActivateConstraint extends HardConstraint {
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());
boolean allResourcesDisabled = node.getDisabledPartitionsMap()
.containsKey(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY);

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
if (allResourcesDisabled || (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName()))) {
if (enableLogging) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
Expand All @@ -48,6 +49,7 @@ public enum InstanceMonitorMetric {
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
ALL_PARTITIONS_DISABLED_GAUGE("AllPartitionsDisabled"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge");
Expand Down Expand Up @@ -75,6 +77,7 @@ public String metricName() {
// Gauges
private SimpleDynamicMetric<Long> _enabledStatusGauge;
private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
private SimpleDynamicMetric<Long> _allPartitionsDisabledGauge;
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
Expand Down Expand Up @@ -105,6 +108,8 @@ private void createMetrics() {
_disabledPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_allPartitionsDisabledGauge = new SimpleDynamicMetric<>(
InstanceMonitorMetric.ALL_PARTITIONS_DISABLED_GAUGE.metricName(), 0L);
_enabledStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
Expand All @@ -124,6 +129,7 @@ private void createMetrics() {
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_allPartitionsDisabledGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge,
Expand Down Expand Up @@ -157,6 +163,7 @@ protected long getTotalMessageReceived() {
/**
 * @return the number of individually disabled partitions recorded by the last
 *     {@code updateInstance} call; the ALL_RESOURCES marker entry is excluded
 *     from this count (it is decremented out when detected).
 */
protected long getDisabledPartitions() {
return _disabledPartitionsGauge.getValue();
}
/** @return 1 if the instance has all partitions disabled via the ALL_RESOURCES key, otherwise 0. */
protected long getAllPartitionsDisabled() { return _allPartitionsDisabledGauge.getValue(); }

/** @return the current value of the message queue size gauge. */
protected long getMessageQueueSizeGauge() { return _messageQueueSizeGauge.getValue(); }

Expand Down Expand Up @@ -191,10 +198,15 @@ public synchronized void updateInstance(Set<String> tags,
Collections.sort(_tags);
}
long numDisabledPartitions = 0L;
boolean allPartitionsDisabled = false;
if (disabledPartitions != null) {
for (List<String> partitions : disabledPartitions.values()) {
if (partitions != null) {
numDisabledPartitions += partitions.size();
if (partitions.contains(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY)) {
allPartitionsDisabled = true;
numDisabledPartitions -= 1;
}
}
}
}
Expand All @@ -206,6 +218,7 @@ public synchronized void updateInstance(Set<String> tags,
_onlineStatusGauge.updateValue(isLive ? 1L : 0L);
_enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
_disabledPartitionsGauge.updateValue(numDisabledPartitions);
_allPartitionsDisabledGauge.updateValue(allPartitionsDisabled ? 1L : 0L);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -28,16 +29,20 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.AutoRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
Expand Down Expand Up @@ -209,4 +214,82 @@ public void testDisableFullAuto(String rebalancerName) throws Exception {

deleteCluster(clusterName);
}

/**
 * Verifies that disabling partitions with the ALL_RESOURCES key removes an instance
 * from active (above-OFFLINE) assignment for every resource (CrushEd and WAGED), and
 * that re-enabling restores it for the CrushEd resource.
 *
 * Cleanup (re-enable + resource drop) runs in a finally block so that a failed
 * assertion cannot leak a disabled instance or leftover resources into later tests
 * that share this cluster.
 */
@Test
public void testDisableAllPartitions() throws Exception {
  final int NUM_PARTITIONS = 8;
  final int NUM_REPLICAS = 3;
  final long DELAY_WINDOW = 200000;
  final String CRUSHED_RESOURCE = "TEST_DB0_CRUSHED";
  final String WAGED_RESOURCE = "TEST_DB1_WAGED";

  List<String> resources = Arrays.asList(CRUSHED_RESOURCE, WAGED_RESOURCE);
  MockParticipantManager disabledPartitionInstance = _participants[0];
  try {
    createResourceWithDelayedRebalance(CLUSTER_NAME, CRUSHED_RESOURCE,
        BuiltInStateModelDefinitions.LeaderStandby.name(), NUM_PARTITIONS, NUM_REPLICAS,
        NUM_REPLICAS - 1, DELAY_WINDOW, CrushEdRebalanceStrategy.class.getName());
    createResourceWithWagedRebalance(CLUSTER_NAME, WAGED_RESOURCE, "MasterSlave",
        NUM_PARTITIONS, NUM_REPLICAS, NUM_REPLICAS - 1);
    Assert.assertTrue(_clusterVerifier.verifyByPolling());

    // Disable all partitions on the first participant. The partition-name list is a
    // placeholder ("foobar"); only the ALL_RESOURCES key is significant here.
    _gSetupTool.getClusterManagementTool().enablePartition(false, CLUSTER_NAME,
        disabledPartitionInstance.getInstanceName(),
        InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY, Collections.singletonList("foobar"));

    Assert.assertTrue(_clusterVerifier.verifyByPolling());
    verifier(() -> {
      for (String resource : resources) {
        ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, resource);
        // EV should not show the disabled instance above the lowest state (OFFLINE)
        for (String partition : ev.getPartitionSet()) {
          if (ev.getStateMap(partition).containsKey(disabledPartitionInstance.getInstanceName())
              && !ev.getStateMap(partition).get(disabledPartitionInstance.getInstanceName()).equals("OFFLINE")) {
            return false;
          }
        }
      }
      return true;
    }, 5000);

    // Re-enable, then assert the node regains CRUSHED assignment above OFFLINE.
    // Cannot assert for the WAGED resource as the node is not guaranteed to appear
    // in subsequent assignments.
    _gSetupTool.getClusterManagementTool().enablePartition(true, CLUSTER_NAME,
        disabledPartitionInstance.getInstanceName(),
        InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY, Collections.singletonList("foobar"));

    Assert.assertTrue(_clusterVerifier.verifyByPolling());
    verifier(() -> {
      ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, CRUSHED_RESOURCE);
      boolean assignmentSeen = false;
      for (String partition : ev.getPartitionSet()) {
        if (ev.getStateMap(partition).containsKey(disabledPartitionInstance.getInstanceName())) {
          assignmentSeen = true;
          if (ev.getStateMap(partition).get(disabledPartitionInstance.getInstanceName()).equals("OFFLINE")) {
            return false;
          }
        }
      }
      return assignmentSeen;
    }, 5000);
  } finally {
    // Best-effort cleanup even if an assertion above failed: re-enable the instance
    // (a second enable is expected to be a no-op — TODO confirm idempotence) and drop
    // the test resources so subsequent tests in this shared cluster start clean.
    _gSetupTool.getClusterManagementTool().enablePartition(true, CLUSTER_NAME,
        disabledPartitionInstance.getInstanceName(),
        InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY, Collections.singletonList("foobar"));
    for (String resource : resources) {
      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, resource);
    }
  }
}

/**
 * Polls the given verifier until it returns true or {@code timeout} ms elapse.
 * An AssertionError thrown by a single attempt counts as a failed attempt (and is
 * logged) rather than failing the test immediately; asserts that verification
 * eventually succeeds within the timeout.
 */
private static void verifier(TestHelper.Verifier verifier, long timeout) throws Exception {
  TestHelper.Verifier tolerantAttempt = () -> {
    try {
      if (verifier.verify()) {
        return true;
      }
      LOG.error("Verifier returned false, retrying...");
      return false;
    } catch (AssertionError e) {
      LOG.error("Caught AssertionError on verifier attempt: ", e);
      return false;
    }
  };
  Assert.assertTrue(TestHelper.verify(tolerantAttempt, timeout));
}
}
Loading

0 comments on commit eda45fc

Please sign in to comment.