diff --git a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java index d97173a9ad..f248c9becb 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java @@ -24,7 +24,13 @@ public class VirtualTopologyGroupConstants { public static final String GROUP_NAME = "virtualTopologyGroupName"; public static final String GROUP_NUMBER = "virtualTopologyGroupNumber"; public static final String AUTO_MAINTENANCE_MODE_DISABLED = "autoMaintenanceModeDisabled"; + public static final String ASSIGNMENT_ALGORITHM_TYPE = "assignmentAlgorithmType"; public static final String GROUP_NAME_SPLITTER = "_"; public static final String PATH_NAME_SPLITTER = "/"; public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone"; + public static final String FORCE_RECOMPUTE = "forceRecompute"; + + public enum VirtualGroupAssignmentAlgorithm { + ZONE_BASED, INSTANCE_BASED + } } diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java new file mode 100644 index 0000000000..4b22fc99bc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FaultZoneBasedVirtualGroupAssignmentAlgorithm.java @@ -0,0 +1,173 @@ +package org.apache.helix.cloud.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.apache.commons.math3.util.Pair; + +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; + +/** + * A virtual group assignment algorithm that assigns zones and their instances to virtual groups + * a way that preserves existing zone-to-group assignments whenever possible, and balances any + * remaining unassigned zones across the least-loaded groups. If the requested number of groups + * differs from the existing assignment, a new distribution is computed. Otherwise, if a zone + * already exists in the provided assignment, all its instances (including newly discovered ones) + * are placed in the same group, ensuring no zone is split across multiple virtual groups. + */ +public class FaultZoneBasedVirtualGroupAssignmentAlgorithm implements VirtualGroupAssignmentAlgorithm { + + private static final FaultZoneBasedVirtualGroupAssignmentAlgorithm _instance = + new FaultZoneBasedVirtualGroupAssignmentAlgorithm(); + + private FaultZoneBasedVirtualGroupAssignmentAlgorithm() { + } + + public static FaultZoneBasedVirtualGroupAssignmentAlgorithm getInstance() { + return _instance; + } + + @Override + public Map> computeAssignment(int numGroups, String virtualGroupName, + Map> zoneMapping, Map> virtualGroupToInstancesMap) { + // 1. If the number of requested virtual groups differs from the current assignment size, + // we must do a fresh assignment (the existing distribution is invalid). + if (numGroups != virtualGroupToInstancesMap.size()) { + Map> newAssignment = new HashMap<>(); + for (int i = 0; i < numGroups; i++) { + newAssignment.put(computeVirtualGroupId(i, virtualGroupName), new HashSet<>()); + } + + // Assign all zones from scratch in a balanced manner. + distributeUnassignedZones(newAssignment, new ArrayList<>(zoneMapping.keySet()), zoneMapping); + return constructResult(newAssignment, zoneMapping); + } + + // 2. Find unassigned zones. If there is any, incrementally assign them to the least-loaded + // virtual group. + // Build instance-to-zone mapping for quick zone lookups. + Map instanceToZoneMapping = new HashMap<>(); + for (Map.Entry> entry : zoneMapping.entrySet()) { + for (String instance : entry.getValue()) { + instanceToZoneMapping.put(instance, entry.getKey()); + } + } + + // Copy zoneMapping for tracking which zones are unassigned. + Set unassignedZones = new HashSet<>(zoneMapping.keySet()); + + // Build virtual group -> zone mapping and remove assigned zones from the unassigned list + Map> virtualGroupToZoneMapping = new HashMap<>(); + for (Map.Entry> entry : virtualGroupToInstancesMap.entrySet()) { + virtualGroupToZoneMapping.putIfAbsent(entry.getKey(), new HashSet<>()); + for (String instance : entry.getValue()) { + String zone = instanceToZoneMapping.get(instance); + virtualGroupToZoneMapping.get(entry.getKey()).add(zone); + unassignedZones.remove(zone); + } + } + + // If there are no unassigned zones, return the result as is. + if (unassignedZones.isEmpty()) { + return constructResult(virtualGroupToZoneMapping, zoneMapping); + } + + // Distribute unassigned zones to keep the overall distribution balanced. + distributeUnassignedZones(virtualGroupToZoneMapping, new ArrayList<>(unassignedZones), + zoneMapping); + return constructResult(virtualGroupToZoneMapping, zoneMapping); + } + + @Override + public Map> computeAssignment(int numGroups, String virtualGroupName, + Map> zoneMapping) { + return computeAssignment(numGroups, virtualGroupName, zoneMapping, new HashMap<>()); + } + + /** + * Distributes unassigned zones across virtual groups in a balanced manner. + * Assigns heavier zones first to the current least-loaded group. + * + * @param virtualGroupToZoneMapping Current assignment of virtual group -> set of zones. + * @param unassignedZones List of zones that have not been assigned to any group. + * @param zoneMapping Mapping of physical zone -> set of instances. + */ + private void distributeUnassignedZones( + Map> virtualGroupToZoneMapping, List unassignedZones, + Map> zoneMapping) { + + // Priority queue sorted by current load of the virtual group + // We always assign new zones to the group with the smallest load to keep them balanced. + Queue minHeap = new PriorityQueue<>( + Comparator.comparingInt(vg -> + virtualGroupToZoneMapping.get(vg).stream() + .map(zoneMapping::get) + .mapToInt(Set::size) + .sum() + ) + ); + // Seed the min-heap with existing groups + minHeap.addAll(virtualGroupToZoneMapping.keySet()); + + // Sort unassigned zones by descending number of unassigned instances, assigning "heavier" zones first. + unassignedZones.sort(Comparator.comparingInt(zone -> zoneMapping.get(zone).size()) + .reversed()); + + // Assign each zone to the least-loaded group + for (String zone : unassignedZones) { + String leastLoadVg = minHeap.poll(); + virtualGroupToZoneMapping.get(leastLoadVg).add(zone); + minHeap.offer(leastLoadVg); + } + } + + /** + * Constructs the final result by mapping virtual groups to their instances. + * + * @param vgToZonesMapping Mapping of virtual group -> set of zones. + * @param zoneToInstancesMapping Mapping of zone -> set of instances. + * @return Mapping of virtual group -> set of instances. + */ + private Map> constructResult(Map> vgToZonesMapping, + Map> zoneToInstancesMapping) { + Map> result = new HashMap<>(); + for (Map.Entry> entry : vgToZonesMapping.entrySet()) { + Set instances = new HashSet<>(); + for (String zone : entry.getValue()) { + instances.addAll(zoneToInstancesMapping.get(zone)); + } + result.put(entry.getKey(), instances); + } + return result; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java index 23da847244..1b1b3ede25 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java @@ -30,6 +30,7 @@ import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; import org.apache.helix.util.HelixUtil; +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; /** * A strategy that densely assign virtual groups with input instance list, it doesn't move to the next one until @@ -49,7 +50,7 @@ public static FifoVirtualGroupAssignmentAlgorithm getInstance() { @Override public Map> computeAssignment(int numGroups, String virtualGroupName, - Map> zoneMapping) { + Map> zoneMapping, Map> virtualGroupToInstancesMap) { List sortedInstances = HelixUtil.sortAndFlattenZoneMapping(zoneMapping); Map> assignment = new HashMap<>(); // #instances = instancesPerGroupBase * numGroups + residuals @@ -73,7 +74,9 @@ public Map> computeAssignment(int numGroups, String virtualG return ImmutableMap.copyOf(assignment); } - private static String computeVirtualGroupId(int groupIndex, String virtualGroupName) { - return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex; + @Override + public Map> computeAssignment(int numGroups, String virtualGroupName, + Map> zoneMapping) { + return computeAssignment(numGroups, virtualGroupName, zoneMapping, new HashMap<>()); } } diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java index 8d6c97feff..d5ac818a12 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java @@ -24,6 +24,19 @@ public interface VirtualGroupAssignmentAlgorithm { + /** + * Compute the assignment for each virtual topology group. + * + * @param numGroups number of the virtual groups + * @param virtualGroupName virtual group name + * @param zoneMapping current zone mapping from zoneId to instanceIds + * @param virtualGroupToInstancesMap current virtual group mapping from virtual group Id to instancesIds + * @return the assignment as mapping from virtual group ID to instanceIds + */ + default Map> computeAssignment(int numGroups, String virtualGroupName, + Map> zoneMapping, Map> virtualGroupToInstancesMap) { + return computeAssignment(numGroups, virtualGroupName, zoneMapping); + } /** * Compute the assignment for each virtual topology group. @@ -33,6 +46,7 @@ public interface VirtualGroupAssignmentAlgorithm { * @param zoneMapping current zone mapping from zoneId to instanceIds * @return the assignment as mapping from virtual group ID to instanceIds */ + @Deprecated Map> computeAssignment(int numGroups, String virtualGroupName, Map> zoneMapping); } diff --git a/helix-core/src/main/java/org/apache/helix/util/VirtualTopologyUtil.java b/helix-core/src/main/java/org/apache/helix/util/VirtualTopologyUtil.java new file mode 100644 index 0000000000..876163b4f7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/util/VirtualTopologyUtil.java @@ -0,0 +1,38 @@ +package org.apache.helix.util; + +import io.netty.util.internal.StringUtil; +import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; + +public class VirtualTopologyUtil { + public static String computeVirtualGroupId(int groupIndex, String virtualGroupName) { + return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex; + } + + /** + * Ensures the provided fault zone type string ends with + * the virtual fault zone type suffix. + * + * @param oldFaultZoneType The original fault zone type. Must not be null or empty. + * @return The fault zone type string with the virtual fault zone type appended if necessary. + * @throws IllegalArgumentException if {@code oldFaultZoneType} is null or empty + */ + public static String computeVirtualFaultZoneTypeKey(String oldFaultZoneType) { + if (StringUtil.isNullOrEmpty(oldFaultZoneType)) { + throw new IllegalArgumentException("The old fault zone type is null or empty"); + } + + String suffix = VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE; + + // If already ends with splitter + VIRTUAL_FAULT_ZONE_TYPE, return as-is + if (oldFaultZoneType.endsWith(suffix)) { + return oldFaultZoneType; + } + + // Otherwise, remove any existing suffix parts beyond the first splitter, if needed + String[] segments = oldFaultZoneType.split(VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER); + String baseName = segments[0]; + + return baseName + suffix; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java new file mode 100644 index 0000000000..85463fe522 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestFaultZoneBasedVirtualGroupAssignment.java @@ -0,0 +1,169 @@ +package org.apache.helix.cloud.virtualTopologyGroup; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.cloud.topology.FaultZoneBasedVirtualGroupAssignmentAlgorithm; +import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; + +public class TestFaultZoneBasedVirtualGroupAssignment { + + private static final String GROUP_NAME = "test_virtual_group"; + private static final int ZONE_NUMBER = 20; + private static final int INSTANCES_PER_ZONE = 5; + private Map> _zoneMapping = new HashMap<>(); + private Map> _zoneMapping2 = new HashMap<>(); + + @BeforeTest + public void prepare() { + _zoneMapping = new HashMap<>(); + _zoneMapping2 = new HashMap<>(); + int instanceIdx = 0; + for (int i = 0; i < ZONE_NUMBER; i++) { + String zone = "zone_" + i; + _zoneMapping.computeIfAbsent(zone, k -> new HashSet<>()); + _zoneMapping2.computeIfAbsent(zone, k -> new HashSet<>()); + for (int j = 0; j < INSTANCES_PER_ZONE; j++) { + String instance = "instance_" + instanceIdx++; + _zoneMapping.get(zone).add(instance); + _zoneMapping2.get(zone).add(instance); + } + } + // Add a branch zone zone_20 to zoneMapping2 + _zoneMapping2.computeIfAbsent("zone_20", k -> new HashSet<>()); + for (int j = 0; j < INSTANCES_PER_ZONE; j++) { + String instance = "instance_" + instanceIdx++; + _zoneMapping2.get("zone_" + (ZONE_NUMBER)).add(instance); + } + } + + @Test(dataProvider = "getMappingTests") + public void testAssignmentScheme(int numGroups, Map> expected, + VirtualGroupAssignmentAlgorithm algorithm, Map> zoneMapping, + Map> virtualMapping) { + Assert.assertEquals( + algorithm.computeAssignment(numGroups, GROUP_NAME, zoneMapping, virtualMapping), expected); + } + + @DataProvider + public Object[][] getMappingTests() { + VirtualGroupAssignmentAlgorithm algorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance(); + + // The virtual groups should be balanced across zones + Map> virtualMapping = new HashMap<>(); + + virtualMapping.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_5")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_16")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); + virtualMapping.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + + virtualMapping.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_3")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); + virtualMapping.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); + + virtualMapping.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); + virtualMapping.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_4")); + + virtualMapping.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_6")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_15")); + virtualMapping.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); + + + Map> virtualMapping2 = new HashMap<>(); + virtualMapping2.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_1")); + virtualMapping2.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping.get("zone_12")); + + virtualMapping2.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_0")); + virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_5")); + virtualMapping2.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping.get("zone_13")); + + virtualMapping2.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_17")); + virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_6")); + virtualMapping2.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping.get("zone_15")); + + virtualMapping2.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_19")); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_8")); + virtualMapping2.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping.get("zone_9")); + + virtualMapping2.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_7")); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_10")); + virtualMapping2.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping.get("zone_18")); + + virtualMapping2.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_3")); + virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_16")); + virtualMapping2.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping.get("zone_14")); + + virtualMapping2.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>()); + virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_11")); + virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_2")); + virtualMapping2.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping.get("zone_4")); + + + Map> virtualMapping3 = new HashMap<>(); + virtualMapping3.put(computeVirtualGroupId(0, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_1")); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_12")); + virtualMapping3.get(computeVirtualGroupId(0, GROUP_NAME)).addAll(_zoneMapping2.get("zone_20")); + + virtualMapping3.put(computeVirtualGroupId(1, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_0")); + virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_5")); + virtualMapping3.get(computeVirtualGroupId(1, GROUP_NAME)).addAll(_zoneMapping2.get("zone_13")); + + virtualMapping3.put(computeVirtualGroupId(2, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_17")); + virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_6")); + virtualMapping3.get(computeVirtualGroupId(2, GROUP_NAME)).addAll(_zoneMapping2.get("zone_15")); + + virtualMapping3.put(computeVirtualGroupId(3, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_19")); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_8")); + virtualMapping3.get(computeVirtualGroupId(3, GROUP_NAME)).addAll(_zoneMapping2.get("zone_9")); + + virtualMapping3.put(computeVirtualGroupId(4, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_7")); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_10")); + virtualMapping3.get(computeVirtualGroupId(4, GROUP_NAME)).addAll(_zoneMapping2.get("zone_18")); + + virtualMapping3.put(computeVirtualGroupId(5, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_3")); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_16")); + virtualMapping3.get(computeVirtualGroupId(5, GROUP_NAME)).addAll(_zoneMapping2.get("zone_14")); + + virtualMapping3.put(computeVirtualGroupId(6, GROUP_NAME), new HashSet<>()); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_11")); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_2")); + virtualMapping3.get(computeVirtualGroupId(6, GROUP_NAME)).addAll(_zoneMapping2.get("zone_4")); + + return new Object[][]{{4, virtualMapping, algorithm, _zoneMapping, new HashMap<>()}, + {7, virtualMapping2, algorithm, _zoneMapping, new HashMap<>()}, + // Should incrementally add the new zone to the virtual groups + {7, virtualMapping3, algorithm, _zoneMapping2, virtualMapping2}}; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java index 54f4365800..a77a47c04a 100644 --- a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java +++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java @@ -21,11 +21,12 @@ import com.google.common.collect.Sets; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; + import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm; import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm; import org.apache.helix.util.HelixUtil; @@ -34,6 +35,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; + public class TestVirtualTopologyGroupAssignment { private static final String GROUP_NAME = "test_virtual_group"; @@ -61,34 +64,32 @@ public void testFlattenZoneMapping() { @Test(dataProvider = "getMappingTests") public void testAssignmentScheme(int numGroups, Map> expected, VirtualGroupAssignmentAlgorithm algorithm) { - Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping), expected); + Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping, + Collections.emptyMap()), expected); } @DataProvider public Object[][] getMappingTests() { Map> virtualMapping = new HashMap<>(); VirtualGroupAssignmentAlgorithm algorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance(); - virtualMapping.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2", "3", "4", "5")); - virtualMapping.put(computeVirtualGroupId(1), Sets.newHashSet("6", "7", "8", "9")); - virtualMapping.put(computeVirtualGroupId(2), Sets.newHashSet("a", "b", "c", "d")); - Assert.assertEquals(algorithm.computeAssignment(3, GROUP_NAME, _zoneMapping), + virtualMapping.put(computeVirtualGroupId(0, GROUP_NAME), Sets.newHashSet("1", "2", "3", "4", "5")); + virtualMapping.put(computeVirtualGroupId(1, GROUP_NAME), Sets.newHashSet("6", "7", "8", "9")); + virtualMapping.put(computeVirtualGroupId(2, GROUP_NAME), Sets.newHashSet("a", "b", "c", "d")); + Assert.assertEquals(algorithm.computeAssignment(3, GROUP_NAME, _zoneMapping, Collections.emptyMap()), virtualMapping); Map> virtualMapping2 = new HashMap<>(); - virtualMapping2.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2")); - virtualMapping2.put(computeVirtualGroupId(1), Sets.newHashSet("3", "4")); - virtualMapping2.put(computeVirtualGroupId(2), Sets.newHashSet("5", "6")); - virtualMapping2.put(computeVirtualGroupId(3), Sets.newHashSet("7", "8")); - virtualMapping2.put(computeVirtualGroupId(4), Sets.newHashSet("9", "a")); - virtualMapping2.put(computeVirtualGroupId(5), Sets.newHashSet("b")); - virtualMapping2.put(computeVirtualGroupId(6), Sets.newHashSet("c")); - virtualMapping2.put(computeVirtualGroupId(7), Sets.newHashSet("d")); + virtualMapping2.put(computeVirtualGroupId(0, GROUP_NAME), Sets.newHashSet("1", "2")); + virtualMapping2.put(computeVirtualGroupId(1, GROUP_NAME), Sets.newHashSet("3", "4")); + virtualMapping2.put(computeVirtualGroupId(2, GROUP_NAME), Sets.newHashSet("5", "6")); + virtualMapping2.put(computeVirtualGroupId(3, GROUP_NAME), Sets.newHashSet("7", "8")); + virtualMapping2.put(computeVirtualGroupId(4, GROUP_NAME), Sets.newHashSet("9", "a")); + virtualMapping2.put(computeVirtualGroupId(5, GROUP_NAME), Sets.newHashSet("b")); + virtualMapping2.put(computeVirtualGroupId(6, GROUP_NAME), Sets.newHashSet("c")); + virtualMapping2.put(computeVirtualGroupId(7, GROUP_NAME), Sets.newHashSet("d")); + return new Object[][] { {3, virtualMapping, algorithm}, {8, virtualMapping2, algorithm} }; } - - private static String computeVirtualGroupId(int groupIndex) { - return GROUP_NAME + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex; - } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java index db93571e85..89a49e9e11 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java @@ -35,6 +35,15 @@ public interface ClusterService { */ ClusterTopology getClusterTopology(String cluster); + /** + * Get the topology of a virtual cluster. If useRealTopology is true, return the real topology + * of the cluster. If useRealTopology is false, return the virtual topology of the cluster. + * @param cluster the cluster name + * @param useRealTopology whether to use the real topology or the virtual topology + * @return the cluster topology + */ + ClusterTopology getTopologyOfVirtualCluster(String cluster, boolean useRealTopology); + /** * Get cluster basic information * @param clusterId diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java index a152c3e647..527aa92abe 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -30,6 +31,7 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; +import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; @@ -48,18 +50,41 @@ public ClusterServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configA @Override public ClusterTopology getClusterTopology(String cluster) { String zoneField = _configAccessor.getClusterConfig(cluster).getFaultZoneType(); + return getTopologyUnderDomainType(zoneField, cluster); + } + + @Override + public ClusterTopology getTopologyOfVirtualCluster(String cluster, boolean useRealTopology) { + String virtualZoneField = _configAccessor.getClusterConfig(cluster).getFaultZoneType(); + String faultZone = virtualZoneField.split(VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER)[0]; + if (useRealTopology) { + // If the user requested to use real topology, return the real topology + return getTopologyUnderDomainType(faultZone, cluster); + } + + String virtualZoneSuffix = VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE; + // If the cluster doesn't have a virtual topology but the user requested, return empty + // topology, indicating that virtual topology is not enabled + if (!virtualZoneField.endsWith(virtualZoneSuffix)) { + return new ClusterTopology(cluster, new ArrayList<>(), new HashSet<>()); + } + return getTopologyUnderDomainType(virtualZoneField, cluster); + } + + private ClusterTopology getTopologyUnderDomainType(String faultZone, String clusterId) { PropertyKey.Builder keyBuilder = _dataAccessor.keyBuilder(); List instanceConfigs = _dataAccessor.getChildValues(keyBuilder.instanceConfigs(), true); Map> instanceMapByZone = new HashMap<>(); if (instanceConfigs != null && !instanceConfigs.isEmpty()) { for (InstanceConfig instanceConfig : instanceConfigs) { - if (!instanceConfig.getDomainAsMap().containsKey(zoneField)) { + if (!instanceConfig.getDomainAsMap().containsKey(faultZone)) { continue; } final String instanceName = instanceConfig.getInstanceName(); final ClusterTopology.Instance instance = new ClusterTopology.Instance(instanceName); - final String zoneId = instanceConfig.getDomainAsMap().get(zoneField); + final String zoneId = instanceConfig.getDomainAsMap().get(faultZone); if (instanceMapByZone.containsKey(zoneId)) { instanceMapByZone.get(zoneId).add(instance); } else { @@ -79,7 +104,7 @@ public ClusterTopology getClusterTopology(String cluster) { } // Get all the instances names - return new ClusterTopology(cluster, zones, + return new ClusterTopology(clusterId, zones, instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toSet())); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java index 2fd5f28247..efc36eaad7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,9 +35,9 @@ import org.apache.helix.HelixException; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; +import org.apache.helix.cloud.topology.FaultZoneBasedVirtualGroupAssignmentAlgorithm; import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm; import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm; -import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.InstanceConfig; @@ -46,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualFaultZoneTypeKey; /** * Service for virtual topology group. @@ -60,7 +62,7 @@ public class VirtualTopologyGroupService { private final ClusterService _clusterService; private final ConfigAccessor _configAccessor; private final HelixDataAccessor _dataAccessor; - private final VirtualGroupAssignmentAlgorithm _assignmentAlgorithm; + private VirtualGroupAssignmentAlgorithm _assignmentAlgorithm; public VirtualTopologyGroupService(HelixAdmin helixAdmin, ClusterService clusterService, ConfigAccessor configAccessor, HelixDataAccessor dataAccessor) { @@ -68,7 +70,7 @@ public VirtualTopologyGroupService(HelixAdmin helixAdmin, ClusterService cluster _clusterService = clusterService; _configAccessor = configAccessor; _dataAccessor = dataAccessor; - _assignmentAlgorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance(); + _assignmentAlgorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance(); // default assignment algorithm } /** @@ -86,10 +88,27 @@ public VirtualTopologyGroupService(HelixAdmin helixAdmin, ClusterService cluster * -- if set false or not set, the cluster will automatically enter maintenance mode and exit after * the call succeeds. It won't proceed if the cluster is already in maintenance mode. * Either case, the cluster must be in maintenance mode before config change. + * {@link VirtualTopologyGroupConstants#ASSIGNMENT_ALGORITHM_TYPE} is optional, default to INSTANCE_BASED. + * {@link VirtualTopologyGroupConstants#FORCE_RECOMPUTE} is optional, default to false. + * -- if set true, the virtual topology group will be recomputed from scratch by ignoring the existing + * virtual topology group information. + * -- if set false or not set, the virtual topology group will be incrementally computed based on the + * existing virtual topology group information if possible. */ public void addVirtualTopologyGroup(String clusterName, Map customFields) { // validation ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + // Collect the real topology of the cluster and the virtual topology of the cluster + ClusterTopology clusterTopology = _clusterService.getTopologyOfVirtualCluster(clusterName, true); + // If forceRecompute is set to true, we will recompute the virtual topology group from scratch + // by ignoring the existing virtual topology group information. + String forceRecompute = customFields.getOrDefault(VirtualTopologyGroupConstants.FORCE_RECOMPUTE, "false"); + boolean forceRecomputeFlag = Boolean.parseBoolean(forceRecompute); + ClusterTopology virtualTopology = + forceRecomputeFlag ? new ClusterTopology(clusterName, Collections.emptyList(), + Collections.emptySet()) + : _clusterService.getTopologyOfVirtualCluster(clusterName, false); + Preconditions.checkState(clusterConfig.isTopologyAwareEnabled(), "Topology-aware rebalance is not enabled in cluster " + clusterName); String groupName = customFields.get(VirtualTopologyGroupConstants.GROUP_NAME); @@ -103,14 +122,39 @@ public void addVirtualTopologyGroup(String clusterName, Map cust } catch (NumberFormatException ex) { throw new IllegalArgumentException("virtualTopologyGroupNumber " + groupNumberStr + " is not an integer.", ex); } + + String algorithm = customFields.get(VirtualTopologyGroupConstants.ASSIGNMENT_ALGORITHM_TYPE); + algorithm = algorithm == null ? VirtualTopologyGroupConstants.VirtualGroupAssignmentAlgorithm.INSTANCE_BASED.toString() : algorithm; + if (algorithm != null) { + VirtualTopologyGroupConstants.VirtualGroupAssignmentAlgorithm algorithmEnum = null; + try { + algorithmEnum = + VirtualTopologyGroupConstants.VirtualGroupAssignmentAlgorithm.valueOf(algorithm); + } catch (Exception e) { + throw new IllegalArgumentException( + "Failed to instantiate assignment algorithm " + algorithm, e); + } + switch (algorithmEnum) { + case ZONE_BASED: + Preconditions.checkArgument(numGroups <= clusterTopology.getZones().size(), + "Number of virtual groups cannot be greater than the number of zones."); + _assignmentAlgorithm = FaultZoneBasedVirtualGroupAssignmentAlgorithm.getInstance(); + break; + case INSTANCE_BASED: + Preconditions.checkArgument(numGroups <= clusterTopology.getAllInstances().size(), + "Number of virtual groups cannot be greater than the number of instances."); + _assignmentAlgorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance(); + break; + default: + throw new IllegalArgumentException("Unsupported assignment algorithm " + algorithm); + } + } LOG.info("Computing virtual topology group for cluster {} with param {}", clusterName, customFields); // compute group assignment - ClusterTopology clusterTopology = _clusterService.getClusterTopology(clusterName); - Preconditions.checkArgument(numGroups <= clusterTopology.getAllInstances().size(), - "Number of virtual groups cannot be greater than the number of instances."); Map> assignment = - _assignmentAlgorithm.computeAssignment(numGroups, groupName, clusterTopology.toZoneMapping()); + _assignmentAlgorithm.computeAssignment(numGroups, groupName, + clusterTopology.toZoneMapping(), virtualTopology.toZoneMapping()); boolean autoMaintenanceModeDisabled = Boolean.parseBoolean( customFields.getOrDefault(VirtualTopologyGroupConstants.AUTO_MAINTENANCE_MODE_DISABLED, "false")); @@ -137,7 +181,7 @@ public void addVirtualTopologyGroup(String clusterName, Map cust private void updateConfigs(String clusterName, ClusterConfig clusterConfig, Map> assignment) { List zkPaths = new ArrayList<>(); List> updaters = new ArrayList<>(); - createInstanceConfigUpdater(clusterName, assignment).forEach((zkPath, updater) -> { + createInstanceConfigUpdater(clusterConfig, assignment).forEach((zkPath, updater) -> { zkPaths.add(zkPath); updaters.add(updater); }); @@ -151,7 +195,7 @@ private void updateConfigs(String clusterName, ClusterConfig clusterConfig, Map< // update cluster config String virtualTopologyString = computeVirtualTopologyString(clusterConfig); clusterConfig.setTopology(virtualTopologyString); - clusterConfig.setFaultZoneType(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE); + clusterConfig.setFaultZoneType(computeVirtualFaultZoneTypeKey(clusterConfig.getFaultZoneType())); _configAccessor.updateClusterConfig(clusterName, clusterConfig); LOG.info("Successfully update instance and cluster config for {}", clusterName); } @@ -160,28 +204,28 @@ private void updateConfigs(String clusterName, ClusterConfig clusterConfig, Map< static String computeVirtualTopologyString(ClusterConfig clusterConfig) { ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); String endNodeType = clusterTopologyConfig.getEndNodeType(); - String[] splits = new String[] {"", VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, endNodeType}; + String[] splits = new String[] {"", computeVirtualFaultZoneTypeKey(clusterConfig.getFaultZoneType()), endNodeType}; return String.join(VirtualTopologyGroupConstants.PATH_NAME_SPLITTER, splits); } /** * Create updater for instance config for async update. - * @param clusterName cluster name of the instances. + * @param clusterConfig cluster config for the cluster which the instance reside. * @param assignment virtual group assignment. * @return a map from instance zkPath to its {@link DataUpdater} to update. */ @VisibleForTesting static Map> createInstanceConfigUpdater( - String clusterName, Map> assignment) { + ClusterConfig clusterConfig, Map> assignment) { Map> updaters = new HashMap<>(); for (Map.Entry> entry : assignment.entrySet()) { String virtualGroup = entry.getKey(); for (String instanceName : entry.getValue()) { - String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + String path = PropertyPathBuilder.instanceConfig(clusterConfig.getClusterName(), instanceName); updaters.put(path, currentData -> { InstanceConfig instanceConfig = new InstanceConfig(currentData); Map domainMap = instanceConfig.getDomainAsMap(); - domainMap.put(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, virtualGroup); + domainMap.put(computeVirtualFaultZoneTypeKey(clusterConfig.getFaultZoneType()), virtualGroup); instanceConfig.setDomain(domainMap); return instanceConfig.getRecord(); }); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index e882cf0cea..4ae7b3dfe8 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -75,6 +75,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.apache.helix.cloud.azure.AzureConstants.AZURE_FAULT_ZONE_TYPE; + public class TestClusterAccessor extends AbstractTestClass { private static final String VG_CLUSTER = "vgCluster"; @@ -221,32 +223,6 @@ public void testGetClusterTopologyAndFaultZoneMap() throws IOException { "/instance:TestCluster_1localhost_12927")))); } - @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters") - public void testAddVirtualTopologyGroup(String requestParam, int numGroups, - Map instanceToGroup) throws IOException { - post("clusters/" + VG_CLUSTER, - ImmutableMap.of("command", "addVirtualTopologyGroup"), - Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), - Response.Status.OK.getStatusCode()); - Map topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER)); - Assert.assertTrue(topology.containsKey("zones")); - Assert.assertEquals(((List) topology.get("zones")).size(), numGroups); - - ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER); - String expectedTopology = "/" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname"; - Assert.assertEquals(clusterConfig.getTopology(), expectedTopology); - Assert.assertEquals(clusterConfig.getFaultZoneType(), VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE); - - HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor); - for (Map.Entry entry : instanceToGroup.entrySet()) { - InstanceConfig instanceConfig = - helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey())); - String expectedGroup = entry.getValue(); - Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE), - expectedGroup); - } - } - @Test(dependsOnMethods = "testGetClusters") public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException { setupClusterForVirtualTopology(VG_CLUSTER); @@ -270,6 +246,36 @@ public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingExcep Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER)); } + @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testVirtualTopologyGroupMaintenanceMode") + public void testAddVirtualTopologyGroup(String requestParam, int numGroups, + Map instanceToGroup) throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName() + " with requestParam: " + requestParam); + post("clusters/" + VG_CLUSTER, + ImmutableMap.of("command", "addVirtualTopologyGroup"), + Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + Map topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER)); + Assert.assertTrue(topology.containsKey("zones")); + Assert.assertEquals(((List) topology.get("zones")).size(), numGroups, + "virtual groups not created as expected. Need " + numGroups + " groups but got " + + (topology.get("zones"))); + + ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER); + String expectedFaultZoneType = AZURE_FAULT_ZONE_TYPE + "_" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE; + String expectedTopology = "/" + expectedFaultZoneType + "/hostname"; + Assert.assertEquals(clusterConfig.getTopology(), expectedTopology); + Assert.assertEquals(clusterConfig.getFaultZoneType(), expectedFaultZoneType); + + HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor); + for (Map.Entry entry : instanceToGroup.entrySet()) { + InstanceConfig instanceConfig = + helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey())); + String expectedGroup = entry.getValue(); + Assert.assertEquals(instanceConfig.getDomainAsMap().get(expectedFaultZoneType), + expectedGroup); + } + } + private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException { String body = get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true); @@ -281,6 +287,14 @@ public Object[][] prepareVirtualTopologyTests() { setupClusterForVirtualTopology(VG_CLUSTER); String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}"; String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}"; + // Split 5 zones into 2 virtual groups, expect 0-1-2 in virtual group 0, 3-4 in virtual group 1 + String test3 = "{\"virtualTopologyGroupNumber\":\"2\",\"virtualTopologyGroupName\":\"vgTest\"," + + "\"assignmentAlgorithmType\":\"ZONE_BASED\"}"; + String test4 = "{\"virtualTopologyGroupNumber\":\"5\",\"virtualTopologyGroupName\":\"vgTest\"," + + "\"assignmentAlgorithmType\":\"ZONE_BASED\"}"; + String test5 = "{\"virtualTopologyGroupNumber\":\"2\",\"virtualTopologyGroupName\":\"vgTest\"," + + "\"assignmentAlgorithmType\":\"ZONE_BASED\",\"forceRecompute\"" + + ":\"true\"}"; return new Object[][] { {test1, 7, ImmutableMap.of( "vgCluster_localhost_12918", "vgTest_0", @@ -297,7 +311,29 @@ public Object[][] prepareVirtualTopologyTests() { "vgCluster_localhost_12918", "vgTest_0", "vgCluster_localhost_12919", "vgTest_0", "vgCluster_localhost_12925", "vgTest_4", - "vgCluster_localhost_12927", "vgTest_6")} + "vgCluster_localhost_12927", "vgTest_6")}, + {test3, 2, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_1", + "vgCluster_localhost_12927", "vgTest_0")}, + {test4, 5, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_4", + "vgCluster_localhost_12919", "vgTest_4", + "vgCluster_localhost_12925", "vgTest_2", + "vgCluster_localhost_12927", "vgTest_1")}, + // repeat test3 for deterministic and test for decreasing numGroups + {test3, 2, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_1", + "vgCluster_localhost_12927", "vgTest_0")}, + // Force recompute to reassign instances to virtual groups + {test5, 2, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_1", + "vgCluster_localhost_12927", "vgTest_0")}, }; } @@ -940,7 +976,7 @@ public void testAddClusterWithCloudConfig() throws Exception { ClusterConfig clusterConfigFromZk = _configAccessor.getClusterConfig(clusterName); Assert.assertEquals(clusterConfigFromZk.getTopology(), AzureConstants.AZURE_TOPOLOGY); - Assert.assertEquals(clusterConfigFromZk.getFaultZoneType(), AzureConstants.AZURE_FAULT_ZONE_TYPE); + Assert.assertEquals(clusterConfigFromZk.getFaultZoneType(), AZURE_FAULT_ZONE_TYPE); Assert.assertTrue(clusterConfigFromZk.isTopologyAwareEnabled()); } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java index 23182d2bb6..8bd414da4b 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java @@ -120,6 +120,44 @@ public void testCheckTopologyAware() { Assert.assertTrue(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER)); } + @Test + public void testGetVirtualTopology() { + InstanceConfig instanceConfig1 = new InstanceConfig("instance0"); + instanceConfig1.setDomain("helixZoneId=zone0, helixZoneId_virtualZone=virtualZone0"); + InstanceConfig instanceConfig2 = new InstanceConfig("instance1"); + instanceConfig2.setDomain("helixZoneId=zone1, helixZoneId_virtualZone=virtualZone1"); + InstanceConfig instanceConfig3 = new InstanceConfig("instance3"); + instanceConfig3.setDomain("helixZoneId=zone3"); + List instanceConfigs = ImmutableList.of(instanceConfig1, instanceConfig2, instanceConfig3); + + Mock mock = new Mock(); + ClusterConfig mockConfig = new ClusterConfig(TEST_CLUSTER); + mockConfig.setFaultZoneType("helixZoneId_virtualZone"); + when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(mockConfig); + when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + when(mock.dataAccessor.getChildValues(any(PropertyKey.class), anyBoolean())) + .thenReturn(instanceConfigs); + + // When use `getClusterTopology` on a virtual topology cluster, it shall return topology + // based on the configured fault zone type + ClusterTopology clusterTopology = mock.clusterService.getClusterTopology(TEST_CLUSTER); + Assert.assertEquals(clusterTopology.getZones().size(), 2); + Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER); + Assert.assertEquals(clusterTopology.getZones().get(0).getInstances().size(), 1); + + // When use `getVirtualClusterTopology` on a virtual topology cluster, it shall return the + // virtual topology + clusterTopology = mock.clusterService.getTopologyOfVirtualCluster(TEST_CLUSTER, true); + Assert.assertEquals(clusterTopology.getZones().size(), 3); + Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER); + + // When use `getVirtualClusterTopology` on a virtual topology cluster, it shall return the + // virtual topology + clusterTopology = mock.clusterService.getTopologyOfVirtualCluster(TEST_CLUSTER, false); + Assert.assertEquals(clusterTopology.getZones().size(), 2); + Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER); + } + private final class Mock { private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); private ConfigAccessor configAccessor = mock(ConfigAccessor.class); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java index 3ffc29c74c..685b5347d3 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java @@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; + +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,10 +31,7 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; import org.apache.helix.cloud.azure.AzureConstants; -import org.apache.helix.cloud.constants.CloudProvider; -import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; @@ -40,12 +39,15 @@ import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.DataUpdater; +import org.mockito.ArgumentMatchers; import org.testng.Assert; import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.apache.helix.cloud.azure.AzureConstants.AZURE_FAULT_ZONE_TYPE; import static org.apache.helix.cloud.constants.VirtualTopologyGroupConstants.*; +import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualFaultZoneTypeKey; import static org.mockito.Mockito.*; @@ -53,6 +55,7 @@ public class TestVirtualTopologyGroupService { private static final String TEST_CLUSTER = "Test_Cluster"; private static final String TEST_CLUSTER0 = "TestCluster_0"; private static final String TEST_CLUSTER1 = "TestCluster_1"; + private static final String FAULT_ZONE_TYPE = "helixZoneId"; private final ConfigAccessor _configAccessor = mock(ConfigAccessor.class); private final HelixDataAccessor _dataAccessor = mock(HelixDataAccessor.class); @@ -75,13 +78,18 @@ public void prepare() { assignment.put("virtual_group_0", ImmutableSet.of("instance_0", "instance_1")); assignment.put("virtual_group_1", ImmutableSet.of("instance_2")); - _updaterMap = VirtualTopologyGroupService.createInstanceConfigUpdater(TEST_CLUSTER, assignment); - - ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER0); - clusterConfig.setFaultZoneType(AzureConstants.AZURE_FAULT_ZONE_TYPE); - clusterConfig.setTopology(AzureConstants.AZURE_TOPOLOGY); - clusterConfig.setTopologyAwareEnabled(true); - when(_configAccessor.getClusterConfig(TEST_CLUSTER0)).thenReturn(clusterConfig); + ClusterConfig testClusterConfig = new ClusterConfig(TEST_CLUSTER); + testClusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); + testClusterConfig.setTopology("/helixZoneId"); + testClusterConfig.setTopologyAwareEnabled(true); + when(_configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(testClusterConfig); + _updaterMap = VirtualTopologyGroupService.createInstanceConfigUpdater(testClusterConfig, assignment); + + ClusterConfig testClusterConfig0 = new ClusterConfig(TEST_CLUSTER0); + testClusterConfig0.setFaultZoneType(AZURE_FAULT_ZONE_TYPE); + testClusterConfig0.setTopology(AzureConstants.AZURE_TOPOLOGY); + testClusterConfig0.setTopologyAwareEnabled(true); + when(_configAccessor.getClusterConfig(TEST_CLUSTER0)).thenReturn(testClusterConfig0); _helixAdmin = mock(HelixAdmin.class); when(_helixAdmin.isInMaintenanceMode(anyString())).thenReturn(true); @@ -91,6 +99,10 @@ public void prepare() { when(_dataAccessor.updateChildren(anyList(), anyList(), anyInt())).thenReturn(results); ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterTopology(anyString())).thenReturn(prepareClusterTopology()); + when(clusterService.getTopologyOfVirtualCluster(anyString(), ArgumentMatchers.eq(true))).thenReturn( + prepareClusterTopology()); + when(clusterService.getTopologyOfVirtualCluster(anyString(), ArgumentMatchers.eq(false))).thenReturn( + new ClusterTopology(TEST_CLUSTER0, Collections.emptyList(), Collections.emptySet())); _service = new VirtualTopologyGroupService(_helixAdmin, clusterService, _configAccessor, _dataAccessor); } @@ -134,6 +146,14 @@ public void testNumberOfInstanceCheck() { GROUP_NAME, "test-group", GROUP_NUMBER, "10", AUTO_MAINTENANCE_MODE_DISABLED, "true")); } + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Number of virtual groups cannot be greater than the number of zones.*") + public void testFaultZoneBasedVirtualGroupAssignment() { + _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of( + GROUP_NAME, "test-group", GROUP_NUMBER, "3", AUTO_MAINTENANCE_MODE_DISABLED, "true", + ASSIGNMENT_ALGORITHM_TYPE, "ZONE_BASED")); + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testParamValidation() { _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(GROUP_NUMBER, "2")); @@ -150,11 +170,11 @@ public void testInstanceConfigUpdater(String zkPath, InstanceConfig instanceConf public Object[][] instanceTestProvider() { return new Object[][] { {computeZkPath("instance_0"), _instanceConfig0, - ImmutableMap.of("helixZoneId", "zone0", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_0")}, + ImmutableMap.of(FAULT_ZONE_TYPE, "zone0", computeVirtualFaultZoneTypeKey(FAULT_ZONE_TYPE), "virtual_group_0")}, {computeZkPath("instance_1"), _instanceConfig1, - ImmutableMap.of("helixZoneId", "zone0", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_0")}, + ImmutableMap.of(FAULT_ZONE_TYPE, "zone0", computeVirtualFaultZoneTypeKey(FAULT_ZONE_TYPE), "virtual_group_0")}, {computeZkPath("instance_2"), _instanceConfig2, - ImmutableMap.of("helixZoneId", "zone1", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_1")} + ImmutableMap.of(FAULT_ZONE_TYPE, "zone1", computeVirtualFaultZoneTypeKey(FAULT_ZONE_TYPE), "virtual_group_1")} }; } @@ -163,8 +183,9 @@ public void testVirtualTopologyString() { ClusterConfig testConfig = new ClusterConfig("testId"); testConfig.setTopologyAwareEnabled(true); testConfig.setTopology("/zone/instance"); + testConfig.setFaultZoneType("zone"); Assert.assertEquals(VirtualTopologyGroupService.computeVirtualTopologyString(testConfig), - "/virtualZone/instance"); + "/zone_virtualZone/instance"); } private static ClusterTopology prepareClusterTopology() {