From 418e57f275281f1b3b35bb97f0fbddf5abba77c4 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Wed, 28 Aug 2024 22:35:02 -0700 Subject: [PATCH 1/5] add test --- .../service/GatewayServiceManager.java | 9 ++ .../util/GatewayCurrentStateCache.java | 144 ++++++++++++++++++ .../utils/TestGatewayCurrentStateCache.java | 89 +++++++++++ 3 files changed, 242 insertions(+) create mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java create mode 100644 helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index cceabc8874..0835a90485 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -26,11 +26,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.helix.common.caches.CurrentStateCache; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.apache.helix.gateway.participant.HelixGatewayParticipant; +import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -60,6 +62,8 @@ public class GatewayServiceManager { private final GatewayServiceChannelConfig _gatewayServiceChannelConfig; + private final Map _currentStateCacheMap; + public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) { _helixGatewayParticipantMap = new ConcurrentHashMap<>(); _zkAddress = zkAddress; @@ -68,6 +72,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable _gatewayServiceChannelConfig = gatewayServiceChannelConfig; + _currentStateCacheMap = new ConcurrentHashMap<>(); } /** @@ -143,6 +148,10 @@ public void stopService() { _helixGatewayParticipantMap.clear(); } + public GatewayCurrentStateCache getCurrentStateCache(String clusterName) { + return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName)); + } + private void createHelixGatewayParticipant(String clusterName, String instanceName, Map> initialShardStateMap) { // Create and add the participant to the participant map diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java new file mode 100644 index 0000000000..df7744aae9 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -0,0 +1,144 @@ +package org.apache.helix.gateway.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +public class GatewayCurrentStateCache { + String _clusterName; + + // A cache of current state. It should be updated by the HelixGatewayServiceChannel + // instance -> instances assignments + Map> _currentStateMap; + + // A cache of target state. + // instance -> assignments + Map> _targetStateMap; + + boolean _targetStateChanged = false; + + public GatewayCurrentStateCache(String clusterName) { + _clusterName = clusterName; + _currentStateMap = new ConcurrentHashMap<>(); + _targetStateMap = new ConcurrentHashMap<>(); + } + + public String getCurrentState(String instance, String shard) { + return _currentStateMap.get(instance).get(shard); + } + + /** + * Update the cached current state of instances in a cluster, and return the diff of the change. + * @param newCurrentStateMap The new current state map of instances in the cluster + * @return + */ + public Map> updateCacheWithNewCurrentStateAndGetDiff( + Map> newCurrentStateMap) { + Map> diff = null; + for (Map.Entry> entry : newCurrentStateMap.entrySet()) { + String instance = entry.getKey(); + Map newCurrentState = entry.getValue(); + Map oldCurrentState = _currentStateMap.get(instance); + if (oldCurrentState == null || !oldCurrentState.equals(newCurrentState)) { + if (diff == null) { + diff = new HashMap<>(); + } + if (oldCurrentState == null) { + diff.put(instance, newCurrentState); + continue; + } + for (String shard : newCurrentState.keySet()) { + if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { + diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); + } + } + } + } + _currentStateMap = newCurrentStateMap; + return diff; + } + + public void updateCacheWithCurrentStateDiff(Map> currentStateDiff) { + updateShardStateMapWithDiff(currentStateDiff, _currentStateMap); + } + + /** + * Udate the target state with the changed target state maps. + * All existing target states remains the same + * @param targetStateChangeMap + */ + public void updateTargetStateWithDiff(Map> targetStateChangeMap) { + _targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap); + } + + public boolean isTargetStateChanged() { + return _targetStateChanged; + } + + public void resetTargetStateChanged() { + _targetStateChanged = false; + } + + public Map> getTargetStateMap() { + return _targetStateMap; + } + + public String serializeTargetAssignments() { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode root = mapper.createObjectNode(); + for (Map.Entry> entry : _targetStateMap.entrySet()) { + String instance = entry.getKey(); + Map assignments = entry.getValue(); + ObjectNode instanceNode = mapper.createObjectNode(); + for (Map.Entry assignment : assignments.entrySet()) { + instanceNode.put(assignment.getKey(), assignment.getValue()); + } + root.set(instance, instanceNode); + } + return root.toString(); + } + + private boolean updateShardStateMapWithDiff(Map> diffMap, + Map> currentMap) { + if (diffMap == null || diffMap.isEmpty()) { + return false; + } + for (Map.Entry> entry : diffMap.entrySet()) { + String instance = entry.getKey(); + Map currentState = entry.getValue(); + if (currentMap.get(instance) == null) { + currentMap.put(instance, currentState); + } else { + currentMap.get(instance).entrySet().stream().forEach(e -> { + if (currentState.get(e.getKey()) != null) { + e.setValue(currentState.get(e.getKey())); + } + }); + } + } + return true; + } +} + diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java new file mode 100644 index 0000000000..3753095c5a --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java @@ -0,0 +1,89 @@ +package org.apache.helix.gateway.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.gateway.util.GatewayCurrentStateCache; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestGatewayCurrentStateCache { + private GatewayCurrentStateCache cache = new GatewayCurrentStateCache("TestCluster");; + + @Test + public void testUpdateCacheWithNewCurrentStateAndGetDiff() { + Map> newState = new HashMap<>(); + Map instanceState = new HashMap<>(); + instanceState.put("shard1", "ONLINE"); + instanceState.put("shard2", "OFFLINE"); + newState.put("instance1", instanceState); + + Map> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState); + + Assert.assertNotNull(diff); + Assert.assertEquals(diff.size(), 1); + Assert.assertEquals(diff.get("instance1").size(), 2); + Assert.assertEquals(diff.get("instance1").get("shard1"), "ONLINE"); + Assert.assertEquals(diff.get("instance1").get("shard2"), "OFFLINE"); + Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "ONLINE"); + Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE"); + } + + @Test(dependsOnMethods = "testUpdateCacheWithNewCurrentStateAndGetDiff") + public void testUpdateCacheWithCurrentStateDiff() { + Map> diff = new HashMap<>(); + Map instanceState = new HashMap<>(); + instanceState.put("shard1", "OFFLINE"); + diff.put("instance1", instanceState); + + cache.updateCacheWithCurrentStateDiff(diff); + Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "OFFLINE"); + Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE"); + } + + @Test(dependsOnMethods = "testUpdateCacheWithCurrentStateDiff") + public void testUpdateTargetStateWithDiff() { + Map> targetStateChange = new HashMap<>(); + Map instanceState = new HashMap<>(); + instanceState.put("shard1", "ONLINE"); + targetStateChange.put("instance1", instanceState); + + cache.updateTargetStateWithDiff(targetStateChange); + + Assert.assertTrue(cache.isTargetStateChanged()); + Assert.assertEquals(cache.getTargetStateMap().get("instance1").get("shard1"), "ONLINE"); + } + + @Test(dependsOnMethods = "testUpdateTargetStateWithDiff") + public void testSerializeTargetAssignments() { + Map> targetState = new HashMap<>(); + Map instanceState = new HashMap<>(); + instanceState.put("shard1", "OFFLINE"); + targetState.put("instance1", instanceState); + + cache.updateTargetStateWithDiff(targetState); + + String serialized = cache.serializeTargetAssignments(); + Assert.assertTrue(serialized.contains("\"instance1\":{\"shard1\":\"OFFLINE\"}")); + } +} From 45312d1313575153a2b76fe1a9206dde32bc9e6a Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 29 Aug 2024 20:44:22 -0700 Subject: [PATCH 2/5] comments --- .../service/GatewayServiceManager.java | 3 +- .../util/GatewayCurrentStateCache.java | 60 +++++++++---------- .../utils/TestGatewayCurrentStateCache.java | 4 +- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 0835a90485..d64d1c6617 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -72,7 +73,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable _gatewayServiceChannelConfig = gatewayServiceChannelConfig; - _currentStateCacheMap = new ConcurrentHashMap<>(); + _currentStateCacheMap = new HashMap<>(); } /** diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index df7744aae9..85b4bc3e08 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class GatewayCurrentStateCache { @@ -38,11 +37,13 @@ public class GatewayCurrentStateCache { Map> _targetStateMap; boolean _targetStateChanged = false; + final Object _targetStateLock; public GatewayCurrentStateCache(String clusterName) { _clusterName = clusterName; - _currentStateMap = new ConcurrentHashMap<>(); - _targetStateMap = new ConcurrentHashMap<>(); + _currentStateMap = new HashMap<>(); + _targetStateMap = new HashMap<>(); + _targetStateLock = new Object(); } public String getCurrentState(String instance, String shard) { @@ -56,19 +57,16 @@ public String getCurrentState(String instance, String shard) { */ public Map> updateCacheWithNewCurrentStateAndGetDiff( Map> newCurrentStateMap) { - Map> diff = null; + Map> diff = new HashMap<>(); for (Map.Entry> entry : newCurrentStateMap.entrySet()) { String instance = entry.getKey(); Map newCurrentState = entry.getValue(); Map oldCurrentState = _currentStateMap.get(instance); - if (oldCurrentState == null || !oldCurrentState.equals(newCurrentState)) { - if (diff == null) { - diff = new HashMap<>(); - } - if (oldCurrentState == null) { - diff.put(instance, newCurrentState); - continue; - } + if (oldCurrentState == null) { + diff.put(instance, newCurrentState); + continue; + } + if (!oldCurrentState.equals(newCurrentState)) { for (String shard : newCurrentState.keySet()) { if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); @@ -90,32 +88,34 @@ public void updateCacheWithCurrentStateDiff(Map> cur * @param targetStateChangeMap */ public void updateTargetStateWithDiff(Map> targetStateChangeMap) { - _targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap); - } - - public boolean isTargetStateChanged() { - return _targetStateChanged; - } - - public void resetTargetStateChanged() { - _targetStateChanged = false; + synchronized (_targetStateLock) { + _targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap); + } } - public Map> getTargetStateMap() { - return _targetStateMap; + public Map> getTargetStateMapIfChanged() { + synchronized (_targetStateLock) { + if (_targetStateChanged) { + _targetStateChanged = false; + return _targetStateMap; + } + return null; + } } public String serializeTargetAssignments() { ObjectMapper mapper = new ObjectMapper(); ObjectNode root = mapper.createObjectNode(); - for (Map.Entry> entry : _targetStateMap.entrySet()) { - String instance = entry.getKey(); - Map assignments = entry.getValue(); - ObjectNode instanceNode = mapper.createObjectNode(); - for (Map.Entry assignment : assignments.entrySet()) { - instanceNode.put(assignment.getKey(), assignment.getValue()); + synchronized (_targetStateLock) { + for (Map.Entry> entry : _targetStateMap.entrySet()) { + String instance = entry.getKey(); + Map assignments = entry.getValue(); + ObjectNode instanceNode = mapper.createObjectNode(); + for (Map.Entry assignment : assignments.entrySet()) { + instanceNode.put(assignment.getKey(), assignment.getValue()); + } + root.set(instance, instanceNode); } - root.set(instance, instanceNode); } return root.toString(); } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java index 3753095c5a..bb54eed56a 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -70,8 +69,7 @@ public void testUpdateTargetStateWithDiff() { cache.updateTargetStateWithDiff(targetStateChange); - Assert.assertTrue(cache.isTargetStateChanged()); - Assert.assertEquals(cache.getTargetStateMap().get("instance1").get("shard1"), "ONLINE"); + Assert.assertEquals(cache.getTargetStateMapIfChanged().get("instance1").get("shard1"), "ONLINE"); } @Test(dependsOnMethods = "testUpdateTargetStateWithDiff") From 6ddbf7dbb52c1a722fcc0f99073904328db5170a Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Sat, 31 Aug 2024 22:35:28 -0700 Subject: [PATCH 3/5] naming --- .../util/GatewayCurrentStateCache.java | 166 ++++++++++++------ .../utils/TestGatewayCurrentStateCache.java | 54 ------ 2 files changed, 109 insertions(+), 111 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index 85b4bc3e08..3cd32a404a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -25,16 +25,20 @@ import java.util.Map; +/** + * A cache to store the current target assignment, and the reported current state of the instances in a cluster. + */ public class GatewayCurrentStateCache { + static ObjectMapper mapper = new ObjectMapper(); String _clusterName; // A cache of current state. It should be updated by the HelixGatewayServiceChannel - // instance -> instances assignments - Map> _currentStateMap; + // instance -> resource state (resource -> shard -> target state) + Map _currentStateMap; // A cache of target state. - // instance -> assignments - Map> _targetStateMap; + // instance -> resource state (resource -> shard -> target state) + Map _targetStateMap; boolean _targetStateChanged = false; final Object _targetStateLock; @@ -46,8 +50,8 @@ public GatewayCurrentStateCache(String clusterName) { _targetStateLock = new Object(); } - public String getCurrentState(String instance, String shard) { - return _currentStateMap.get(instance).get(shard); + public String getCurrentState(String instance, String resource, String shard) { + return _currentStateMap.get(instance).getState(resource, shard); } /** @@ -55,45 +59,46 @@ public String getCurrentState(String instance, String shard) { * @param newCurrentStateMap The new current state map of instances in the cluster * @return */ - public Map> updateCacheWithNewCurrentStateAndGetDiff( - Map> newCurrentStateMap) { - Map> diff = new HashMap<>(); - for (Map.Entry> entry : newCurrentStateMap.entrySet()) { - String instance = entry.getKey(); - Map newCurrentState = entry.getValue(); - Map oldCurrentState = _currentStateMap.get(instance); - if (oldCurrentState == null) { - diff.put(instance, newCurrentState); - continue; - } - if (!oldCurrentState.equals(newCurrentState)) { - for (String shard : newCurrentState.keySet()) { - if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { - diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); - } - } - } + public Map>> updateCacheWithNewCurrentStateAndGetDiff( + Map>> newCurrentStateMap) { + Map>> diff = new HashMap<>(); + for(String instance : newCurrentStateMap.keySet()) { + Map> newCurrentState = newCurrentStateMap.get(instance); + diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) + .updateMapAndGetDiff(newCurrentState)); } - _currentStateMap = newCurrentStateMap; return diff; } - public void updateCacheWithCurrentStateDiff(Map> currentStateDiff) { - updateShardStateMapWithDiff(currentStateDiff, _currentStateMap); + /** + * Update the cache with the current state diff. + * All existing target states remains the same + * @param currentStateDiff + */ + public void updateCacheWithCurrentStateDiff(Map>> currentStateDiff) { + for (String instance : currentStateDiff.keySet()) { + Map> currentStateDiffMap = currentStateDiff.get(instance); + _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) + .updateShardStateMapWithDiff(currentStateDiffMap); + } } /** - * Udate the target state with the changed target state maps. + * Update the target state with the changed target state maps. * All existing target states remains the same * @param targetStateChangeMap */ - public void updateTargetStateWithDiff(Map> targetStateChangeMap) { + public void updateTargetStateWithDiff(String instance, Map> targetStateChangeMap) { synchronized (_targetStateLock) { - _targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap); + _targetStateChanged = _targetStateMap.get(instance).updateShardStateMapWithDiff(targetStateChangeMap); } } - public Map> getTargetStateMapIfChanged() { + /** + * Get the target state map if it has changed since last time and reset the _targetStateChanged flag in. + * @return The target state map if it has changed, otherwise return null. + */ + public Map getTargetStateMapIfChanged() { synchronized (_targetStateLock) { if (_targetStateChanged) { _targetStateChanged = false; @@ -103,42 +108,89 @@ public Map> getTargetStateMapIfChanged() { } } + /** + * Serialize the target state assignments to a JSON string. + * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} + */ public String serializeTargetAssignments() { - ObjectMapper mapper = new ObjectMapper(); ObjectNode root = mapper.createObjectNode(); - synchronized (_targetStateLock) { - for (Map.Entry> entry : _targetStateMap.entrySet()) { - String instance = entry.getKey(); - Map assignments = entry.getValue(); - ObjectNode instanceNode = mapper.createObjectNode(); - for (Map.Entry assignment : assignments.entrySet()) { - instanceNode.put(assignment.getKey(), assignment.getValue()); - } - root.set(instance, instanceNode); - } + for (Map.Entry entry : _targetStateMap.entrySet()) { + root.set(entry.getKey(), entry.getValue().serialize()); } return root.toString(); } - private boolean updateShardStateMapWithDiff(Map> diffMap, - Map> currentMap) { - if (diffMap == null || diffMap.isEmpty()) { - return false; + + public class ShardStateMap { + Map> _stateMap; + public ShardStateMap(Map> stateMap) { + _stateMap = stateMap; + } + public String getState(String instance, String shard) { + return _stateMap.get(instance).get(shard); + } + + private boolean updateShardStateMapWithDiff(Map> diffMap) { + if (diffMap == null || diffMap.isEmpty()) { + return false; + } + for (Map.Entry> diffEntry : diffMap.entrySet()) { + String resource = diffEntry.getKey(); + Map diffCurrentState = diffEntry.getValue(); + if (_stateMap.get(resource) != null) { + _stateMap.get(resource).entrySet().forEach(currentMapEntry -> { + String shard = currentMapEntry.getKey(); + if (diffCurrentState.get(shard) != null) { + currentMapEntry.setValue(diffCurrentState.get(shard)); + } + }); + } else { + _stateMap.put(resource, diffCurrentState); + } + } + return true; } - for (Map.Entry> entry : diffMap.entrySet()) { - String instance = entry.getKey(); - Map currentState = entry.getValue(); - if (currentMap.get(instance) == null) { - currentMap.put(instance, currentState); - } else { - currentMap.get(instance).entrySet().stream().forEach(e -> { - if (currentState.get(e.getKey()) != null) { - e.setValue(currentState.get(e.getKey())); + + private Map> updateMapAndGetDiff( + Map> newCurrentStateMap) { + Map> diff = new HashMap<>(); + for (Map.Entry> entry : newCurrentStateMap.entrySet()) { + String instance = entry.getKey(); + Map newCurrentState = entry.getValue(); + Map oldCurrentState = _stateMap.get(instance); + if (oldCurrentState == null) { + diff.put(instance, newCurrentState); + continue; + } + if (!oldCurrentState.equals(newCurrentState)) { + for (String shard : newCurrentState.keySet()) { + if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { + diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); + } } - }); + } + } + _stateMap = newCurrentStateMap; + return diff; + } + + /** + * Serialize the shard state map to a JSON object. + * @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"} + */ + public ObjectNode serialize() { + ObjectNode root = mapper.createObjectNode(); + for (Map.Entry> entry : _stateMap.entrySet()) { + String resource = entry.getKey(); + ObjectNode resourceNode = mapper.createObjectNode(); + for (Map.Entry shardEntry : entry.getValue().entrySet()) { + resourceNode.put(shardEntry.getKey(), shardEntry.getValue()); + } + root.set(resource, resourceNode); } + return root; } - return true; + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java index bb54eed56a..8030ac7da3 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java @@ -29,59 +29,5 @@ public class TestGatewayCurrentStateCache { private GatewayCurrentStateCache cache = new GatewayCurrentStateCache("TestCluster");; - @Test - public void testUpdateCacheWithNewCurrentStateAndGetDiff() { - Map> newState = new HashMap<>(); - Map instanceState = new HashMap<>(); - instanceState.put("shard1", "ONLINE"); - instanceState.put("shard2", "OFFLINE"); - newState.put("instance1", instanceState); - Map> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState); - - Assert.assertNotNull(diff); - Assert.assertEquals(diff.size(), 1); - Assert.assertEquals(diff.get("instance1").size(), 2); - Assert.assertEquals(diff.get("instance1").get("shard1"), "ONLINE"); - Assert.assertEquals(diff.get("instance1").get("shard2"), "OFFLINE"); - Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "ONLINE"); - Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE"); - } - - @Test(dependsOnMethods = "testUpdateCacheWithNewCurrentStateAndGetDiff") - public void testUpdateCacheWithCurrentStateDiff() { - Map> diff = new HashMap<>(); - Map instanceState = new HashMap<>(); - instanceState.put("shard1", "OFFLINE"); - diff.put("instance1", instanceState); - - cache.updateCacheWithCurrentStateDiff(diff); - Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "OFFLINE"); - Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE"); - } - - @Test(dependsOnMethods = "testUpdateCacheWithCurrentStateDiff") - public void testUpdateTargetStateWithDiff() { - Map> targetStateChange = new HashMap<>(); - Map instanceState = new HashMap<>(); - instanceState.put("shard1", "ONLINE"); - targetStateChange.put("instance1", instanceState); - - cache.updateTargetStateWithDiff(targetStateChange); - - Assert.assertEquals(cache.getTargetStateMapIfChanged().get("instance1").get("shard1"), "ONLINE"); - } - - @Test(dependsOnMethods = "testUpdateTargetStateWithDiff") - public void testSerializeTargetAssignments() { - Map> targetState = new HashMap<>(); - Map instanceState = new HashMap<>(); - instanceState.put("shard1", "OFFLINE"); - targetState.put("instance1", instanceState); - - cache.updateTargetStateWithDiff(targetState); - - String serialized = cache.serializeTargetAssignments(); - Assert.assertTrue(serialized.contains("\"instance1\":{\"shard1\":\"OFFLINE\"}")); - } } From 39325a28c9738873ba93d69089441b3ba0fa9a57 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Mon, 2 Sep 2024 15:41:03 -0700 Subject: [PATCH 4/5] add resource --- .../service/GatewayServiceManager.java | 5 -- .../util/GatewayCurrentStateCache.java | 56 +++++++------------ .../utils/TestGatewayCurrentStateCache.java | 51 ++++++++++++++++- 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index d64d1c6617..3edd4ee9d4 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -137,7 +137,6 @@ public void run() { } } - public void startService() throws IOException { _gatewayServiceChannel.start(); } @@ -149,10 +148,6 @@ public void stopService() { _helixGatewayParticipantMap.clear(); } - public GatewayCurrentStateCache getCurrentStateCache(String clusterName) { - return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName)); - } - private void createHelixGatewayParticipant(String clusterName, String instanceName, Map> initialShardStateMap) { // Create and add the participant to the participant map diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index 3cd32a404a..ea4a977af9 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -40,20 +40,20 @@ public class GatewayCurrentStateCache { // instance -> resource state (resource -> shard -> target state) Map _targetStateMap; - boolean _targetStateChanged = false; - final Object _targetStateLock; - public GatewayCurrentStateCache(String clusterName) { _clusterName = clusterName; _currentStateMap = new HashMap<>(); _targetStateMap = new HashMap<>(); - _targetStateLock = new Object(); } public String getCurrentState(String instance, String resource, String shard) { return _currentStateMap.get(instance).getState(resource, shard); } + public String getTargetState(String instance, String resource, String shard) { + return _targetStateMap.get(instance).getState(resource, shard); + } + /** * Update the cached current state of instances in a cluster, and return the diff of the change. * @param newCurrentStateMap The new current state map of instances in the cluster @@ -62,7 +62,7 @@ public String getCurrentState(String instance, String resource, String shard) { public Map>> updateCacheWithNewCurrentStateAndGetDiff( Map>> newCurrentStateMap) { Map>> diff = new HashMap<>(); - for(String instance : newCurrentStateMap.keySet()) { + for (String instance : newCurrentStateMap.keySet()) { Map> newCurrentState = newCurrentStateMap.get(instance); diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) .updateMapAndGetDiff(newCurrentState)); @@ -89,50 +89,40 @@ public void updateCacheWithCurrentStateDiff(Map> targetStateChangeMap) { - synchronized (_targetStateLock) { - _targetStateChanged = _targetStateMap.get(instance).updateShardStateMapWithDiff(targetStateChangeMap); - } + _targetStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) + .updateShardStateMapWithDiff(targetStateChangeMap); } /** - * Get the target state map if it has changed since last time and reset the _targetStateChanged flag in. - * @return The target state map if it has changed, otherwise return null. - */ - public Map getTargetStateMapIfChanged() { - synchronized (_targetStateLock) { - if (_targetStateChanged) { - _targetStateChanged = false; - return _targetStateMap; - } - return null; - } - } - - /** - * Serialize the target state assignments to a JSON string. + * Serialize the target state assignments to a JSON Node. * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} */ - public String serializeTargetAssignments() { + public ObjectNode serializeTargetAssignmentsToJSON() { ObjectNode root = mapper.createObjectNode(); for (Map.Entry entry : _targetStateMap.entrySet()) { - root.set(entry.getKey(), entry.getValue().serialize()); + root.set(entry.getKey(), entry.getValue().toJSONNode()); } - return root.toString(); + return root; } - public class ShardStateMap { Map> _stateMap; + public ShardStateMap(Map> stateMap) { _stateMap = stateMap; } + public String getState(String instance, String shard) { return _stateMap.get(instance).get(shard); } - private boolean updateShardStateMapWithDiff(Map> diffMap) { + private Map> getShardStateMap() { + return _stateMap; + } + + private void updateShardStateMapWithDiff(Map> diffMap) { if (diffMap == null || diffMap.isEmpty()) { - return false; + return; } for (Map.Entry> diffEntry : diffMap.entrySet()) { String resource = diffEntry.getKey(); @@ -148,11 +138,9 @@ private boolean updateShardStateMapWithDiff(Map> dif _stateMap.put(resource, diffCurrentState); } } - return true; } - private Map> updateMapAndGetDiff( - Map> newCurrentStateMap) { + private Map> updateMapAndGetDiff(Map> newCurrentStateMap) { Map> diff = new HashMap<>(); for (Map.Entry> entry : newCurrentStateMap.entrySet()) { String instance = entry.getKey(); @@ -178,7 +166,7 @@ private Map> updateMapAndGetDiff( * Serialize the shard state map to a JSON object. * @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"} */ - public ObjectNode serialize() { + public ObjectNode toJSONNode() { ObjectNode root = mapper.createObjectNode(); for (Map.Entry> entry : _stateMap.entrySet()) { String resource = entry.getKey(); @@ -190,7 +178,5 @@ public ObjectNode serialize() { } return root; } - } } - diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java index 8030ac7da3..448e0b3e9b 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java @@ -23,11 +23,60 @@ import java.util.Map; import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.testng.Assert; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class TestGatewayCurrentStateCache { - private GatewayCurrentStateCache cache = new GatewayCurrentStateCache("TestCluster");; + private GatewayCurrentStateCache cache; + @BeforeMethod + public void setUp() { + cache = new GatewayCurrentStateCache("TestCluster"); + } + @Test + public void testUpdateCacheWithNewCurrentStateAndGetDiff() { + Map>> newState = new HashMap<>(); + Map> instanceState = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard1", "ONLINE"); + instanceState.put("resource1", shardState); + newState.put("instance1", instanceState); + + Map>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState); + + Assert.assertNotNull(diff); + Assert.assertEquals(diff.size(), 1); + Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "ONLINE"); + } + + @Test + public void testUpdateCacheWithCurrentStateDiff() { + Map>> diff = new HashMap<>(); + Map> instanceState = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard2", "ONLINE"); + shardState.put("shard1", "ONLINE"); + instanceState.put("resource1", shardState); + diff.put("instance1", instanceState); + + cache.updateCacheWithCurrentStateDiff(diff); + + Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE"); + Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE"); + } + + @Test + public void testUpdateTargetStateWithDiff() { + Map> targetStateChange = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard1", "OFFLINE"); + targetStateChange.put("resource1", shardState); + + cache.updateTargetStateWithDiff("instance1", targetStateChange); + + Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE"); + Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}"); + } } From 6ea7720b47b457359cbb4b2b29999ed0f4c0efbe Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Tue, 3 Sep 2024 18:12:24 -0700 Subject: [PATCH 5/5] comment --- .../util/GatewayCurrentStateCache.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index ea4a977af9..503909d4a2 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -65,7 +65,7 @@ public Map>> updateCacheWithNewCurrentSt for (String instance : newCurrentStateMap.keySet()) { Map> newCurrentState = newCurrentStateMap.get(instance); diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) - .updateMapAndGetDiff(newCurrentState)); + .updateAndGetDiff(newCurrentState)); } return diff; } @@ -78,8 +78,7 @@ public Map>> updateCacheWithNewCurrentSt public void updateCacheWithCurrentStateDiff(Map>> currentStateDiff) { for (String instance : currentStateDiff.keySet()) { Map> currentStateDiffMap = currentStateDiff.get(instance); - _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) - .updateShardStateMapWithDiff(currentStateDiffMap); + updateShardStateMapWithDiff(_currentStateMap, instance, currentStateDiffMap); } } @@ -89,8 +88,7 @@ public void updateCacheWithCurrentStateDiff(Map> targetStateChangeMap) { - _targetStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) - .updateShardStateMapWithDiff(targetStateChangeMap); + updateShardStateMapWithDiff(_targetStateMap, instance, targetStateChangeMap); } /** @@ -105,7 +103,15 @@ public ObjectNode serializeTargetAssignmentsToJSON() { return root; } - public class ShardStateMap { + private void updateShardStateMapWithDiff(Map stateMap, String instance, + Map> diffMap) { + if (diffMap == null || diffMap.isEmpty()) { + return; + } + stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap); + } + + public static class ShardStateMap { Map> _stateMap; public ShardStateMap(Map> stateMap) { @@ -120,10 +126,7 @@ private Map> getShardStateMap() { return _stateMap; } - private void updateShardStateMapWithDiff(Map> diffMap) { - if (diffMap == null || diffMap.isEmpty()) { - return; - } + private void updateWithDiff(Map> diffMap) { for (Map.Entry> diffEntry : diffMap.entrySet()) { String resource = diffEntry.getKey(); Map diffCurrentState = diffEntry.getValue(); @@ -140,20 +143,20 @@ private void updateShardStateMapWithDiff(Map> diffMa } } - private Map> updateMapAndGetDiff(Map> newCurrentStateMap) { + private Map> updateAndGetDiff(Map> newCurrentStateMap) { Map> diff = new HashMap<>(); for (Map.Entry> entry : newCurrentStateMap.entrySet()) { - String instance = entry.getKey(); + String resource = entry.getKey(); Map newCurrentState = entry.getValue(); - Map oldCurrentState = _stateMap.get(instance); + Map oldCurrentState = _stateMap.get(resource); if (oldCurrentState == null) { - diff.put(instance, newCurrentState); + diff.put(resource, newCurrentState); continue; } if (!oldCurrentState.equals(newCurrentState)) { for (String shard : newCurrentState.keySet()) { if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { - diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); + diff.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); } } }