Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Aug 30, 2024
1 parent 418e57f commit 45312d1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -72,7 +73,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
_currentStateCacheMap = new ConcurrentHashMap<>();
_currentStateCacheMap = new HashMap<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class GatewayCurrentStateCache {
Expand All @@ -38,11 +37,13 @@ public class GatewayCurrentStateCache {
Map<String, Map<String, String>> _targetStateMap;

boolean _targetStateChanged = false;
final Object _targetStateLock;

public GatewayCurrentStateCache(String clusterName) {
_clusterName = clusterName;
_currentStateMap = new ConcurrentHashMap<>();
_targetStateMap = new ConcurrentHashMap<>();
_currentStateMap = new HashMap<>();
_targetStateMap = new HashMap<>();
_targetStateLock = new Object();
}

public String getCurrentState(String instance, String shard) {
Expand All @@ -56,19 +57,16 @@ public String getCurrentState(String instance, String shard) {
*/
public Map<String, Map<String, String>> updateCacheWithNewCurrentStateAndGetDiff(
Map<String, Map<String, String>> newCurrentStateMap) {
Map<String, Map<String, String>> diff = null;
Map<String, Map<String, String>> diff = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> newCurrentState = entry.getValue();
Map<String, String> oldCurrentState = _currentStateMap.get(instance);
if (oldCurrentState == null || !oldCurrentState.equals(newCurrentState)) {
if (diff == null) {
diff = new HashMap<>();
}
if (oldCurrentState == null) {
diff.put(instance, newCurrentState);
continue;
}
if (oldCurrentState == null) {
diff.put(instance, newCurrentState);
continue;
}
if (!oldCurrentState.equals(newCurrentState)) {
for (String shard : newCurrentState.keySet()) {
if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard));
Expand All @@ -90,32 +88,34 @@ public void updateCacheWithCurrentStateDiff(Map<String, Map<String, String>> cur
* @param targetStateChangeMap
*/
public void updateTargetStateWithDiff(Map<String, Map<String, String>> targetStateChangeMap) {
_targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap);
}

public boolean isTargetStateChanged() {
return _targetStateChanged;
}

public void resetTargetStateChanged() {
_targetStateChanged = false;
synchronized (_targetStateLock) {
_targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap);
}
}

public Map<String, Map<String, String>> getTargetStateMap() {
return _targetStateMap;
public Map<String, Map<String, String>> getTargetStateMapIfChanged() {
synchronized (_targetStateLock) {
if (_targetStateChanged) {
_targetStateChanged = false;
return _targetStateMap;
}
return null;
}
}

public String serializeTargetAssignments() {
ObjectMapper mapper = new ObjectMapper();
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _targetStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> assignments = entry.getValue();
ObjectNode instanceNode = mapper.createObjectNode();
for (Map.Entry<String, String> assignment : assignments.entrySet()) {
instanceNode.put(assignment.getKey(), assignment.getValue());
synchronized (_targetStateLock) {
for (Map.Entry<String, Map<String, String>> entry : _targetStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> assignments = entry.getValue();
ObjectNode instanceNode = mapper.createObjectNode();
for (Map.Entry<String, String> assignment : assignments.entrySet()) {
instanceNode.put(assignment.getKey(), assignment.getValue());
}
root.set(instance, instanceNode);
}
root.set(instance, instanceNode);
}
return root.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -70,8 +69,7 @@ public void testUpdateTargetStateWithDiff() {

cache.updateTargetStateWithDiff(targetStateChange);

Assert.assertTrue(cache.isTargetStateChanged());
Assert.assertEquals(cache.getTargetStateMap().get("instance1").get("shard1"), "ONLINE");
Assert.assertEquals(cache.getTargetStateMapIfChanged().get("instance1").get("shard1"), "ONLINE");
}

@Test(dependsOnMethods = "testUpdateTargetStateWithDiff")
Expand Down

0 comments on commit 45312d1

Please sign in to comment.