From 6f2c0b4f66e3e691bc41cbc1b9d6e870155ebe63 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Tue, 8 Oct 2024 22:42:03 -0700 Subject: [PATCH] add getter for all target state --- .../constant/gatewayServiceManagerConstant.java | 6 ++++++ .../gateway/service/GatewayServiceManager.java | 15 ++++++++++++--- .../gateway/util/GatewayCurrentStateCache.java | 11 ++++++++++- .../helix/gateway/util/PollChannelUtil.java | 16 ++++++++++++++++ .../integration/TestFilePullChannelE2E.java | 2 +- 5 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java new file mode 100644 index 0000000000..c4fc420e18 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java @@ -0,0 +1,6 @@ +package org.apache.helix.gateway.api.constant; + +public class gatewayServiceManagerConstant { + public static final String TARGET_STATE_ASSIGNMENT_KEY_NAME = "Assignment"; + public static final String TIMESTAMP_KEY = "Timestamp"; +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 9d4430ba1a..285045b15f 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,6 +19,7 @@ * under the License. */ +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableSet; @@ -36,6 +37,11 @@ import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; +import org.apache.helix.gateway.util.PollChannelUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.gateway.api.constant.gatewayServiceManagerConstant.*; /** @@ -46,6 +52,7 @@ * 4. For ST reply message, update the tracker */ public class GatewayServiceManager { + private static final Logger logger = LoggerFactory.getLogger(GatewayServiceManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10; public static final ImmutableSet SUPPORTED_MULTI_STATE_MODEL_TYPES = @@ -128,12 +135,14 @@ public void updateCurrentState(String clusterName, String instanceName, String r public synchronized String serializeTargetState() { ObjectNode targetStateNode = new ObjectMapper().createObjectNode(); + ObjectNode res = new ObjectMapper().createObjectNode(); for (String clusterName : _currentStateCacheMap.keySet()) { // add the json node to the target state node targetStateNode.set(clusterName, getOrCreateCache(clusterName).serializeTargetAssignmentsToJSONNode()); } - targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis())); - return targetStateNode.toString(); + res.set(TARGET_STATE_ASSIGNMENT_KEY_NAME, targetStateNode); + res.set(TIMESTAMP_KEY, objectMapper.valueToTree(System.currentTimeMillis())); + return res.toString(); } public void updateTargetState(String clusterName, String instanceName, String resourceId, String shardId, @@ -162,7 +171,7 @@ private ShardStateUpdator(GatewayServiceEvent event) { @Override public void run() { - System.out.println("Processing state transition result " + _event.getInstanceName()); + logger.info("Processing state transition result " + _event.getInstanceName()); HelixGatewayParticipant participant = getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); if (participant == null) { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index bbec7f3e4d..8d785aac0a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +60,14 @@ public String getTargetState(String instance, String resource, String shard) { return shardStateMap == null ? null : shardStateMap.getState(resource, shard); } + public synchronized Map>> getAllTargetStates() { + Map>> result = new HashMap<>(); + for (Map.Entry entry : _targetStateMap.entrySet()) { + result.put(entry.getKey(), new HashMap<>(entry.getValue()._stateMap)); + } + return result; + } + /** * Update the cached current state of instances in a cluster, and return the diff of the change. * @param userCurrentStateMap The new current state map of instances in the cluster @@ -141,8 +150,8 @@ public synchronized void resetTargetStateCache(String instance) { } public static class ShardStateMap { + // resource -> shard -> state Map> _stateMap; - ObjectNode root = mapper.createObjectNode(); public ShardStateMap(Map> stateMap) { _stateMap = new HashMap<>(stateMap); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java index 593bec2587..68e8fb2aa6 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -128,4 +128,20 @@ public long getLastUpdatedTime(){ return _lastUpdatedTime; } } + + /** + * Target assignments representation as JSON + */ + public static class TargetAssignment { + // cluster -> instance -> resource -> shard -> state + @JsonProperty ("Assignment") + String _targetAssignment; + @JsonProperty ("Timestamp") + long _timestamp; + + public TargetAssignment(String targetAssignment, long timestamp){ + _targetAssignment = targetAssignment; + _timestamp = timestamp; + } + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java index 3066dcc91e..ae2f0cde78 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java @@ -228,7 +228,7 @@ private void verifyTargetState() throws Exception { int finalI = i; Assert.assertTrue(TestHelper.verify(() -> { String content = Files.readString(targetPaths.get(finalI)); - return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}"); + return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}"); }, TestHelper.WAIT_DURATION)); } }