Skip to content

Commit

Permalink
add resource
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 3, 2024
1 parent 6ddbf7d commit 39325a2
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void run() {
}
}


public void startService() throws IOException {
_gatewayServiceChannel.start();
}
Expand All @@ -149,10 +148,6 @@ public void stopService() {
_helixGatewayParticipantMap.clear();
}

public GatewayCurrentStateCache getCurrentStateCache(String clusterName) {
return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName));
}

private void createHelixGatewayParticipant(String clusterName, String instanceName,
Map<String, Map<String, String>> initialShardStateMap) {
// Create and add the participant to the participant map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ public class GatewayCurrentStateCache {
// instance -> resource state (resource -> shard -> target state)
Map<String, ShardStateMap> _targetStateMap;

boolean _targetStateChanged = false;
final Object _targetStateLock;

public GatewayCurrentStateCache(String clusterName) {
_clusterName = clusterName;
_currentStateMap = new HashMap<>();
_targetStateMap = new HashMap<>();
_targetStateLock = new Object();
}

public String getCurrentState(String instance, String resource, String shard) {
return _currentStateMap.get(instance).getState(resource, shard);
}

public String getTargetState(String instance, String resource, String shard) {
return _targetStateMap.get(instance).getState(resource, shard);
}

/**
* Update the cached current state of instances in a cluster, and return the diff of the change.
* @param newCurrentStateMap The new current state map of instances in the cluster
Expand All @@ -62,7 +62,7 @@ public String getCurrentState(String instance, String resource, String shard) {
public Map<String, Map<String, Map<String, String>>> updateCacheWithNewCurrentStateAndGetDiff(
Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
for(String instance : newCurrentStateMap.keySet()) {
for (String instance : newCurrentStateMap.keySet()) {
Map<String, Map<String, String>> newCurrentState = newCurrentStateMap.get(instance);
diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>()))
.updateMapAndGetDiff(newCurrentState));
Expand All @@ -89,50 +89,40 @@ public void updateCacheWithCurrentStateDiff(Map<String, Map<String, Map<String,
* @param targetStateChangeMap
*/
public void updateTargetStateWithDiff(String instance, Map<String, Map<String, String>> targetStateChangeMap) {
synchronized (_targetStateLock) {
_targetStateChanged = _targetStateMap.get(instance).updateShardStateMapWithDiff(targetStateChangeMap);
}
_targetStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>()))
.updateShardStateMapWithDiff(targetStateChangeMap);
}

/**
* Get the target state map if it has changed since last time and reset the _targetStateChanged flag in.
* @return The target state map if it has changed, otherwise return null.
*/
public Map<String, ShardStateMap> getTargetStateMapIfChanged() {
synchronized (_targetStateLock) {
if (_targetStateChanged) {
_targetStateChanged = false;
return _targetStateMap;
}
return null;
}
}

/**
* Serialize the target state assignments to a JSON string.
* Serialize the target state assignments to a JSON Node.
* example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
*/
public String serializeTargetAssignments() {
public ObjectNode serializeTargetAssignmentsToJSON() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
root.set(entry.getKey(), entry.getValue().serialize());
root.set(entry.getKey(), entry.getValue().toJSONNode());
}
return root.toString();
return root;
}


public class ShardStateMap {
Map<String, Map<String, String>> _stateMap;

public ShardStateMap(Map<String, Map<String, String>> stateMap) {
_stateMap = stateMap;
}

public String getState(String instance, String shard) {
return _stateMap.get(instance).get(shard);
}

private boolean updateShardStateMapWithDiff(Map<String, Map<String, String>> diffMap) {
private Map<String, Map<String, String>> getShardStateMap() {
return _stateMap;
}

private void updateShardStateMapWithDiff(Map<String, Map<String, String>> diffMap) {
if (diffMap == null || diffMap.isEmpty()) {
return false;
return;
}
for (Map.Entry<String, Map<String, String>> diffEntry : diffMap.entrySet()) {
String resource = diffEntry.getKey();
Expand All @@ -148,11 +138,9 @@ private boolean updateShardStateMapWithDiff(Map<String, Map<String, String>> dif
_stateMap.put(resource, diffCurrentState);
}
}
return true;
}

private Map<String, Map<String, String>> updateMapAndGetDiff(
Map<String, Map<String, String>> newCurrentStateMap) {
private Map<String, Map<String, String>> updateMapAndGetDiff(Map<String, Map<String, String>> newCurrentStateMap) {
Map<String, Map<String, String>> diff = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
String instance = entry.getKey();
Expand All @@ -178,7 +166,7 @@ private Map<String, Map<String, String>> updateMapAndGetDiff(
* Serialize the shard state map to a JSON object.
* @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"}
*/
public ObjectNode serialize() {
public ObjectNode toJSONNode() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _stateMap.entrySet()) {
String resource = entry.getKey();
Expand All @@ -190,7 +178,5 @@ public ObjectNode serialize() {
}
return root;
}

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,60 @@
import java.util.Map;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestGatewayCurrentStateCache {
private GatewayCurrentStateCache cache = new GatewayCurrentStateCache("TestCluster");;
private GatewayCurrentStateCache cache;

@BeforeMethod
public void setUp() {
cache = new GatewayCurrentStateCache("TestCluster");
}

@Test
public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
Map<String, Map<String, Map<String, String>>> newState = new HashMap<>();
Map<String, Map<String, String>> instanceState = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard1", "ONLINE");
instanceState.put("resource1", shardState);
newState.put("instance1", instanceState);

Map<String, Map<String, Map<String, String>>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState);

Assert.assertNotNull(diff);
Assert.assertEquals(diff.size(), 1);
Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), "ONLINE");
}

@Test
public void testUpdateCacheWithCurrentStateDiff() {
Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
Map<String, Map<String, String>> instanceState = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard2", "ONLINE");
shardState.put("shard1", "ONLINE");
instanceState.put("resource1", shardState);
diff.put("instance1", instanceState);

cache.updateCacheWithCurrentStateDiff(diff);

Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard1"), "ONLINE");
Assert.assertEquals(cache.getCurrentState("instance1", "resource1", "shard2"), "ONLINE");
}

@Test
public void testUpdateTargetStateWithDiff() {
Map<String, Map<String, String>> targetStateChange = new HashMap<>();
Map<String, String> shardState = new HashMap<>();
shardState.put("shard1", "OFFLINE");
targetStateChange.put("resource1", shardState);

cache.updateTargetStateWithDiff("instance1", targetStateChange);

Assert.assertEquals(cache.getTargetState("instance1", "resource1", "shard1"), "OFFLINE");
Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
}
}

0 comments on commit 39325a2

Please sign in to comment.