From d3b6c68043233bc6224c791d876fabdcc5f48495 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Mon, 9 Sep 2024 23:42:20 -0700 Subject: [PATCH] participant update target state --- .../HelixGatewayServiceGrpcService.java | 7 +- .../participant/HelixGatewayParticipant.java | 13 +- .../service/GatewayServiceManager.java | 23 ++-- .../util/GatewayCurrentStateCache.java | 111 ++++++++---------- .../StateTransitionMessageTranslateUtil.java | 6 +- .../TestHelixGatewayParticipant.java | 73 ++++++++++-- .../util/TestGatewayCurrentStateCache.java | 33 +++--- 7 files changed, 150 insertions(+), 116 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index c97994a9fe..7d69da460b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -25,10 +25,9 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; @@ -52,10 +51,10 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class); // Map to store the observer for each instance - private final Map> _observerMap = new ConcurrentHashMap<>(); + private final Map> _observerMap = new HashMap<>(); // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. // map> - private final Map, Pair> _reversedObserverMap = new ConcurrentHashMap<>(); + private final Map, Pair> _reversedObserverMap = new HashMap<>(); private final GatewayServiceManager _manager; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index 723712cb77..8dd04644b4 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -47,8 +47,6 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { private final HelixGatewayServiceChannel _gatewayServiceChannel; private final HelixManager _helixManager; private final Runnable _onDisconnectedCallback; - private final Map> _shardStateMap; - private final Map> _stateTransitionResultMap; private final GatewayServiceManager _gatewayServiceManager; @@ -59,7 +57,6 @@ private HelixGatewayParticipant(HelixGatewayServiceChannel gatewayServiceChannel _gatewayServiceChannel = gatewayServiceChannel; _helixManager = helixManager; _onDisconnectedCallback = onDisconnectedCallback; - _shardStateMap = initialShardStateMap; _stateTransitionResultMap = new ConcurrentHashMap<>(); _gatewayServiceManager = gatewayServiceManager; } @@ -71,16 +68,14 @@ public void processStateTransitionMessage(Message message) throws Exception { String concatenatedShardName = resourceId + shardId; try { - if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { - return; - } - - CompletableFuture future = new CompletableFuture<>(); - // update the target state in cache _gatewayServiceManager.updateTargetState(_helixManager.getClusterName(), _helixManager.getInstanceName(), resourceId, shardId, toState); + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { + return; + } + CompletableFuture future = new CompletableFuture<>(); _stateTransitionResultMap.put(concatenatedShardName, future); _gatewayServiceChannel.sendStateChangeRequests(_helixManager.getInstanceName(), StateTransitionMessageTranslateUtil.translateSTMsgToShardChangeRequests(message)); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 2ce271c888..f4faa4ea20 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -90,18 +90,10 @@ public void onGatewayServiceEvent(GatewayServiceEvent event) { } } - private GatewayCurrentStateCache getCache(String clusterName) { - return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName)); - } - public void resetTargetStateCache(String clusterName, String instanceName) { getCache(clusterName).resetTargetStateCache(instanceName); } - public void addInstanceToCache(String clusterName, String instanceName) { - getCache(clusterName).addInstanceToCache(instanceName); - } - /** * Overwrite the current state cache with the new current state map, and return the diff of the change. * @param clusterName @@ -127,14 +119,19 @@ public String serializeTargetState() { return targetStateNode.toString(); } - public void updateTargetState(String clusterName, String instanceName, String resourceId, String shardId, String toState) { - getCache(clusterName).updateTargetStateWithDiff(instanceName, Map.of(resourceId, Map.of(shardId, toState))); + public void updateTargetState(String clusterName, String instanceName, String resourceId, String shardId, + String toState) { + getCache(clusterName).updateTargetStateOfExistingInstance(instanceName, resourceId, shardId, toState); } public String getCurrentState(String clusterName, String instanceName, String resourceId, String shardId) { return getCache(clusterName).getCurrentState(instanceName, resourceId, shardId); } + public String getTargetState(String clusterName, String instanceName, String resourceId, String shardId) { + return getCache(clusterName).getTargetState(instanceName, resourceId, shardId); + } + /** * Update in memory shard state */ @@ -214,7 +211,7 @@ private void removeHelixGatewayParticipant(String clusterName, String instanceNa participant.disconnect(); _helixGatewayParticipantMap.get(clusterName).remove(instanceName); } - _currentStateCacheMap.get(clusterName).removeInstanceFromCache(instanceName); + _currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName); } private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName, @@ -222,4 +219,8 @@ private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName, return _helixGatewayParticipantMap.getOrDefault(clusterName, Collections.emptyMap()) .get(instanceName); } + + private GatewayCurrentStateCache getCache(String clusterName) { + return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName)); + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index 71a2576213..c3ace0d54a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ public class GatewayCurrentStateCache { // A cache of current state. It should be updated by the HelixGatewayServiceChannel // instance -> resource state (resource -> shard -> target state) - final Map _currentStateMap; + Map _currentStateMap; // A cache of target state. // instance -> resource state (resource -> shard -> target state) @@ -47,8 +46,8 @@ public class GatewayCurrentStateCache { public GatewayCurrentStateCache(String clusterName) { _clusterName = clusterName; - _currentStateMap = new ConcurrentHashMap<>(); - _targetStateMap = new ConcurrentHashMap<>(); + _currentStateMap = new HashMap<>(); + _targetStateMap = new HashMap<>(); } public String getCurrentState(String instance, String resource, String shard) { @@ -63,25 +62,24 @@ public String getTargetState(String instance, String resource, String shard) { /** * Update the cached current state of instances in a cluster, and return the diff of the change. - * @param newCurrentStateMap The new current state map of instances in the cluster + * @param userCurrentStateMap The new current state map of instances in the cluster * @return */ public Map>> updateCacheWithNewCurrentStateAndGetDiff( - Map>> newCurrentStateMap) { + Map>> userCurrentStateMap) { + Map newCurrentStateMap = new HashMap<>(_currentStateMap); Map>> diff = new HashMap<>(); - for (String instance : newCurrentStateMap.keySet()) { - if (!_currentStateMap.containsKey(instance)) { - logger.warn("Instance {} is not in the state map, skip updating", instance); - continue; - } - Map> newCurrentState = newCurrentStateMap.get(instance); - Map> resourceStateDiff = - _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) - .updateAndGetDiff(newCurrentState); - if (resourceStateDiff != null && !resourceStateDiff.isEmpty()) { - diff.put(instance, resourceStateDiff); + for (String instance : userCurrentStateMap.keySet()) { + ShardStateMap oldStateMap = _currentStateMap.get(instance); + Map> instanceDiff = oldStateMap == null ? userCurrentStateMap.get(instance) + : oldStateMap.getDiff(userCurrentStateMap.get(instance)); + if (!instanceDiff.isEmpty()) { + diff.put(instance, instanceDiff); } + newCurrentStateMap.put(instance, new ShardStateMap(userCurrentStateMap.get(instance))); } + logger.info("Update current state cache for instances: {}", diff.keySet()); + _currentStateMap = newCurrentStateMap; return diff; } @@ -89,23 +87,36 @@ public Map>> updateCacheWithNewCurrentSt * Update the current state with the changed current state maps. */ public void updateCurrentStateOfExistingInstance(String instance, String resource, String shard, String shardState) { - updateShardStateMapWithDiff(_currentStateMap, instance, Map.of(resource, Map.of(shard, shardState))); + logger.info("Update current state of instance: {}, resource: {}, shard: {}, state: {}", instance, resource, shard, + shardState); + updateShardStateMapWithDiff(_currentStateMap, instance, resource, shard, shardState); } /** * Update the target state with the changed target state maps. * All existing target states remains the same - * @param diff */ - public void updateTargetStateWithDiff(String instance, Map> diff) { - updateShardStateMapWithDiff(_targetStateMap, instance, diff); + public void updateTargetStateOfExistingInstance(String instance, String resource, String shard, String shardState) { + logger.info("Update target state of instance: {}, resource: {}, shard: {}, state: {}", instance, resource, shard, + shardState); + updateShardStateMapWithDiff(_targetStateMap, instance, resource, shard, shardState); + } + + private void updateShardStateMapWithDiff(Map stateMap, String instance, + String resource, String shard, String shardState) { + ShardStateMap curStateMap = stateMap.get(instance); + if (curStateMap == null) { + logger.warn("Instance {} is not in the state map, skip updating", instance); + return; + } + curStateMap.updateWithShardState(resource, shard, shardState); } /** * Serialize the target state assignments to a JSON Node. * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} */ - public ObjectNode serializeTargetAssignmentsToJSONNode() { + public synchronized ObjectNode serializeTargetAssignmentsToJSONNode() { ObjectNode root = mapper.createObjectNode(); for (Map.Entry entry : _targetStateMap.entrySet()) { root.set(entry.getKey(), entry.getValue().toJSONNode()); @@ -113,61 +124,42 @@ public ObjectNode serializeTargetAssignmentsToJSONNode() { return root; } - public void removeInstanceFromCache(String instance) { - _currentStateMap.remove(instance); + /** + * Remove the target state data of an instance from the cache. + */ + public synchronized void removeInstanceTargetDataFromCache(String instance) { + logger.info("Remove instance target data from cache for instance: {}", instance); _targetStateMap.remove(instance); } - public void addInstanceToCache(String instance) { - _currentStateMap.put(instance, new ShardStateMap(new HashMap<>())); - _targetStateMap.put(instance, new ShardStateMap(new HashMap<>())); - } - - private void updateShardStateMapWithDiff(Map stateMap, String instance, - Map> diffMap) { - if (diffMap == null || diffMap.isEmpty()) { - return; - } - if (!stateMap.containsKey(instance)) { - logger.warn("Instance {} is not in the state map, skip updating", instance); - } - stateMap.get(instance).updateWithDiff(diffMap); - } - - public void resetTargetStateCache(String instance) { + /** + * Remove the current state data of an instance from the cache to an empty map. + */ + public synchronized void resetTargetStateCache(String instance) { + logger.info("Reset target state cache for instance: {}", instance); _targetStateMap.put(instance, new ShardStateMap(new HashMap<>())); } public static class ShardStateMap { Map> _stateMap; + final Object _lock = new Object(); public ShardStateMap(Map> stateMap) { _stateMap = new ConcurrentHashMap<>(stateMap); } - public String getState(String instance, String shard) { - Map shardStateMap = _stateMap.get(instance); + public String getState(String resource, String shard) { + Map shardStateMap = _stateMap.get(resource); return shardStateMap == null ? null : shardStateMap.get(shard); } - - private void updateWithDiff(Map> diffMap) { - for (Map.Entry> diffEntry : diffMap.entrySet()) { - String resource = diffEntry.getKey(); - Map diffCurrentState = diffEntry.getValue(); - if (_stateMap.get(resource) != null) { - _stateMap.get(resource).entrySet().forEach(currentMapEntry -> { - String shard = currentMapEntry.getKey(); - if (diffCurrentState.get(shard) != null) { - currentMapEntry.setValue(diffCurrentState.get(shard)); - } - }); - } else { - _stateMap.put(resource, diffCurrentState); - } + public void updateWithShardState(String resource, String shard, String shardState) { + logger.info("Update ShardStateMap of resource: {}, shard: {}, state: {}", resource, shard, shardState); + synchronized (_lock) { + _stateMap.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, shardState); } } - private Map> updateAndGetDiff(Map> newCurrentStateMap) { + private Map> getDiff(Map> newCurrentStateMap) { Map> diff = new HashMap<>(); for (Map.Entry> entry : newCurrentStateMap.entrySet()) { String resource = entry.getKey(); @@ -185,7 +177,6 @@ private Map> updateAndGetDiff(Map(newCurrentStateMap); return diff; } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index 01fbf59a31..a2d07085bb 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -92,9 +92,11 @@ public static GatewayServiceEvent translateShardStateMessageToEventAndUpdateCach .put(state.getShardName(), state.getCurrentState()); } } - manager.addInstanceToCache(shardState.getClusterName(), shardState.getInstanceName()); // update current state cache. We always overwrite the current state map for initial connection - manager.updateCacheWithNewCurrentStateAndGetDiff(shardState.getClusterName(), Map.of(shardState.getInstanceName(), shardStateMap)); + Map>> newShardStateMap = new HashMap<>(); + newShardStateMap.put(shardState.getInstanceName(), shardStateMap); + manager.updateCacheWithNewCurrentStateAndGetDiff(shardState.getClusterName(), newShardStateMap); + builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName( shardState.getClusterName()).setParticipantName(shardState.getInstanceName()) .setShardStateMap(shardStateMap); diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index 60cdac3616..128a228576 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,7 +65,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { private final Map _pendingMessageMap = new ConcurrentHashMap<>(); private final AtomicInteger _onDisconnectCallbackCount = new AtomicInteger(); - private GatewayServiceManager gatewayServiceManager; + private GatewayServiceManager _gatewayServiceManager; @BeforeClass public void beforeClass() { @@ -73,7 +74,7 @@ public void beforeClass() { builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER).setGrpcServerPort(5001); GatewayServiceChannelConfig config = builder.build(); - gatewayServiceManager = new GatewayServiceManager(ZK_ADDR, config); + _gatewayServiceManager = new GatewayServiceManager(ZK_ADDR, config); // Set up the Helix cluster ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); @@ -111,11 +112,11 @@ public void afterClass() { */ private HelixGatewayParticipant addParticipant(String participantName, Map> initialShardMap) { + _gatewayServiceManager.resetTargetStateCache(CLUSTER_NAME, participantName); HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(new MockHelixGatewayServiceChannel(_pendingMessageMap), participantName, - CLUSTER_NAME, ZK_ADDR, _onDisconnectCallbackCount::incrementAndGet, gatewayServiceManager).addMultiTopStateStateModelDefinition( + CLUSTER_NAME, ZK_ADDR, _onDisconnectCallbackCount::incrementAndGet, _gatewayServiceManager).addMultiTopStateStateModelDefinition( TEST_STATE_MODEL).setInitialShardState(initialShardMap).build(); - gatewayServiceManager.addInstanceToCache(CLUSTER_NAME, participantName); _participants.add(participant); return participant; } @@ -184,15 +185,31 @@ private ShardChangeRequests getPendingMessage(String instanceName) { /** * Process the pending message for a participant. */ - private void processPendingMessage(HelixGatewayParticipant participant, - boolean isSuccess, String toState) { + private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess, String toState) { ShardChangeRequests requests = _pendingMessageMap.remove(participant.getInstanceName()); - gatewayServiceManager.updateCurrentState( CLUSTER_NAME, participant.getInstanceName(), requests.getRequest(0).getResourceName(), - requests.getRequest(0).getShardName(), isSuccess ? toState : "ERROR"); + + Map>> newSInstanceStateMap = new HashMap<>(); + newSInstanceStateMap.put(participant.getInstanceName(), + createSingleShardStateMap(requests.getRequest(0).getResourceName(), requests.getRequest(0).getShardName(), + isSuccess ? toState : "ERROR")); + _gatewayServiceManager.updateCacheWithNewCurrentStateAndGetDiff(CLUSTER_NAME, newSInstanceStateMap); + participant.completeStateTransition(requests.getRequest(0).getResourceName(), requests.getRequest(0).getShardName(), isSuccess ? toState : "ERROR"); } + /** + * Create a single shard state map. + */ + Map> createSingleShardStateMap( String resource, String shard, String state) { + + Map> resourceStateMap = new HashMap<>(); + Map shardStateMap = new HashMap<>(); + shardStateMap.put(shard, state); + resourceStateMap.put(resource, shardStateMap); + return resourceStateMap; + } + /** * Get the current state of a Helix shard. */ @@ -223,7 +240,9 @@ private void verifyGatewayStateMatchesHelixState() throws Exception { .getResourceIdealState(CLUSTER_NAME, resourceName) .getPartitionSet()) { String helixCurrentState = getHelixCurrentState(instanceName, resourceName, shardId); - if (!participant.getCurrentState(resourceName, shardId).equals(helixCurrentState)) { + if (!participant.getCurrentState(resourceName, shardId).equals(helixCurrentState) && !( + participant.getCurrentState(resourceName, shardId).equals("DROPPED") && helixCurrentState.equals( + "UNASSIGNED"))) { return false; } } @@ -232,6 +251,28 @@ private void verifyGatewayStateMatchesHelixState() throws Exception { }), TestHelper.WAIT_DURATION)); } + private void verifyGatewayTargetStateMatchHelixTargetState() throws Exception { + Assert.assertTrue(TestHelper.verify(() -> _participants.stream().allMatch(participant -> { + String instanceName = participant.getInstanceName(); + for (String resourceName : _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) { + for (String shardId : _gSetupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, resourceName) + .getPartitionSet()) { + String helixTargetState = getHelixCurrentState(instanceName, resourceName, shardId); + if (_gatewayServiceManager.getTargetState(CLUSTER_NAME, instanceName, resourceName, shardId) == null) { + System.out.println("Gateway target state is null for instance: " + instanceName + ", resource: " + resourceName + ", shard: " + shardId); + } + if (!participant.getCurrentState(resourceName, shardId).equals(helixTargetState) && !( + participant.getCurrentState(resourceName, shardId).equals("DROPPED") && helixTargetState.equals( + "UNASSIGNED"))) { + return false; + } + } + } + return true; + }), 6000L)); + } + /** * Verify that all shards for a given instance are in a specific state. */ @@ -270,6 +311,7 @@ public void testProcessStateTransitionMessageSuccess() throws Exception { // Verify that the cluster converges and all states are "ONLINE" Assert.assertTrue(_clusterVerifier.verify()); verifyGatewayStateMatchesHelixState(); + verifyGatewayTargetStateMatchHelixTargetState(); } @Test(dependsOnMethods = "testProcessStateTransitionMessageSuccess") @@ -307,7 +349,9 @@ public void testProcessStateTransitionAfterReconnect() throws Exception { verifyHelixPartitionStates(participant.getInstanceName(), HelixGatewayParticipant.UNASSIGNED_STATE); // Re-add the participant with its initial state - addParticipant(participant.getInstanceName(), Map.of()); + addParticipant(participant.getInstanceName(), createSingleShardStateMap(TEST_DB, "TestDB_0", + _gatewayServiceManager.getCurrentState(CLUSTER_NAME, participant.getInstanceName(), TEST_DB, + "TestDB_0"))); Assert.assertTrue(_clusterVerifier.verify()); // Verify the Helix state is "ONLINE" @@ -323,8 +367,10 @@ public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() thr // Remove shard preference and re-add the participant removeFromPreferenceList(participant); - /*HelixGatewayParticipant participantReplacement = - addParticipant(participant.getInstanceName(), participant.getShardStateMap()); + HelixGatewayParticipant participantReplacement = addParticipant(participant.getInstanceName(), + createSingleShardStateMap(TEST_DB, "TestDB_0", + _gatewayServiceManager.getCurrentState(CLUSTER_NAME, participant.getInstanceName(), TEST_DB, + "TestDB_0"))); verifyPendingMessages(List.of(participantReplacement)); // Process the pending message successfully @@ -332,7 +378,8 @@ public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() thr // Verify that the cluster converges and states are correctly updated to "ONLINE" Assert.assertTrue(_clusterVerifier.verify()); - verifyGatewayStateMatchesHelixState();*/ + verifyGatewayStateMatchesHelixState(); + verifyGatewayTargetStateMatchHelixTargetState(); } @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnectAfterDroppingPartition") diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java index bc05399687..17aa25637b 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java @@ -51,29 +51,28 @@ public void testUpdateCacheWithNewCurrentStateAndGetDiff() { } @Test - public void testUpdateCacheWithCurrentStateDiff() { + public void testUpdateCacheWithExistingStateAndGetDiff() { + // Initial state + Map>> initialState = new HashMap<>(); Map> instanceState = new HashMap<>(); Map shardState = new HashMap<>(); - shardState.put("shard2", "ONLINE"); shardState.put("shard1", "ONLINE"); instanceState.put("resource1", shardState); + initialState.put("instance1", instanceState); + cache.updateCacheWithNewCurrentStateAndGetDiff(initialState); - cache.updateCurrentStateOfExistingInstance("instance1", "resource1", "shard1", "ONLINE"); - - Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE"); - Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE"); - } - - @Test - public void testUpdateTargetStateWithDiff() { - Map> targetStateChange = new HashMap<>(); - Map shardState = new HashMap<>(); - shardState.put("shard1", "OFFLINE"); - targetStateChange.put("resource1", shardState); + // New state with a change + Map>> newState = new HashMap<>(); + Map> newInstanceState = new HashMap<>(); + Map newShardState = new HashMap<>(); + newShardState.put("shard1", "OFFLINE"); + newInstanceState.put("resource1", newShardState); + newState.put("instance1", newInstanceState); - cache.updateTargetStateWithDiff("instance1", targetStateChange); + Map>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState); - Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE"); - Assert.assertEquals(cache.serializeTargetAssignmentsToJSONNode().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}"); + Assert.assertNotNull(diff); + Assert.assertEquals(diff.size(), 1); + Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "OFFLINE"); } }