Skip to content

Commit

Permalink
Gateway - Add GatewayCurrentStateCache for gateway service (#2895)
Browse files Browse the repository at this point in the history
This pull request introduces a caching mechanism for the Helix Gateway service. The main changes include:

Addition of a new GatewayCurrentStateCache class in GatewayCurrentStateCache.java, which manages caching of current and target states for instances in a cluster.
  • Loading branch information
xyuanlu committed Sep 18, 2024
1 parent c3451b6 commit 6fab029
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;


Expand Down Expand Up @@ -60,6 +63,8 @@ public class GatewayServiceManager {

private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;

private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;

public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
Expand All @@ -68,6 +73,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
_currentStateCacheMap = new HashMap<>();
}

/**
Expand Down Expand Up @@ -131,7 +137,6 @@ public void run() {
}
}


public void startService() throws IOException {
_gatewayServiceChannel.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package org.apache.helix.gateway.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;


/**
* A cache to store the current target assignment, and the reported current state of the instances in a cluster.
*/
public class GatewayCurrentStateCache {
static ObjectMapper mapper = new ObjectMapper();
String _clusterName;

// A cache of current state. It should be updated by the HelixGatewayServiceChannel
// instance -> resource state (resource -> shard -> target state)
Map<String, ShardStateMap> _currentStateMap;

// A cache of target state.
// instance -> resource state (resource -> shard -> target state)
Map<String, ShardStateMap> _targetStateMap;

public GatewayCurrentStateCache(String clusterName) {
_clusterName = clusterName;
_currentStateMap = new HashMap<>();
_targetStateMap = new HashMap<>();
}

public String getCurrentState(String instance, String resource, String shard) {
return _currentStateMap.get(instance).getState(resource, shard);
}

public String getTargetState(String instance, String resource, String shard) {
return _targetStateMap.get(instance).getState(resource, shard);
}

/**
* Update the cached current state of instances in a cluster, and return the diff of the change.
* @param newCurrentStateMap The new current state map of instances in the cluster
* @return
*/
public Map<String, Map<String, Map<String, String>>> updateCacheWithNewCurrentStateAndGetDiff(
Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
for (String instance : newCurrentStateMap.keySet()) {
Map<String, Map<String, String>> newCurrentState = newCurrentStateMap.get(instance);
diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>()))
.updateAndGetDiff(newCurrentState));
}
return diff;
}

/**
* Update the cache with the current state diff.
* All existing target states remains the same
* @param currentStateDiff
*/
public void updateCacheWithCurrentStateDiff(Map<String, Map<String, Map<String, String>>> currentStateDiff) {
for (String instance : currentStateDiff.keySet()) {
Map<String, Map<String, String>> currentStateDiffMap = currentStateDiff.get(instance);
updateShardStateMapWithDiff(_currentStateMap, instance, currentStateDiffMap);
}
}

/**
* Update the target state with the changed target state maps.
* All existing target states remains the same
* @param targetStateChangeMap
*/
public void updateTargetStateWithDiff(String instance, Map<String, Map<String, String>> targetStateChangeMap) {
updateShardStateMapWithDiff(_targetStateMap, instance, targetStateChangeMap);
}

/**
* Serialize the target state assignments to a JSON Node.
* example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
*/
public ObjectNode serializeTargetAssignmentsToJSON() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
root.set(entry.getKey(), entry.getValue().toJSONNode());
}
return root;
}

private void updateShardStateMapWithDiff(Map<String, ShardStateMap> stateMap, String instance,
Map<String, Map<String, String>> diffMap) {
if (diffMap == null || diffMap.isEmpty()) {
return;
}
stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap);
}

public static class ShardStateMap {
Map<String, Map<String, String>> _stateMap;

public ShardStateMap(Map<String, Map<String, String>> stateMap) {
_stateMap = stateMap;
}

public String getState(String instance, String shard) {
return _stateMap.get(instance).get(shard);
}

private Map<String, Map<String, String>> getShardStateMap() {
return _stateMap;
}

private void updateWithDiff(Map<String, Map<String, String>> diffMap) {
for (Map.Entry<String, Map<String, String>> diffEntry : diffMap.entrySet()) {
String resource = diffEntry.getKey();
Map<String, String> diffCurrentState = diffEntry.getValue();
if (_stateMap.get(resource) != null) {
_stateMap.get(resource).entrySet().forEach(currentMapEntry -> {
String shard = currentMapEntry.getKey();
if (diffCurrentState.get(shard) != null) {
currentMapEntry.setValue(diffCurrentState.get(shard));
}
});
} else {
_stateMap.put(resource, diffCurrentState);
}
}
}

private Map<String, Map<String, String>> updateAndGetDiff(Map<String, Map<String, String>> newCurrentStateMap) {
Map<String, Map<String, String>> diff = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
String resource = entry.getKey();
Map<String, String> newCurrentState = entry.getValue();
Map<String, String> oldCurrentState = _stateMap.get(resource);
if (oldCurrentState == null) {
diff.put(resource, newCurrentState);
continue;
}
if (!oldCurrentState.equals(newCurrentState)) {
for (String shard : newCurrentState.keySet()) {
if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
diff.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, newCurrentState.get(shard));
}
}
}
}
_stateMap = newCurrentStateMap;
return diff;
}

/**
* Serialize the shard state map to a JSON object.
* @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"}
*/
public ObjectNode toJSONNode() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _stateMap.entrySet()) {
String resource = entry.getKey();
ObjectNode resourceNode = mapper.createObjectNode();
for (Map.Entry<String, String> shardEntry : entry.getValue().entrySet()) {
resourceNode.put(shardEntry.getKey(), shardEntry.getValue());
}
root.set(resource, resourceNode);
}
return root;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.apache.helix.gateway.utils;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.Map;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestGatewayCurrentStateCache {
private GatewayCurrentStateCache cache;

@BeforeMethod
public void setUp() {
cache = new GatewayCurrentStateCache("TestCluster");
}

@Test
public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
Map<String, Map<String, Map<String, String>>> newState = new HashMap<>();
Map<String, Map<String, String>> instanceState = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard1", "ONLINE");
instanceState.put("resource1", shardState);
newState.put("instance1", instanceState);

Map<String, Map<String, Map<String, String>>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState);

Assert.assertNotNull(diff);
Assert.assertEquals(diff.size(), 1);
Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "ONLINE");
}

@Test
public void testUpdateCacheWithCurrentStateDiff() {
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
Map<String, Map<String, String>> instanceState = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard2", "ONLINE");
shardState.put("shard1", "ONLINE");
instanceState.put("resource1", shardState);
diff.put("instance1", instanceState);

cache.updateCacheWithCurrentStateDiff(diff);

Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE");
Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE");
}

@Test
public void testUpdateTargetStateWithDiff() {
Map<String, Map<String, String>> targetStateChange = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard1", "OFFLINE");
targetStateChange.put("resource1", shardState);

cache.updateTargetStateWithDiff("instance1", targetStateChange);

Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE");
Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
}
}

0 comments on commit 6fab029

Please sign in to comment.