diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index cceabc8874..3edd4ee9d4 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -22,15 +22,18 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.helix.common.caches.CurrentStateCache; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.apache.helix.gateway.participant.HelixGatewayParticipant; +import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -60,6 +63,8 @@ public class GatewayServiceManager { private final GatewayServiceChannelConfig _gatewayServiceChannelConfig; + private final Map _currentStateCacheMap; + public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) { _helixGatewayParticipantMap = new ConcurrentHashMap<>(); _zkAddress = zkAddress; @@ -68,6 +73,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable _gatewayServiceChannelConfig = gatewayServiceChannelConfig; + _currentStateCacheMap = new HashMap<>(); } /** @@ -131,7 +137,6 @@ public void run() { } } - public void startService() throws IOException { _gatewayServiceChannel.start(); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java new file mode 100644 index 0000000000..503909d4a2 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -0,0 +1,185 @@ +package org.apache.helix.gateway.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.HashMap; +import java.util.Map; + + +/** + * A cache to store the current target assignment, and the reported current state of the instances in a cluster. + */ +public class GatewayCurrentStateCache { + static ObjectMapper mapper = new ObjectMapper(); + String _clusterName; + + // A cache of current state. It should be updated by the HelixGatewayServiceChannel + // instance -> resource state (resource -> shard -> target state) + Map _currentStateMap; + + // A cache of target state. + // instance -> resource state (resource -> shard -> target state) + Map _targetStateMap; + + public GatewayCurrentStateCache(String clusterName) { + _clusterName = clusterName; + _currentStateMap = new HashMap<>(); + _targetStateMap = new HashMap<>(); + } + + public String getCurrentState(String instance, String resource, String shard) { + return _currentStateMap.get(instance).getState(resource, shard); + } + + public String getTargetState(String instance, String resource, String shard) { + return _targetStateMap.get(instance).getState(resource, shard); + } + + /** + * Update the cached current state of instances in a cluster, and return the diff of the change. + * @param newCurrentStateMap The new current state map of instances in the cluster + * @return + */ + public Map>> updateCacheWithNewCurrentStateAndGetDiff( + Map>> newCurrentStateMap) { + Map>> diff = new HashMap<>(); + for (String instance : newCurrentStateMap.keySet()) { + Map> newCurrentState = newCurrentStateMap.get(instance); + diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) + .updateAndGetDiff(newCurrentState)); + } + return diff; + } + + /** + * Update the cache with the current state diff. + * All existing target states remains the same + * @param currentStateDiff + */ + public void updateCacheWithCurrentStateDiff(Map>> currentStateDiff) { + for (String instance : currentStateDiff.keySet()) { + Map> currentStateDiffMap = currentStateDiff.get(instance); + updateShardStateMapWithDiff(_currentStateMap, instance, currentStateDiffMap); + } + } + + /** + * Update the target state with the changed target state maps. + * All existing target states remains the same + * @param targetStateChangeMap + */ + public void updateTargetStateWithDiff(String instance, Map> targetStateChangeMap) { + updateShardStateMapWithDiff(_targetStateMap, instance, targetStateChangeMap); + } + + /** + * Serialize the target state assignments to a JSON Node. + * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} + */ + public ObjectNode serializeTargetAssignmentsToJSON() { + ObjectNode root = mapper.createObjectNode(); + for (Map.Entry entry : _targetStateMap.entrySet()) { + root.set(entry.getKey(), entry.getValue().toJSONNode()); + } + return root; + } + + private void updateShardStateMapWithDiff(Map stateMap, String instance, + Map> diffMap) { + if (diffMap == null || diffMap.isEmpty()) { + return; + } + stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap); + } + + public static class ShardStateMap { + Map> _stateMap; + + public ShardStateMap(Map> stateMap) { + _stateMap = stateMap; + } + + public String getState(String instance, String shard) { + return _stateMap.get(instance).get(shard); + } + + private Map> getShardStateMap() { + return _stateMap; + } + + private void updateWithDiff(Map> diffMap) { + for (Map.Entry> diffEntry : diffMap.entrySet()) { + String resource = diffEntry.getKey(); + Map diffCurrentState = diffEntry.getValue(); + if (_stateMap.get(resource) != null) { + _stateMap.get(resource).entrySet().forEach(currentMapEntry -> { + String shard = currentMapEntry.getKey(); + if (diffCurrentState.get(shard) != null) { + currentMapEntry.setValue(diffCurrentState.get(shard)); + } + }); + } else { + _stateMap.put(resource, diffCurrentState); + } + } + } + + private Map> updateAndGetDiff(Map> newCurrentStateMap) { + Map> diff = new HashMap<>(); + for (Map.Entry> entry : newCurrentStateMap.entrySet()) { + String resource = entry.getKey(); + Map newCurrentState = entry.getValue(); + Map oldCurrentState = _stateMap.get(resource); + if (oldCurrentState == null) { + diff.put(resource, newCurrentState); + continue; + } + if (!oldCurrentState.equals(newCurrentState)) { + for (String shard : newCurrentState.keySet()) { + if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) { + diff.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, newCurrentState.get(shard)); + } + } + } + } + _stateMap = newCurrentStateMap; + return diff; + } + + /** + * Serialize the shard state map to a JSON object. + * @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"} + */ + public ObjectNode toJSONNode() { + ObjectNode root = mapper.createObjectNode(); + for (Map.Entry> entry : _stateMap.entrySet()) { + String resource = entry.getKey(); + ObjectNode resourceNode = mapper.createObjectNode(); + for (Map.Entry shardEntry : entry.getValue().entrySet()) { + resourceNode.put(shardEntry.getKey(), shardEntry.getValue()); + } + root.set(resource, resourceNode); + } + return root; + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java new file mode 100644 index 0000000000..448e0b3e9b --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java @@ -0,0 +1,82 @@ +package org.apache.helix.gateway.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.gateway.util.GatewayCurrentStateCache; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestGatewayCurrentStateCache { + private GatewayCurrentStateCache cache; + + @BeforeMethod + public void setUp() { + cache = new GatewayCurrentStateCache("TestCluster"); + } + + @Test + public void testUpdateCacheWithNewCurrentStateAndGetDiff() { + Map>> newState = new HashMap<>(); + Map> instanceState = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard1", "ONLINE"); + instanceState.put("resource1", shardState); + newState.put("instance1", instanceState); + + Map>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState); + + Assert.assertNotNull(diff); + Assert.assertEquals(diff.size(), 1); + Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "ONLINE"); + } + + @Test + public void testUpdateCacheWithCurrentStateDiff() { + Map>> diff = new HashMap<>(); + Map> instanceState = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard2", "ONLINE"); + shardState.put("shard1", "ONLINE"); + instanceState.put("resource1", shardState); + diff.put("instance1", instanceState); + + cache.updateCacheWithCurrentStateDiff(diff); + + Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE"); + Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE"); + } + + @Test + public void testUpdateTargetStateWithDiff() { + Map> targetStateChange = new HashMap<>(); + Map shardState = new HashMap<>(); + shardState.put("shard1", "OFFLINE"); + targetStateChange.put("resource1", shardState); + + cache.updateTargetStateWithDiff("instance1", targetStateChange); + + Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE"); + Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}"); + } +}