[apache/helix] Added detail in the Exception message for WAGED rebalance (hard constraint) failures (#2829)

Merged
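Summary of the change, as reflected in the diff below: ConstraintBasedAlgorithm now keeps a shared set of replica keys for which full logging is enabled. When no candidate node passes the hard constraints for a replica, that replica is added to the set, and each HardConstraint subclass then logs its rejection reason at INFO (instead of DEBUG) for that replica; once the replica can be placed again it is removed from the set. The sketch below is a simplified illustration of this hand-off; the names and signatures are condensed for illustration and are not the exact Helix APIs.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;

// A simplified sketch of the logging hand-off in this PR; the real logic lives in
// ConstraintBasedAlgorithm and the HardConstraint subclasses shown in the diff.
class WagedLoggingSketch {
  // Shared between the algorithm and every hard constraint it constructs.
  private final Set<String> logEnabledReplicas = new HashSet<>();

  /** Mirrors getNodeWithHighestPoints: toggles full logging based on whether any node fits. */
  void updateLoggingForReplica(String replicaKey, List<String> nodes,
      BiPredicate<String, String> passesAllHardConstraints) {
    boolean anyNodeFits =
        nodes.stream().anyMatch(node -> passesAllHardConstraints.test(node, replicaKey));
    if (!anyNodeFits) {
      // No candidate node: the next pass logs each hard-constraint rejection at INFO.
      logEnabledReplicas.add(replicaKey);
    } else {
      // The replica is placeable again, so drop back to quiet DEBUG logging.
      logEnabledReplicas.remove(replicaKey);
    }
  }

  /** Mirrors HardConstraint.isLoggingEnabled: checked before choosing INFO vs DEBUG. */
  boolean isLoggingEnabled(String replicaKey) {
    return logEnabledReplicas.contains(replicaKey);
  }
}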

@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -30,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
@@ -57,11 +59,16 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final Map<SoftConstraint, Float> _softConstraints;
private final Set<String> logEnabledReplicas = new HashSet<>();

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;

for (HardConstraint constraint : hardConstraints) {
constraint.setLogEnabledReplicas(logEnabledReplicas);
}
}

@Override
@@ -136,11 +143,14 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}).collect(Collectors.toList());

if (candidateNodes.isEmpty()) {
enableFullLoggingForReplica(replica);
optimalAssignment.recordAssignmentFailure(replica,
Maps.transformValues(hardConstraintFailures, this::convertFailureReasons));
return Optional.empty();
}

removeReplicaFromFullLogging(replica);

return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
getAssignmentNormalizedScore(node, replica, clusterContext)))
.max((nodeEntry1, nodeEntry2) -> {
@@ -179,6 +189,27 @@ private List<String> convertFailureReasons(List<HardConstraint> hardConstraints)
.collect(Collectors.toList());
}

/**
* Adds the replica for full logging in all hard constraints
* @param replica replica to be added
*/
private void enableFullLoggingForReplica(AssignableReplica replica) {
logEnabledReplicas.add(replica.toString());
}

/**
* Removes the replica from full logging in all hard constraints (if added previously)
* @param replica replica to be removed
*/
private void removeReplicaFromFullLogging(AssignableReplica replica) {
logEnabledReplicas.remove(replica.toString());
}

@VisibleForTesting
Set<String> getLogEnabledReplicas() {
return logEnabledReplicas;
}

private static class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
private final AssignableReplica _replica;
private float _score = 0;

@@ -42,8 +42,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
clusterContext.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone());

if (partitionsForResourceAndFaultZone.contains(replica.getPartitionName())) {
LOG.debug("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
if (isLoggingEnabled(replica)) {
LOG.info("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
}
return false;
}
return true;

@@ -19,6 +19,8 @@
* under the License.
*/

import java.util.Set;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
@@ -29,6 +31,8 @@
*/
abstract class HardConstraint {

protected Set<String> logEnabledReplicas;

/**
* Check if the replica could be assigned to the node
* @return True if the proposed assignment is valid; False otherwise
@@ -44,4 +48,21 @@ abstract boolean isAssignmentValid(AssignableNode node, AssignableReplica replic
String getDescription() {
return getClass().getName();
}

/**
* Check if full logging is enabled for the replica
* @param replica The replica to be checked
* @return True if full logging is enabled for the replica; false otherwise
*/
public boolean isLoggingEnabled(AssignableReplica replica) {
return logEnabledReplicas != null && logEnabledReplicas.contains(replica.toString());
}

/**
* Set the shared reference to the set of replicas that need full logging.
* @param logEnabledReplicas The replicas that need to be logged
*/
public void setLogEnabledReplicas(Set<String> logEnabledReplicas) {
this.logEnabledReplicas = logEnabledReplicas;
}

}

@@ -40,8 +40,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
for (String key : replicaCapacity.keySet()) {
if (nodeCapacity.containsKey(key)) {
if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
LOG.debug("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
if (isLoggingEnabled(replica)) {
LOG.info("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
}
return false;
}
}

@@ -48,8 +48,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
|| assignedPartitionsByResourceSize < resourceMaxPartitionsPerInstance;

if (!exceedResourceMaxPartitionLimit) {
LOG.debug("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
if (isLoggingEnabled(replica)) {
LOG.info("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
}
return false;
}
return true;

@@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
LOG.debug("Cannot assign the inactive replica: {}", replica.getPartitionName());
if (isLoggingEnabled(replica)) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
return false;
}
return true;

@@ -37,7 +37,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
Set<String> assignedPartitionsByResource = node.getAssignedPartitionsByResource(replica.getResourceName());

if (assignedPartitionsByResource.contains(replica.getPartitionName())) {
LOG.debug("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
if (isLoggingEnabled(replica)) {
LOG.info("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
}
return false;
}
return true;

@@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
}

if (!node.getInstanceTags().contains(replica.getResourceInstanceGroupTag())) {
LOG.debug("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
if (isLoggingEnabled(replica)) {
LOG.info("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
}
return false;
}
return true;

@@ -19,12 +19,13 @@
* under the License.
*/

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
@@ -35,12 +36,16 @@

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class TestConstraintBasedAlgorithm {
@Test(expectedExceptions = HelixRebalanceException.class)
public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceException {

@Test
public void testCalculateNoValidAssignment() throws IOException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
@@ -49,7 +54,40 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledReplicas().size(), 1);
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());
}

@Test
public void testCalculateNoValidAssignmentFirstAndThenRecovery() throws IOException, HelixRebalanceException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any()))
.thenReturn(false) // hard constraint fails
.thenReturn(true); // hard constraint recovers
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0);
ConstraintBasedAlgorithm algorithm =
new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledReplicas().size(), 1);
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());

// calling again for recovery (no exception)
algorithm.calculate(clusterModel);
Assert.assertEquals(algorithm.getLogEnabledReplicas().size(), 0);
}

@Test
Expand Down
Loading