Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Aug 30, 2024
1 parent 418e57f commit 7acfcbb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class GatewayCurrentStateCache {
Expand All @@ -38,11 +37,13 @@ public class GatewayCurrentStateCache {
Map<String, Map<String, String>> _targetStateMap;

boolean _targetStateChanged = false;
final Object _targetStateLock;

public GatewayCurrentStateCache(String clusterName) {
_clusterName = clusterName;
_currentStateMap = new ConcurrentHashMap<>();
_targetStateMap = new ConcurrentHashMap<>();
_currentStateMap = new HashMap<>();
_targetStateMap = new HashMap<>();
_targetStateLock = new Object();
}

public String getCurrentState(String instance, String shard) {
Expand All @@ -56,19 +57,16 @@ public String getCurrentState(String instance, String shard) {
*/
public Map<String, Map<String, String>> updateCacheWithNewCurrentStateAndGetDiff(
Map<String, Map<String, String>> newCurrentStateMap) {
Map<String, Map<String, String>> diff = null;
Map<String, Map<String, String>> diff = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> newCurrentState = entry.getValue();
Map<String, String> oldCurrentState = _currentStateMap.get(instance);
if (oldCurrentState == null || !oldCurrentState.equals(newCurrentState)) {
if (diff == null) {
diff = new HashMap<>();
}
if (oldCurrentState == null) {
diff.put(instance, newCurrentState);
continue;
}
if (oldCurrentState == null) {
diff.put(instance, newCurrentState);
continue;
}
if (!oldCurrentState.equals(newCurrentState)) {
for (String shard : newCurrentState.keySet()) {
if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard));
Expand All @@ -90,32 +88,34 @@ public void updateCacheWithCurrentStateDiff(Map<String, Map<String, String>> cur
* @param targetStateChangeMap
*/
public void updateTargetStateWithDiff(Map<String, Map<String, String>> targetStateChangeMap) {
_targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap);
}

public boolean isTargetStateChanged() {
return _targetStateChanged;
}

public void resetTargetStateChanged() {
_targetStateChanged = false;
synchronized (_targetStateLock) {
_targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap);
}
}

public Map<String, Map<String, String>> getTargetStateMap() {
return _targetStateMap;
public Map<String, Map<String, String>> getTargetStateMapIfChanged() {
synchronized (_targetStateLock) {
if (_targetStateChanged) {
_targetStateChanged = false;
return _targetStateMap;
}
return null;
}
}

public String serializeTargetAssignments() {
ObjectMapper mapper = new ObjectMapper();
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _targetStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> assignments = entry.getValue();
ObjectNode instanceNode = mapper.createObjectNode();
for (Map.Entry<String, String> assignment : assignments.entrySet()) {
instanceNode.put(assignment.getKey(), assignment.getValue());
synchronized (_targetStateLock) {
for (Map.Entry<String, Map<String, String>> entry : _targetStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> assignments = entry.getValue();
ObjectNode instanceNode = mapper.createObjectNode();
for (Map.Entry<String, String> assignment : assignments.entrySet()) {
instanceNode.put(assignment.getKey(), assignment.getValue());
}
root.set(instance, instanceNode);
}
root.set(instance, instanceNode);
}
return root.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -70,8 +69,7 @@ public void testUpdateTargetStateWithDiff() {

cache.updateTargetStateWithDiff(targetStateChange);

Assert.assertTrue(cache.isTargetStateChanged());
Assert.assertEquals(cache.getTargetStateMap().get("instance1").get("shard1"), "ONLINE");
Assert.assertEquals(cache.getTargetStateMapIfChanged().get("instance1").get("shard1"), "ONLINE");
}

@Test(dependsOnMethods = "testUpdateTargetStateWithDiff")
Expand Down

0 comments on commit 7acfcbb

Please sign in to comment.