diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index 8a7c50ed71..62e927d11f 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -86,6 +86,12 @@
org.apache.helix
helix-core
+
+ org.apache.helix
+ helix-core
+ test-jar
+ test
+
io.grpc
grpc-services
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
similarity index 60%
rename from helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
rename to helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index 814cfb0d0f..fe57e69c92 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.service;
+package org.apache.helix.gateway.api.service;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +19,22 @@
* under the License.
*/
+import org.apache.helix.model.Message;
+
/**
- * Translate from/to GRPC function call to Helix Gateway Service event.
+ * Helix Gateway Service Processor interface allows sending state transition messages to
+ * participants through service implementing this interface.
*/
public interface HelixGatewayServiceProcessor {
- public boolean sendStateTransitionMessage( String instanceName);
+ /**
+ * Send a state transition message to a remote participant.
+ *
+ * @param instanceName the name of the participant
+ * @param currentState the current state of the shard
+ * @param message the message to send
+ */
+ void sendStateTransitionMessage(String instanceName, String currentState,
+ Message message);
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
deleted file mode 100644
index 528b28e2fb..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageStatus {
- SUCCESS, FAILURE
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
deleted file mode 100644
index 49619dec81..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageType {
- ADD, REMOVE, CHANGE_ROLE
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 09fe3d07b6..018d6591e4 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -26,9 +26,10 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.util.PerKeyLockRegistry;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.apache.helix.model.Message;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -59,8 +60,9 @@ public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
/**
* Grpc service end pint.
* Application instances Report the state of the shard or result of transition request to the gateway service.
- * @param responseObserver
- * @return
+ *
+ * @param responseObserver the observer to send the response to the client
+ * @return the observer to receive the state of the shard or result of transition request
*/
@Override
public StreamObserver report(
@@ -92,17 +94,20 @@ public void onCompleted() {
/**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway service.
- * @param instanceName
- * @return
+ *
+ * @param instanceName the instance name to send the message to
+ * @param currentState the current state of shard
+ * @param message the message to convert to the transition message
*/
@Override
- public boolean sendStateTransitionMessage(String instanceName) {
+ public void sendStateTransitionMessage(String instanceName, String currentState,
+ Message message) {
StreamObserver observer;
observer = _observerMap.get(instanceName);
if (observer != null) {
- observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
+ observer.onNext(
+ StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
}
- return true;
}
private void updateObserver(String instanceName, String clusterName,
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
new file mode 100644
index 0000000000..6405529603
--- /dev/null
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -0,0 +1,245 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It processes state transitions
+ * for the participant and updates the state of the participant's shards upon successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant {
+ public static final String UNASSIGNED_STATE = "UNASSIGNED";
+ private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+ private final HelixManager _participantManager;
+ private final Map> _shardStateMap;
+ private final Map> _stateTransitionResultMap;
+
+ private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor,
+ HelixManager participantManager, Map> initialShardStateMap) {
+ _gatewayServiceProcessor = gatewayServiceProcessor;
+ _participantManager = participantManager;
+ _shardStateMap = initialShardStateMap;
+ _stateTransitionResultMap = new ConcurrentHashMap<>();
+ }
+
+ public void processStateTransitionMessage(Message message) throws Exception {
+ String transitionId = message.getMsgId();
+ String resourceId = message.getResourceName();
+ String shardId = message.getPartitionName();
+ String toState = message.getToState();
+
+ try {
+ if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+ return;
+ }
+
+ CompletableFuture future = new CompletableFuture<>();
+ _stateTransitionResultMap.put(transitionId, future);
+ _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+ getCurrentState(resourceId, shardId), message);
+
+ if (!future.get()) {
+ throw new Exception("Failed to transition to state " + toState);
+ }
+
+ updateState(resourceId, shardId, toState);
+ } finally {
+ _stateTransitionResultMap.remove(transitionId);
+ }
+ }
+
+ public void handleStateTransitionError(Message message, StateTransitionError error) {
+ // Remove the stateTransitionResultMap future for the message
+ String transitionId = message.getMsgId();
+ String resourceId = message.getResourceName();
+ String shardId = message.getPartitionName();
+
+ // Remove the future from the stateTransitionResultMap since we are no longer able
+ // to process the state transition due to participant manager either timing out
+ // or failing to process the state transition
+ _stateTransitionResultMap.remove(transitionId);
+
+ // Set the replica state to ERROR
+ updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+ // Notify the HelixGatewayParticipantClient that it is in ERROR state
+ // TODO: We need a better way than sending the state transition with a toState of ERROR
+ }
+
+ /**
+ * Get the instance name of the participant.
+ *
+ * @return participant instance name
+ */
+ public String getInstanceName() {
+ return _participantManager.getInstanceName();
+ }
+
+ /**
+ * Completes the state transition with the given transitionId.
+ *
+ * @param transitionId the transitionId to complete
+ * @param isSuccess whether the state transition was successful
+ */
+ public void completeStateTransition(String transitionId, boolean isSuccess) {
+ CompletableFuture future = _stateTransitionResultMap.get(transitionId);
+ if (future != null) {
+ future.complete(isSuccess);
+ }
+ }
+
+ private boolean isCurrentStateAlreadyTarget(String resourceId, String shardId,
+ String targetState) {
+ return getCurrentState(resourceId, shardId).equals(targetState);
+ }
+
+ @VisibleForTesting
+ public Map> getShardStateMap() {
+ return _shardStateMap;
+ }
+
+ /**
+ * Get the current state of the shard.
+ *
+ * @param resourceId the resource id
+ * @param shardId the shard id
+ * @return the current state of the shard or DROPPED if it does not exist
+ */
+ public String getCurrentState(String resourceId, String shardId) {
+ return getShardStateMap().getOrDefault(resourceId, Collections.emptyMap())
+ .getOrDefault(shardId, UNASSIGNED_STATE);
+ }
+
+ private void updateState(String resourceId, String shardId, String state) {
+ if (state.equals(HelixDefinedState.DROPPED.name())) {
+ getShardStateMap().computeIfPresent(resourceId, (k, v) -> {
+ v.remove(shardId);
+ if (v.isEmpty()) {
+ return null;
+ }
+ return v;
+ });
+ } else {
+ getShardStateMap().computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
+ .put(shardId, state);
+ }
+ }
+
+ public void disconnect() {
+ _participantManager.disconnect();
+ }
+
+ public static class Builder {
+ private final HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
+ private final String _instanceName;
+ private final String _clusterName;
+ private final String _zkAddress;
+ private final List _multiTopStateModelDefinitions;
+ private final Map> _initialShardStateMap;
+
+ public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, String instanceName,
+ String clusterName, String zkAddress) {
+ _helixGatewayServiceProcessor = helixGatewayServiceProcessor;
+ _instanceName = instanceName;
+ _clusterName = clusterName;
+ _zkAddress = zkAddress;
+ _multiTopStateModelDefinitions = new ArrayList<>();
+ _initialShardStateMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add a multi-top state model definition to the participant to be registered in the
+ * participant's state machine engine.
+ *
+ * @param stateModelDefinitionName the state model definition name to add (should be multi-top
+ * state model)
+ * @return the builder
+ */
+ public Builder addMultiTopStateStateModelDefinition(String stateModelDefinitionName) {
+ // TODO: Add validation that the state model definition is a multi-top state model
+ _multiTopStateModelDefinitions.add(stateModelDefinitionName);
+ return this;
+ }
+
+ /**
+ * Add initial shard state to the participant. This is used to initialize the participant with
+ * the initial state of the shards in order to reduce unnecessary state transitions from being
+ * forwarded to the participant.
+ *
+ * @param initialShardStateMap the initial shard state map to add
+ * @return the Builder
+ */
+ public Builder setInitialShardState(Map> initialShardStateMap) {
+ // TODO: Add handling for shard states that where never assigned to the participant since
+ // the participant was last online.
+ // deep copy into the initialShardStateMap into concurrent hash map
+ initialShardStateMap.forEach((resourceId, shardStateMap) -> {
+ _initialShardStateMap.put(resourceId, new ConcurrentHashMap<>(shardStateMap));
+ });
+
+ return this;
+ }
+
+ /**
+ * Build the HelixGatewayParticipant. This will create a HelixManager for the participant and
+ * connect to the Helix cluster. The participant will be registered with the multi-top state
+ * model definitions and initialized with the initial shard state map.
+ *
+ * @return the HelixGatewayParticipant
+ */
+ public HelixGatewayParticipant build() {
+ HelixManager participantManager =
+ new ZKHelixManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, _zkAddress);
+ HelixGatewayParticipant participant =
+ new HelixGatewayParticipant(_helixGatewayServiceProcessor, participantManager,
+ _initialShardStateMap);
+ _multiTopStateModelDefinitions.forEach(
+ stateModelDefinition -> participantManager.getStateMachineEngine()
+ .registerStateModelFactory(stateModelDefinition,
+ new HelixGatewayMultiTopStateStateModelFactory(participant)));
+ try {
+ participantManager.connect();
+ } catch (Exception e) {
+ // TODO: When API for gracefully triggering disconnect from remote participant
+ // is available, we should call it here instead of throwing exception.
+ throw new RuntimeException(e);
+ }
+ return participant;
+ }
+ }
+}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
index 5745dbf032..b919429b91 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -40,20 +40,20 @@ public class GatewayServiceEvent {
public static class StateTransitionResult {
private String stateTransitionId;
- private String stateTransitionStatus;
+ private boolean isSuccess;
private String shardState;
- public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) {
+ public StateTransitionResult(String stateTransitionId, boolean isSuccess, String shardState) {
this.stateTransitionId = stateTransitionId;
- this.stateTransitionStatus = stateTransitionStatus;
+ this.isSuccess = isSuccess;
this.shardState = shardState;
}
public String getStateTransitionId() {
return stateTransitionId;
}
- public String getStateTransitionStatus() {
- return stateTransitionStatus;
+ public boolean getIsSuccess() {
+ return isSuccess;
}
public String getShardState() {
return shardState;
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 830bb97c6b..85a274156b 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,14 +19,17 @@
* under the License.
*/
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.constant.GatewayServiceEventType;
import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
@@ -37,40 +40,35 @@
* 3. On init connect, create the participant manager
* 4. For ST reply message, update the tracker
*/
-
public class GatewayServiceManager {
public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
- private final Map _helixGatewayServiceMap;
+ public static final ImmutableSet SUPPORTED_MULTI_STATE_MODEL_TYPES =
+ ImmutableSet.of("OnlineOffline");
+ private final Map> _helixGatewayParticipantMap;
+ private final String _zkAddress;
// a single thread tp for event processing
private final ExecutorService _participantStateTransitionResultUpdator;
// link to grpc service
- private final HelixGatewayServiceGrpcService _grpcService;
+ private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
// a per key executor for connection event. All event for the same instance will be executed in sequence.
// It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done.
private final PerKeyBlockingExecutor _connectionEventProcessor;
public GatewayServiceManager() {
- _helixGatewayServiceMap = new ConcurrentHashMap<>();
+ _helixGatewayParticipantMap = new ConcurrentHashMap<>();
+ _zkAddress = "foo";
_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
- _grpcService = new HelixGatewayServiceGrpcService(this);
+ _gatewayServiceProcessor = new HelixGatewayServiceGrpcService(this);
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
}
/**
- * send state transition message to application instance
- * @return
- */
- public AtomicBoolean sendTransitionRequestToApplicationInstance() {
- // TODO: add param
- return null;
- }
-
- /**
- * Process the event from Grpc service
+ * Process the event from Grpc service and dispatch to async executor for processing.
+ *
* @param event
*/
public void newGatewayServiceEvent(GatewayServiceEvent event) {
@@ -86,20 +84,24 @@ public void newGatewayServiceEvent(GatewayServiceEvent event) {
*/
class shardStateUpdator implements Runnable {
- GatewayServiceEvent _event;
+ private final GatewayServiceEvent _event;
- public shardStateUpdator(GatewayServiceEvent event) {
+ private shardStateUpdator(GatewayServiceEvent event) {
_event = event;
}
@Override
public void run() {
- HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName());
- if (helixGatewayService == null) {
+ HelixGatewayParticipant participant =
+ getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName());
+ if (participant == null) {
// TODO: return error code and throw exception.
return;
}
- helixGatewayService.receiveSTResponse();
+ _event.getStateTransitionResult().forEach(stateTransitionResult -> {
+ participant.completeStateTransition(stateTransitionResult.getStateTransitionId(),
+ stateTransitionResult.getIsSuccess());
+ });
}
}
@@ -108,33 +110,47 @@ public void run() {
* It includes waiting for ZK connection, and also wait for previous LiveInstance to expire.
*/
class participantConnectionProcessor implements Runnable {
- GatewayServiceEvent _event;
+ private final GatewayServiceEvent _event;
- public participantConnectionProcessor(GatewayServiceEvent event) {
+ private participantConnectionProcessor(GatewayServiceEvent event) {
_event = event;
}
@Override
public void run() {
- HelixGatewayService helixGatewayService;
- _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(),
- k -> new HelixGatewayService(GatewayServiceManager.this, _event.getClusterName()));
- helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName());
if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
- helixGatewayService.registerParticipant();
+ createHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName(),
+ _event.getShardStateMap());
} else {
- helixGatewayService.deregisterParticipant(_event.getClusterName(), _event.getInstanceName());
+ removeHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName());
}
}
}
- @VisibleForTesting
- HelixGatewayServiceGrpcService getGrpcService() {
- return _grpcService;
+ private void createHelixGatewayParticipant(String clusterName, String instanceName,
+ Map> initialShardStateMap) {
+ // Create and add the participant to the participant map
+ HelixGatewayParticipant.Builder participantBuilder =
+ new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName,
+ _zkAddress).setInitialShardState(initialShardStateMap);
+ SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
+ participantBuilder::addMultiTopStateStateModelDefinition);
+ _helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>())
+ .put(instanceName, participantBuilder.build());
+ }
+
+ private void removeHelixGatewayParticipant(String clusterName, String instanceName) {
+ // Disconnect and remove the participant from the participant map
+ HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName);
+ if (participant != null) {
+ participant.disconnect();
+ _helixGatewayParticipantMap.get(clusterName).remove(instanceName);
+ }
}
- @VisibleForTesting
- HelixGatewayService getHelixGatewayService(String clusterName) {
- return _helixGatewayServiceMap.get(clusterName);
+ private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName,
+ String instanceName) {
+ return _helixGatewayParticipantMap.getOrDefault(clusterName, Collections.emptyMap())
+ .get(instanceName);
}
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
index dce5a44a09..aed7518b93 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
@@ -23,7 +23,6 @@
* Factory class to create GatewayServiceManager
*/
public class GatewayServiceManagerFactory {
-
public GatewayServiceManager createGatewayServiceManager() {
return new GatewayServiceManager();
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
deleted file mode 100644
index 2ef35820cb..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.helix.gateway.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
-import org.apache.helix.manager.zk.ZKHelixManager;
-
-
-/**
- * A service object for each Helix cluster.
- * This service object manages the Helix participants in the cluster.
- */
-public class HelixGatewayService {
- final private Map> _participantsMap;
-
- final private String _zkAddress;
- private final GatewayServiceManager _gatewayServiceManager;
- private Map> _flagMap;
- public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) {
- _participantsMap = new ConcurrentHashMap<>();
- _zkAddress = zkAddress;
- _gatewayServiceManager = gatewayServiceManager;
- _flagMap = new ConcurrentHashMap<>();
- }
-
- public GatewayServiceManager getClusterManager() {
- return _gatewayServiceManager;
- }
-
- /**
- * Register a participant to the Helix cluster.
- * It creates a HelixParticipantManager and connects to the Helix controller.
- */
- public void registerParticipant() {
- // TODO: create participant manager and add to _participantsMap
- HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
- manager.getStateMachineEngine()
- .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
- try {
- manager.connect();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Deregister a participant from the Helix cluster when app instance is gracefully stopped or connection lost.
- * @param clusterName
- * @param participantName
- */
- public void deregisterParticipant(String clusterName, String participantName) {
- HelixManager manager = _participantsMap.get(clusterName).remove(participantName);
- if (manager != null) {
- manager.disconnect();
- removeChannel(participantName);
- }
- }
-
- public void addChannel() {
- // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>());
- }
-
- public void removeChannel(String instanceName) {
- _flagMap.remove(instanceName);
- }
-
- public AtomicBoolean sendMessage() {
- AtomicBoolean flag = new AtomicBoolean(false);
- return flag;
- }
-
- /**
- * Entry point for receive the state transition response from the participant.
- * It will update in memory state accordingly.
- */
- public void receiveSTResponse() {
- // AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId());
- }
-
- /**
- * Stop the HelixGatewayService.
- * It stops all participants in the cluster.
- */
- public void stop() {
- // TODO: stop all participants
- System.out.println("Stopping Helix Gateway Service");
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
new file mode 100644
index 0000000000..37de51b420
--- /dev/null
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
@@ -0,0 +1,60 @@
+package org.apache.helix.gateway.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(initialState = "OFFLINE", states = {})
+public class HelixGatewayMultiTopStateStateModel extends StateModel {
+ private static final Logger _logger =
+ LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
+
+ private final HelixGatewayParticipant _helixGatewayParticipant;
+
+ public HelixGatewayMultiTopStateStateModel(
+ HelixGatewayParticipant helixGatewayParticipant) {
+ _helixGatewayParticipant = helixGatewayParticipant;
+ }
+
+ @Transition(to = "*", from = "*")
+ public void genericStateTransitionHandler(Message message, NotificationContext context)
+ throws Exception {
+ _helixGatewayParticipant.processStateTransitionMessage(message);
+ }
+
+ @Override
+ public void reset() {
+ // no-op we don't want to start from init state again.
+ }
+
+ @Override
+ public void rollbackOnError(Message message, NotificationContext context,
+ StateTransitionError error) {
+ _helixGatewayParticipant.handleStateTransitionError(message, error);
+ }
+}
\ No newline at end of file
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
similarity index 63%
rename from helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
rename to helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
index 7550fef510..64662998e3 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
@@ -19,19 +19,20 @@
* under the License.
*/
-import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.participant.statemachine.StateModelFactory;
-public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory {
- private GatewayServiceManager _clusterManager;
+public class HelixGatewayMultiTopStateStateModelFactory extends StateModelFactory {
+ private final HelixGatewayParticipant _helixGatewayParticipant;
- public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) {
- _clusterManager = clusterManager;
+ public HelixGatewayMultiTopStateStateModelFactory(
+ HelixGatewayParticipant helixGatewayParticipant) {
+ _helixGatewayParticipant = helixGatewayParticipant;
}
@Override
- public HelixGatewayOnlineOfflineStateModel createNewStateModel(String resourceName,
+ public HelixGatewayMultiTopStateStateModel createNewStateModel(String resourceName,
String partitionKey) {
- return new HelixGatewayOnlineOfflineStateModel(resourceName, partitionKey, _clusterManager);
+ return new HelixGatewayMultiTopStateStateModel(_helixGatewayParticipant);
}
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
deleted file mode 100644
index 4585ea3780..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.gateway.statemodel;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-
-public class HelixGatewayOnlineOfflineStateModel extends StateModel {
- private boolean _firstTime = true;
- private GatewayServiceManager _gatewayServiceManager;
-
- private String _resourceName;
- private String _partitionKey;
-
- private AtomicBoolean _completed;
-
- public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
- GatewayServiceManager gatewayServiceManager) {
- _resourceName = resourceName;
- _partitionKey = partitionKey;
- _gatewayServiceManager = gatewayServiceManager;
- }
-
- public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
- if (_firstTime) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
- + message.getResourceName() + " processed");
- _firstTime = false;
- }
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
- + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
- }
-
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
- + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
- }
-
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
- + message.getResourceName() + " processed");
- }
-
- private void wait(AtomicBoolean completed) {
- _completed = completed;
- while (true) {
- try {
- if (_completed.get()) {
- break;
- }
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 383f38b58d..ecc6c95683 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -23,8 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.model.Message;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -33,27 +37,63 @@
public final class StateTransitionMessageTranslateUtil {
+ /**
+ * Determine the transition type based on the current state and the target state.
+ *
+ * @param currentState current state
+ * @param toState target state
+ * @return TransitionType
+ */
+ public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateStatesToTransitionType(
+ String currentState, String toState) {
+ boolean isUnassigned = HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState);
+ boolean isToStateDropped = HelixDefinedState.DROPPED.name().equals(toState);
- public static TransitionMessage translateSTMsgToTransitionMessage() {
- return null;
+ if (isToStateDropped && !isUnassigned) {
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
+ }
+ if (!isToStateDropped && isUnassigned) {
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
+ }
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
+ }
+
+ /**
+ * Translate from Helix ST Message to Helix Gateway Service TransitionMessage.
+ *
+ * @param message Message
+ * @return TransitionMessage
+ */
+ public static TransitionMessage translateSTMsgToTransitionMessage(Message message) {
+ return TransitionMessage.newBuilder().addRequest(
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder()
+ .setTransitionID(message.getMsgId()).setTransitionType(
+ translateStatesToTransitionType(message.getFromState(), message.getToState()))
+ .setResourceID(message.getResourceName()).setShardID(message.getPartitionName())
+ .setTargetState(message.getToState()).build()).build();
}
/**
* Translate from user sent ShardStateMessage message to Helix Gateway Service event.
+ *
+ * @param request ShardStateMessage message
+ * contains the state of each shard upon connection or result of state transition request.
+ * @return GatewayServiceEvent
*/
public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
-
GatewayServiceEvent.GateWayServiceEventBuilder builder;
if (request.hasShardState()) { // init connection to gateway service
ShardState shardState = request.getShardState();
- Map shardStateMap = new HashMap<>();
+ Map> shardStateMap = new HashMap<>();
for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) {
for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) {
- shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState());
+ shardStateMap.computeIfAbsent(resourceState.getResource(), k -> new HashMap<>())
+ .put(state.getShardName(), state.getCurrentState());
}
}
builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
- shardState.getClusterName()).setParticipantName(shardState.getInstanceName());
+ shardState.getClusterName()).setParticipantName(shardState.getInstanceName())
+ .setShardStateMap(shardStateMap);
} else {
ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus();
// this is status update for established connection
@@ -63,7 +103,7 @@ public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMe
for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) {
GatewayServiceEvent.StateTransitionResult result =
new GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(),
- shardTransition.getCurrentState(), shardTransition.getCurrentState());
+ shardTransition.getIsSuccess(), shardTransition.getCurrentState());
stResult.add(result);
}
builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName(
@@ -75,9 +115,12 @@ public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMe
}
/**
- * Translate termination event to GatewayServiceEvent.
+ * Translate from client close to Helix Gateway Service event.
+ *
+ * @param instanceName the instance name to send the message to
+ * @param clusterName the cluster name
+ * @return GatewayServiceEvent
*/
-
public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) {
GatewayServiceEvent.GateWayServiceEventBuilder builder =
new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName(
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto
index d32423a34d..e7db5473e9 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -30,11 +30,10 @@ message SingleTransitionMessage {
TransitionType transitionType = 2; // Transition type for shard operations
string resourceID = 3; // Resource ID
string shardID = 4; // Shard to perform operation
- optional string startState = 5; // Shard start state
string targetState = 6; // Shard target state.
}
-message TransitionMessage{
+message TransitionMessage {
repeated SingleTransitionMessage request = 1;
}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
new file mode 100644
index 0000000000..fda7fbb1be
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -0,0 +1,333 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+public class TestHelixGatewayParticipant extends ZkTestBase {
+ private static final String CLUSTER_NAME = TestHelixGatewayParticipant.class.getSimpleName();
+ private static final int START_NUM_NODE = 2;
+ private static final String TEST_DB = "TestDB";
+ private static final String TEST_STATE_MODEL = "OnlineOffline";
+ private static final String CONTROLLER_PREFIX = "controller";
+ private static final String PARTICIPANT_PREFIX = "participant";
+
+ private ZkHelixClusterVerifier _clusterVerifier;
+ private ClusterControllerManager _controller;
+ private int _nextStartPort = 12000;
+ private final List _participants = Lists.newArrayList();
+ private final Map _pendingMessageMap = new ConcurrentHashMap<>();
+
+ @BeforeClass
+ public void beforeClass() {
+ // Set up the Helix cluster
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ // Start initial participants
+ for (int i = 0; i < START_NUM_NODE; i++) {
+ addParticipant();
+ }
+
+ // Start the controller
+ String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME;
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Enable best possible assignment persistence
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // Clean up by disconnecting the controller and participants
+ _controller.disconnect();
+ for (HelixGatewayParticipant participant : _participants) {
+ participant.disconnect();
+ }
+ }
+
+ /**
+ * Add a participant with a specific initial state map.
+ */
+ private HelixGatewayParticipant addParticipant(String participantName,
+ Map> initialShardMap) {
+ HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
+ new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, CLUSTER_NAME,
+ ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL)
+ .setInitialShardState(initialShardMap).build();
+ _participants.add(participant);
+ return participant;
+ }
+
+ /**
+ * Add a participant with an empty initial state map.
+ */
+ private HelixGatewayParticipant addParticipant() {
+ String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort++;
+ return addParticipant(participantName, Collections.emptyMap());
+ }
+
+ /**
+ * Remove a participant from the cluster.
+ */
+ private void deleteParticipant(HelixGatewayParticipant participant) {
+ participant.disconnect();
+ _participants.remove(participant);
+ }
+
+ /**
+ * Add a participant to the IdealState's preference list.
+ */
+ private void addToPreferenceList(HelixGatewayParticipant participant) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ idealState.getPreferenceLists().values()
+ .forEach(preferenceList -> preferenceList.add(participant.getInstanceName()));
+ idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) + 1));
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState);
+ }
+
+ /**
+ * Remove a participant from the IdealState's preference list.
+ */
+ private void removeFromPreferenceList(HelixGatewayParticipant participant) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ idealState.getPreferenceLists().values()
+ .forEach(preferenceList -> preferenceList.remove(participant.getInstanceName()));
+ idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) - 1));
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState);
+ }
+
+ /**
+ * Create a test database in the cluster with a semi-automatic state model.
+ */
+ private void createDB() {
+ createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB,
+ _participants.stream().map(HelixGatewayParticipant::getInstanceName)
+ .collect(Collectors.toList()), TEST_STATE_MODEL, 1, _participants.size());
+
+ _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<>(
+ _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+ }
+
+ /**
+ * Retrieve a pending message for a specific participant.
+ */
+ private Message getPendingMessage(String instanceName) {
+ return _pendingMessageMap.get(instanceName);
+ }
+
+ /**
+ * Process the pending message for a participant.
+ */
+ private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess) {
+ Message message = _pendingMessageMap.remove(participant.getInstanceName());
+ participant.completeStateTransition(message.getMsgId(), isSuccess);
+ }
+
+ /**
+ * Get the current state of a Helix shard.
+ */
+ private String getHelixCurrentState(String instanceName, String resourceName,
+ String shardId) {
+ return _gSetupTool.getClusterManagementTool()
+ .getResourceExternalView(CLUSTER_NAME, resourceName).getStateMap(shardId)
+ .getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE);
+ }
+
+ /**
+ * Verify that all specified participants have pending messages.
+ */
+ private void verifyPendingMessages(List participants) throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> participants.stream()
+ .allMatch(participant -> getPendingMessage(participant.getInstanceName()) != null),
+ TestHelper.WAIT_DURATION));
+ }
+
+ /**
+ * Verify that the gateway state matches the Helix state for all participants.
+ */
+ private void verifyGatewayStateMatchesHelixState() throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> _participants.stream().allMatch(participant -> {
+ String instanceName = participant.getInstanceName();
+ for (String resourceName : _gSetupTool.getClusterManagementTool()
+ .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String shardId : _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) {
+ String helixCurrentState =
+ getHelixCurrentState(instanceName, resourceName, shardId);
+ if (!participant.getCurrentState(resourceName, shardId).equals(helixCurrentState)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }), TestHelper.WAIT_DURATION));
+ }
+
+ /**
+ * Verify that all shards for a given instance are in a specific state.
+ */
+ private void verifyHelixPartitionStates(String instanceName, String state) throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (String resourceName : _gSetupTool.getClusterManagementTool()
+ .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String shardId : _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) {
+ if (!getHelixCurrentState(instanceName, resourceName, shardId).equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ @Test
+ public void testProcessStateTransitionMessageSuccess() throws Exception {
+ createDB();
+ verifyPendingMessages(_participants);
+
+ // Verify that all pending messages have the toState "ONLINE"
+ for (HelixGatewayParticipant participant : _participants) {
+ Message message = getPendingMessage(participant.getInstanceName());
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getToState(), "ONLINE");
+ }
+
+ // Process all pending messages successfully
+ for (HelixGatewayParticipant participant : _participants) {
+ processPendingMessage(participant, true);
+ }
+
+ // Verify that the cluster converges and all states are "ONLINE"
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionMessageSuccess")
+ public void testProcessStateTransitionMessageFailure() throws Exception {
+ // Add a new participant and include it in the preference list
+ HelixGatewayParticipant participant = addParticipant();
+ addToPreferenceList(participant);
+ verifyPendingMessages(List.of(participant));
+
+ // Verify the pending message has the toState "ONLINE"
+ Message message = getPendingMessage(participant.getInstanceName());
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getToState(), "ONLINE");
+
+ // Process the message with failure
+ processPendingMessage(participant, false);
+
+ // Verify that the cluster converges and states reflect the failure (e.g., "OFFLINE")
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+
+ // Remove the participant from the preference list and delete it
+ removeFromPreferenceList(participant);
+ deleteParticipant(participant);
+ Assert.assertTrue(_clusterVerifier.verify());
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionMessageFailure")
+ public void testProcessStateTransitionAfterReconnect() throws Exception {
+ // Remove the first participant
+ HelixGatewayParticipant participant = _participants.get(0);
+ deleteParticipant(participant);
+
+ // Verify the Helix state transitions to "UNASSIGNED_STATE" for the participant
+ verifyHelixPartitionStates(participant.getInstanceName(),
+ HelixGatewayParticipant.UNASSIGNED_STATE);
+
+ // Re-add the participant with its initial state
+ addParticipant(participant.getInstanceName(), participant.getShardStateMap());
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ // Verify the Helix state is "ONLINE"
+ verifyHelixPartitionStates(participant.getInstanceName(), "ONLINE");
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnect")
+ public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() throws Exception {
+ // Remove the first participant and verify state
+ HelixGatewayParticipant participant = _participants.get(0);
+ deleteParticipant(participant);
+ verifyHelixPartitionStates(participant.getInstanceName(),
+ HelixGatewayParticipant.UNASSIGNED_STATE);
+
+ // Remove shard preference and re-add the participant
+ removeFromPreferenceList(participant);
+ HelixGatewayParticipant participantReplacement =
+ addParticipant(participant.getInstanceName(), participant.getShardStateMap());
+ verifyPendingMessages(List.of(participantReplacement));
+
+ // Process the pending message successfully
+ processPendingMessage(participantReplacement, true);
+
+ // Verify that the cluster converges and states are correctly updated to "ONLINE"
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+ }
+
+ public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor {
+ private final Map _pendingMessageMap;
+
+ public MockHelixGatewayServiceProcessor(Map pendingMessageMap) {
+ _pendingMessageMap = pendingMessageMap;
+ }
+
+ @Override
+ public void sendStateTransitionMessage(String instanceName, String currentState,
+ Message message) {
+ _pendingMessageMap.put(instanceName, message);
+ }
+ }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 01b78593ca..a345f008e2 100644
--- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -40,7 +40,6 @@ public void testConnectionAndDisconnectionEvents() {
// Process disconnection event
grpcService.report(null).onNext(disconnectionEvent);
- HelixGatewayService gatewayService = manager.getHelixGatewayService("cluster1");
// Verify the events were processed in sequence
verify(manager, times(2)).newGatewayServiceEvent(any());
}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
new file mode 100644
index 0000000000..34e23d0b0c
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
@@ -0,0 +1,68 @@
+package org.apache.helix.gateway.utils;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDefinedState;
+
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+public class TestStateTransitionMessageTranslateUtil {
+
+ @Test
+ public void testTranslateStatesToTransitionType_DeleteShard() {
+ String currentState = "ONLINE";
+ String toState = HelixDefinedState.DROPPED.name();
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD,
+ "Expected DELETE_SHARD when transitioning to DROPPED state from a non-DROPPED state.");
+ }
+
+ @Test
+ public void testTranslateStatesToTransitionType_AddShard() {
+ String currentState = HelixGatewayParticipant.UNASSIGNED_STATE;
+ String toState = "ONLINE";
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD,
+ "Expected ADD_SHARD when transitioning from DROPPED state to a non-DROPPED state.");
+ }
+
+ @Test
+ public void testTranslateStatesToTransitionType_ChangeRole() {
+ String currentState = "ONLINE";
+ String toState = "OFFLINE";
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE,
+ "Expected CHANGE_ROLE when transitioning between non-DROPPED states.");
+ }
+}