Add support for ALL_RESOURCES key to disabled partitions #2848

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
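For context, a minimal client-side sketch of how the new key can be used, mirroring the integration test added in this PR. This sketch is not part of the diff below; the ZooKeeper address, cluster name, and instance name are placeholders, and the single partition name in the list is a placeholder just as in the test.

    import java.util.Collections;

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.constants.InstanceConstants;
    import org.apache.helix.manager.zk.ZKHelixAdmin;

    public class DisableAllPartitionsExample {
      public static void main(String[] args) {
        // Placeholder connection details; replace with your own.
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

        // Disable every partition of every resource hosted on the instance by passing the
        // ALL_RESOURCES key in place of a resource name. A partition list is still required
        // by the API; the integration test in this PR passes a single placeholder name.
        admin.enablePartition(false, "MyCluster", "localhost_12913",
            InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY,
            Collections.singletonList("placeholder"));

        // Re-enable by flipping the flag with the same key and list.
        admin.enablePartition(true, "MyCluster", "localhost_12913",
            InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY,
            Collections.singletonList("placeholder"));

        admin.close();
      }
    }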
------------------------------------------------------------------------
@@ -100,4 +100,6 @@ public enum InstanceOperation {
*/
UNKNOWN
}

public static final String ALL_RESOURCES_DISABLED_PARTITION_KEY = "ALL_RESOURCES";
}
------------------------------------------------------------------------
@@ -791,6 +791,7 @@ public Set<String> getInstancesWithTag(String instanceTag) {
*/
public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);

if (_disabledInstanceForPartitionMap.containsKey(resource)
&& _disabledInstanceForPartitionMap
.get(resource).containsKey(partition)) {
@@ -1080,8 +1081,8 @@ private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfi
_disabledInstanceSet.clear();
for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
-    if (config.getInstanceOperation().getOperation()
-        .equals(InstanceConstants.InstanceOperation.DISABLE)) {
+    // Treat instance as disabled if it has "DISABLE" operation or "ALL_RESOURCES" in the disabled partition map
+    if (config.getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
------------------------------------------------------------------------
@@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -36,6 +37,7 @@
import org.apache.helix.common.caches.CustomizedStateCache;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -48,6 +50,7 @@
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
@@ -85,6 +88,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
private Set<String> _aggregationEnabledTypes = new HashSet<>();
private Set<CapacityNode> _simpleCapacitySet;
private final Set<String> _disabledInstancesForAllPartitionsSet = new HashSet<>();


// CrushEd strategy needs to have a stable partition list input. So this cached list persist the
@@ -176,6 +180,7 @@ public synchronized void refresh(HelixDataAccessor accessor) {
// TODO: remove the workaround once we are able to apply the simple fix without majorly
// TODO: impacting user's clusters.
refreshStablePartitionList(getIdealStates());
refreshDisabledInstancesForAllPartitionsSet();

if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
@@ -551,4 +556,22 @@ private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
public Set<CapacityNode> getSimpleCapacitySet() {
return _simpleCapacitySet;
}

private void refreshDisabledInstancesForAllPartitionsSet() {
_disabledInstancesForAllPartitionsSet.clear();
Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
for (InstanceConfig config : allConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (disabledPartitionMap.containsKey(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY)) {
_disabledInstancesForAllPartitionsSet.add(config.getInstanceName());
}
}
}

@Override
public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
Set<String> disabledInstances = super.getDisabledInstancesForPartition(resource, partition);
disabledInstances.addAll(_disabledInstancesForAllPartitionsSet);
return disabledInstances;
}
}
------------------------------------------------------------------------
@@ -21,6 +21,7 @@

import java.util.List;

import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
@@ -36,8 +37,10 @@ class ReplicaActivateConstraint extends HardConstraint {
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());
boolean allResourcesDisabled = node.getDisabledPartitionsMap()
.containsKey(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY);

-    if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
+    if (allResourcesDisabled || (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName()))) {
if (enableLogging) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
------------------------------------------------------------------------
@@ -30,6 +30,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
@@ -48,6 +49,7 @@ public enum InstanceMonitorMetric {
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
ALL_PARTITIONS_DISABLED_GAUGE("AllPartitionsDisabled"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge");
@@ -75,6 +77,7 @@ public String metricName() {
// Gauges
private SimpleDynamicMetric<Long> _enabledStatusGauge;
private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
private SimpleDynamicMetric<Long> _allPartitionsDisabledGauge;
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
@@ -105,6 +108,8 @@ private void createMetrics() {
_disabledPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_allPartitionsDisabledGauge = new SimpleDynamicMetric<>(
InstanceMonitorMetric.ALL_PARTITIONS_DISABLED_GAUGE.metricName(), 0L);
_enabledStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
@@ -124,6 +129,7 @@ private void createMetrics() {
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_allPartitionsDisabledGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge,
@@ -157,6 +163,7 @@ protected long getTotalMessageReceived() {
protected long getDisabledPartitions() {
return _disabledPartitionsGauge.getValue();
}
protected long getAllPartitionsDisabled() { return _allPartitionsDisabledGauge.getValue(); }

protected long getMessageQueueSizeGauge() { return _messageQueueSizeGauge.getValue(); }

@@ -191,10 +198,15 @@ public synchronized void updateInstance(Set<String> tags,
Collections.sort(_tags);
}
long numDisabledPartitions = 0L;
boolean allPartitionsDisabled = false;
if (disabledPartitions != null) {
for (List<String> partitions : disabledPartitions.values()) {
if (partitions != null) {
numDisabledPartitions += partitions.size();
if (partitions.contains(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY)) {
allPartitionsDisabled = true;
numDisabledPartitions -= 1;
}
}
}
}
@@ -206,6 +218,7 @@
_onlineStatusGauge.updateValue(isLive ? 1L : 0L);
_enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
_disabledPartitionsGauge.updateValue(numDisabledPartitions);
_allPartitionsDisabledGauge.updateValue(allPartitionsDisabled ? 1L : 0L);
}

/**
------------------------------------------------------------------------
@@ -19,6 +19,7 @@
* under the License.
*/

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -28,16 +29,20 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.AutoRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -209,4 +214,77 @@ public void testDisableFullAuto(String rebalancerName) throws Exception {

deleteCluster(clusterName);
}

@Test
public void testDisableAllPartitions() throws Exception {
final int NUM_PARTITIONS = 8;
final int NUM_REPLICAS = 3;
final long DELAY_WINDOW = 200000;
final String CRUSHED_RESOURCE = "TEST_DB0_CRUSHED";
final String WAGED_RESOURCE = "TEST_DB1_WAGED";

List<String> resources = Arrays.asList(CRUSHED_RESOURCE, WAGED_RESOURCE);
createResourceWithDelayedRebalance(CLUSTER_NAME, CRUSHED_RESOURCE,
BuiltInStateModelDefinitions.LeaderStandby.name(), NUM_PARTITIONS, NUM_REPLICAS,
NUM_REPLICAS - 1, DELAY_WINDOW, CrushEdRebalanceStrategy.class.getName());

createResourceWithWagedRebalance(CLUSTER_NAME, WAGED_RESOURCE, "MasterSlave",
NUM_PARTITIONS, NUM_REPLICAS, NUM_REPLICAS - 1);
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// Disable all partitions on the first participant
MockParticipantManager disabledPartitionInstance = _participants[0];
_gSetupTool.getClusterManagementTool().enablePartition(false, CLUSTER_NAME, disabledPartitionInstance.getInstanceName(),
InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY, Collections.singletonList("foobar"));

Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifier(() -> {
for (String resource : resources) {
ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, resource);
// EV should not have disabled instance above the lowest state (OFFLINE)
for (String partition : ev.getPartitionSet()) {
if (ev.getStateMap(partition).containsKey(disabledPartitionInstance.getInstanceName()) &&
!ev.getStateMap(partition).get(disabledPartitionInstance.getInstanceName()).equals("OFFLINE")) {
return false;
}
}
}
return true;
}, 5000);

_gSetupTool.getClusterManagementTool().enablePartition(true, CLUSTER_NAME, disabledPartitionInstance.getInstanceName(),
InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY, Collections.singletonList("foobar"));

// Assert node still has CRUSHED assignment and that it is not in OFFLINE state
// Cannot assert for WAGED resource as we can't guarantee that the node will be in subsequent assignments
Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifier(() -> {
ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, CRUSHED_RESOURCE);
boolean assignmentSeen = false;
for (String partition : ev.getPartitionSet()) {
if (ev.getStateMap(partition).containsKey(disabledPartitionInstance.getInstanceName())) {
assignmentSeen = true;
if (ev.getStateMap(partition).get(disabledPartitionInstance.getInstanceName()).equals("OFFLINE")) {
return false;
}
}
}
return assignmentSeen;
}, 5000);
}

private static void verifier(TestHelper.Verifier verifier, long timeout) throws Exception {
Assert.assertTrue(TestHelper.verify(() -> {
try {
boolean result = verifier.verify();
if (!result) {
LOG.error("Verifier returned false, retrying...");
}
return result;
} catch (AssertionError e) {
LOG.error("Caught AssertionError on verifier attempt: ", e);
return false;
}
}, timeout));
}
}
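For completeness, a hedged sketch (not part of the diff) of reading the flag back from an instance's config, using only accessors that appear in this change; it assumes the admin handle and placeholder names from the sketch at the top, plus an import of org.apache.helix.model.InstanceConfig.

    // Read the instance config and check for the ALL_RESOURCES key, mirroring the
    // containsKey check the controller performs in this PR.
    InstanceConfig config = admin.getInstanceConfig("MyCluster", "localhost_12913");
    boolean allPartitionsDisabled = config.getDisabledPartitionsMap()
        .containsKey(InstanceConstants.ALL_RESOURCES_DISABLED_PARTITION_KEY);
    System.out.println("All partitions disabled: " + allPartitionsDisabled);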