[apache/helix] -- Added detail in the Exception message for WAGED rebalance (hard constraint) failures. #2829

Merged
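Before the per-file hunks, a minimal, self-contained sketch of the pattern this change introduces may help: the rebalance algorithm and every hard constraint share one mutable set of cluster names, and membership in that set switches a cluster's constraint checks from silent to detailed INFO-level logging. Everything in the sketch is hypothetical and simplified; only the method names setLogEnabledClusters and isLoggingEnabled mirror the diff below.

import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified sketch of the toggle pattern; only setLogEnabledClusters and
// isLoggingEnabled mirror the real Helix classes changed in this PR.
public class LoggingToggleSketch {

  static class SketchConstraint {
    private Set<String> logEnabledClusters;

    void setLogEnabledClusters(Set<String> logEnabledClusters) {
      this.logEnabledClusters = logEnabledClusters;
    }

    boolean isLoggingEnabled(String clusterName) {
      return logEnabledClusters != null && logEnabledClusters.contains(clusterName);
    }

    boolean check(String clusterName, boolean assignmentValid) {
      if (!assignmentValid && isLoggingEnabled(clusterName)) {
        // Stands in for LOG.info(...) with the detailed failure reason.
        System.out.println("Hard constraint failed for cluster " + clusterName);
      }
      return assignmentValid;
    }
  }

  public static void main(String[] args) {
    Set<String> logEnabledClusters = new HashSet<>();
    SketchConstraint constraint = new SketchConstraint();
    constraint.setLogEnabledClusters(logEnabledClusters);

    constraint.check("clusterA", false);   // silent: detailed logging not enabled yet
    logEnabledClusters.add("clusterA");    // algorithm found no eligible node, enable logging
    constraint.check("clusterA", false);   // now the failure reason is logged
    logEnabledClusters.remove("clusterA"); // assignment recovered, logging disabled again
  }
}

Running main prints the failure reason only for the second check, after the cluster has been flagged.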
------------------------------------------------------------------------
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -30,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
@@ -57,11 +59,16 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final Map<SoftConstraint, Float> _softConstraints;
private final Set<String> logEnabledClusters = new HashSet<>();

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;

for (HardConstraint constraint : hardConstraints) {
constraint.setLogEnabledClusters(logEnabledClusters);
}
}

@Override
@@ -136,11 +143,16 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}).collect(Collectors.toList());

if (candidateNodes.isEmpty()) {
LOG.info("Found no eligible candidate nodes. Enabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
enableFullLoggingForCluster(clusterContext.getClusterName());
optimalAssignment.recordAssignmentFailure(replica,
Maps.transformValues(hardConstraintFailures, this::convertFailureReasons));
return Optional.empty();
}

LOG.info("Disabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
removeClusterFromFullLogging(clusterContext.getClusterName());

return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
getAssignmentNormalizedScore(node, replica, clusterContext)))
.max((nodeEntry1, nodeEntry2) -> {
@@ -179,6 +191,27 @@ private List<String> convertFailureReasons(List<HardConstraint> hardConstraints)
.collect(Collectors.toList());
}

/**
* Adds the cluster name for full logging in all hard constraints
* @param clusterName cluster to be added
*/
private void enableFullLoggingForCluster(String clusterName) {
logEnabledClusters.add(clusterName);
}

/**
* Removes the cluster from full logging in all hard constraints (if added previously)
* @param clusterName cluster to be removed
*/
private void removeClusterFromFullLogging(String clusterName) {
logEnabledClusters.remove(clusterName);
}

@VisibleForTesting
Set<String> getLogEnabledClusters() {
return logEnabledClusters;
}

private static class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
private final AssignableReplica _replica;
private float _score = 0;
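Since the selection logic above spans several hunks, here is a hypothetical, condensed restatement of the flow in getNodeWithHighestPoints (not the Helix implementation): hard constraints filter the candidate nodes, an empty result flags the cluster for detailed logging, and otherwise the flag is cleared and the soft-constraint score picks the winner.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified restatement of the node-selection flow; generic types stand in
// for AssignableNode and the constraint objects, and scoring is reduced to a callback.
public class NodeSelectionSketch {

  static <N> Optional<N> pickNodeWithHighestPoints(List<N> nodes, String clusterName,
      Predicate<N> passesAllHardConstraints, BiFunction<N, String, Double> softConstraintScore,
      Set<String> logEnabledClusters) {
    List<N> candidateNodes = nodes.stream()
        .filter(passesAllHardConstraints)
        .collect(Collectors.toList());

    if (candidateNodes.isEmpty()) {
      // No eligible node: flag the cluster so each hard constraint logs its failure reason
      // on the next evaluation, then report the failed assignment to the caller.
      logEnabledClusters.add(clusterName);
      return Optional.empty();
    }

    // At least one node qualified: turn the verbose logging back off for this cluster.
    logEnabledClusters.remove(clusterName);
    return candidateNodes.stream()
        .max(Comparator.comparingDouble((N node) -> softConstraintScore.apply(node, clusterName)));
  }
}

The real method additionally records the per-constraint failure reasons via recordAssignmentFailure, which, per the PR title, is what feeds the added detail in the exception message.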
------------------------------------------------------------------------
@@ -42,8 +42,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
clusterContext.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone());

if (partitionsForResourceAndFaultZone.contains(replica.getPartitionName())) {
LOG.debug("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
}
return false;
}
return true;
------------------------------------------------------------------------
@@ -19,6 +19,8 @@
* under the License.
*/

import java.util.Set;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
@@ -29,6 +31,8 @@
*/
abstract class HardConstraint {

protected Set<String> logEnabledClusters;

/**
* Check if the replica could be assigned to the node
* @return True if the proposed assignment is valid; False otherwise
@@ -44,4 +48,21 @@ abstract boolean isAssignmentValid(AssignableNode node, AssignableReplic
String getDescription() {
return getClass().getName();
}

/**
* Check if detailed logging is enabled for the given cluster
* @param clusterName The name of the cluster to be checked
* @return True if detailed logging is enabled for the cluster; False otherwise
*/
public boolean isLoggingEnabled(String clusterName) {
return logEnabledClusters != null && logEnabledClusters.contains(clusterName);
}

/**
* Set the shared reference to the set of clusters that need detailed logging
* @param logEnabledClusters The clusters that need to be logged
*/
public void setLogEnabledClusters(Set<String> logEnabledClusters) {
this.logEnabledClusters = logEnabledClusters;
}

}
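To make the new extension point concrete, the following is a hypothetical constraint, not part of this PR or of Helix, showing how an implementation of the abstract class would use the isLoggingEnabled gate. The "quarantined" tag check is invented purely for illustration, and the class would have to live in the same package as HardConstraint, since both the class and isAssignmentValid are package-private.

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example only: rejects nodes carrying an invented "quarantined" instance tag,
// and logs the detailed reason solely for clusters flagged by the rebalance algorithm.
class ExampleQuarantinedNodeConstraint extends HardConstraint {
  private static final Logger LOG = LoggerFactory.getLogger(ExampleQuarantinedNodeConstraint.class);

  @Override
  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
      ClusterContext clusterContext) {
    if (node.getInstanceTags().contains("quarantined")) {
      // Detailed reason is logged only after the algorithm has flagged this cluster.
      if (isLoggingEnabled(clusterContext.getClusterName())) {
        LOG.info("Cannot assign replica of partition {} to a quarantined instance",
            replica.getPartitionName());
      }
      return false;
    }
    return true;
  }
}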
------------------------------------------------------------------------
@@ -40,8 +40,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
for (String key : replicaCapacity.keySet()) {
if (nodeCapacity.containsKey(key)) {
if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
LOG.debug("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
}
return false;
}
}
------------------------------------------------------------------------
@@ -48,8 +48,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
|| assignedPartitionsByResourceSize < resourceMaxPartitionsPerInstance;

if (!exceedResourceMaxPartitionLimit) {
LOG.debug("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
}
return false;
}
return true;
------------------------------------------------------------------------
@@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
LOG.debug("Cannot assign the inactive replica: {}", replica.getPartitionName());
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
return false;
}
return true;
------------------------------------------------------------------------
@@ -37,7 +37,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
Set<String> assignedPartitionsByResource = node.getAssignedPartitionsByResource(replica.getResourceName());

if (assignedPartitionsByResource.contains(replica.getPartitionName())) {
LOG.debug("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
}
return false;
}
return true;
------------------------------------------------------------------------
@@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
}

if (!node.getInstanceTags().contains(replica.getResourceInstanceGroupTag())) {
LOG.debug("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
if (isLoggingEnabled(clusterContext.getClusterName())) {
LOG.info("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
}
return false;
}
return true;
------------------------------------------------------------------------
@@ -183,6 +183,10 @@ public Map<String, Integer> getClusterCapacityMap() {
return _clusterCapacityMap;
}

public String getClusterName() {
return _clusterName;
}

public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());
------------------------------------------------------------------------
@@ -19,12 +19,13 @@
* under the License.
*/

-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
@@ -35,12 +36,16 @@

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class TestConstraintBasedAlgorithm {
-  @Test(expectedExceptions = HelixRebalanceException.class)
-  public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceException {

+  @Test
+  public void testCalculateNoValidAssignment() throws IOException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
@@ -49,7 +54,42 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 1);
Assert.assertTrue(algorithm.getLogEnabledClusters().contains(clusterModel.getContext().getClusterName()));
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());
}

@Test
public void testCalculateNoValidAssignmentFirstAndThenRecovery() throws IOException, HelixRebalanceException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any()))
.thenReturn(false) // hard constraint fails
.thenReturn(true); // hard constraint recovers
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0);
ConstraintBasedAlgorithm algorithm =
new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 1);
Assert.assertTrue(algorithm.getLogEnabledClusters().contains(clusterModel.getContext().getClusterName()));
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());

// calling again for recovery (no exception)
algorithm.calculate(clusterModel);
Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 0);
}

@Test