Skip to content

Commit

Permalink
add getter for all target state
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Oct 9, 2024
1 parent 2439837 commit 6f2c0b4
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.helix.gateway.api.constant;

public class gatewayServiceManagerConstant {
public static final String TARGET_STATE_ASSIGNMENT_KEY_NAME = "Assignment";
public static final String TIMESTAMP_KEY = "Timestamp";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
Expand All @@ -36,6 +37,11 @@
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
import org.apache.helix.gateway.util.PollChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.gateway.api.constant.gatewayServiceManagerConstant.*;


/**
Expand All @@ -46,6 +52,7 @@
* 4. For ST reply message, update the tracker
*/
public class GatewayServiceManager {
private static final Logger logger = LoggerFactory.getLogger(GatewayServiceManager.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES =
Expand Down Expand Up @@ -128,12 +135,14 @@ public void updateCurrentState(String clusterName, String instanceName, String r

public synchronized String serializeTargetState() {
ObjectNode targetStateNode = new ObjectMapper().createObjectNode();
ObjectNode res = new ObjectMapper().createObjectNode();
for (String clusterName : _currentStateCacheMap.keySet()) {
// add the json node to the target state node
targetStateNode.set(clusterName, getOrCreateCache(clusterName).serializeTargetAssignmentsToJSONNode());
}
targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis()));
return targetStateNode.toString();
res.set(TARGET_STATE_ASSIGNMENT_KEY_NAME, targetStateNode);
res.set(TIMESTAMP_KEY, objectMapper.valueToTree(System.currentTimeMillis()));
return res.toString();
}

public void updateTargetState(String clusterName, String instanceName, String resourceId, String shardId,
Expand Down Expand Up @@ -162,7 +171,7 @@ private ShardStateUpdator(GatewayServiceEvent event) {

@Override
public void run() {
System.out.println("Processing state transition result " + _event.getInstanceName());
logger.info("Processing state transition result " + _event.getInstanceName());
HelixGatewayParticipant participant =
getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName());
if (participant == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -59,6 +60,14 @@ public String getTargetState(String instance, String resource, String shard) {
return shardStateMap == null ? null : shardStateMap.getState(resource, shard);
}

public synchronized Map<String, Map<String, Map<String, String>>> getAllTargetStates() {
Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
result.put(entry.getKey(), new HashMap<>(entry.getValue()._stateMap));
}
return result;
}

/**
* Update the cached current state of instances in a cluster, and return the diff of the change.
* @param userCurrentStateMap The new current state map of instances in the cluster
Expand Down Expand Up @@ -141,8 +150,8 @@ public synchronized void resetTargetStateCache(String instance) {
}

public static class ShardStateMap {
// resource -> shard -> state
Map<String, Map<String, String>> _stateMap;
ObjectNode root = mapper.createObjectNode();

public ShardStateMap(Map<String, Map<String, String>> stateMap) {
_stateMap = new HashMap<>(stateMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,20 @@ public long getLastUpdatedTime(){
return _lastUpdatedTime;
}
}

/**
* Target assignments representation as JSON
*/
public static class TargetAssignment {
// cluster -> instance -> resource -> shard -> state
@JsonProperty ("Assignment")
String _targetAssignment;
@JsonProperty ("Timestamp")
long _timestamp;

public TargetAssignment(String targetAssignment, long timestamp){
_targetAssignment = targetAssignment;
_timestamp = timestamp;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private void verifyTargetState() throws Exception {
int finalI = i;
Assert.assertTrue(TestHelper.verify(() -> {
String content = Files.readString(targetPaths.get(finalI));
return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}");
return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}");
}, TestHelper.WAIT_DURATION));
}
}
Expand Down

0 comments on commit 6f2c0b4

Please sign in to comment.