diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index 8a7c50ed71..62e927d11f 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -86,6 +86,12 @@
org.apache.helix
helix-core
+
+ org.apache.helix
+ helix-core
+ test-jar
+ test
+
io.grpc
grpc-services
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixGatewayMultiTopStateStateTransitionProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixGatewayMultiTopStateStateTransitionProcessor.java
deleted file mode 100644
index 2064123315..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixGatewayMultiTopStateStateTransitionProcessor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.helix.gateway.api.participant;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateTransitionError;
-
-/**
- * Process multi-top state model state transition message.
- */
-public interface HelixGatewayMultiTopStateStateTransitionProcessor {
- /**
- * Process multi-top state model state transition message.
- * @param message state transition message
- * @throws Exception if failed to process the message
- */
- void processMultiTopStateModelStateTransitionMessage(Message message) throws Exception;
-
- /**
- * Handle state transition error. This results from state transition handler throwing an exception or
- * timing out.
- *
- * @param message state transition message
- * @param error state transition error
- */
- void handleStateTransitionError(Message message, StateTransitionError error);
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixStateTransitionProcessor.java
similarity index 51%
rename from helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
rename to helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixStateTransitionProcessor.java
index 814cfb0d0f..bcd2a29e1b 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/participant/HelixStateTransitionProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.service;
+package org.apache.helix.gateway.api.participant;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +19,26 @@
* under the License.
*/
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
/**
- * Translate from/to GRPC function call to Helix Gateway Service event.
+ * Process state transition message.
*/
-public interface HelixGatewayServiceProcessor {
-
- public boolean sendStateTransitionMessage( String instanceName);
+public interface HelixStateTransitionProcessor {
+ /**
+ * Process state transition message.
+ * @param message state transition message
+ * @throws Exception if failed to process the message
+ */
+ void processStateTransitionMessage(Message message) throws Exception;
+ /**
+ * Handle state transition error. This results from state transition handler throwing an exception or
+ * timing out.
+ *
+ * @param message state transition message
+ * @param error state transition error
+ */
+ void handleStateTransitionError(Message message, StateTransitionError error);
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index 325e413237..f8183f1904 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -1,15 +1,40 @@
package org.apache.helix.gateway.api.service;
-import org.apache.helix.gateway.constant.MessageType;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.model.Message;
/**
* Helix Gateway Service Processor interface allows sending state transition messages to
- * participants through services implementing this interface.
+ * participants through service implementing this interface.
*/
public interface HelixGatewayServiceProcessor {
- public void sendStateTransitionMessage(String instanceName, MessageType messageType,
+ /**
+ * Send a state transition message to a remote participant.
+ *
+ * @param instanceName the name of the participant
+ * @param currentState the current state of the participant
+ * @param message the message to send
+ */
+ void sendStateTransitionMessage(String instanceName, String currentState,
Message message);
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
deleted file mode 100644
index 528b28e2fb..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageStatus {
- SUCCESS, FAILURE
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
deleted file mode 100644
index 0567978cfc..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageType {
- ADD, DELETE, CHANGE_ROLE
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 6d096ffd5e..784c197c5b 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -24,7 +24,6 @@
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.helix.gateway.constant.MessageType;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
@@ -61,8 +60,9 @@ public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
/**
* Grpc service end pint.
* Application instances Report the state of the shard or result of transition request to the gateway service.
- * @param responseObserver
- * @return
+ *
+ * @param responseObserver the observer to send the response to the client
+ * @return the observer to receive the state of the shard or result of transition request
*/
@Override
public StreamObserver report(
@@ -94,18 +94,19 @@ public void onCompleted() {
/**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway service.
+ *
* @param instanceName the instance name to send the message to
- * @param messageType the type of the message
+ * @param currentState the current state of shard
* @param message the message to convert to the transition message
*/
@Override
- public void sendStateTransitionMessage(String instanceName, MessageType messageType,
+ public void sendStateTransitionMessage(String instanceName, String currentState,
Message message) {
StreamObserver observer;
observer = _observerMap.get(instanceName);
if (observer != null) {
observer.onNext(
- StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(messageType,
+ StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(currentState,
message));
}
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index d7b9be805a..feaca1000a 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -1,23 +1,49 @@
package org.apache.helix.gateway.participant;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
-import org.apache.helix.gateway.constant.MessageType;
import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateTransitionError;
-public class HelixGatewayParticipant implements HelixGatewayMultiTopStateStateTransitionProcessor {
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It processes state transitions
+ * for the participant and updates the state of the participant's shards upon successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
private final HelixManager _participantManager;
private final Map> _shardStateMap;
@@ -32,7 +58,7 @@ private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProce
}
@Override
- public void processMultiTopStateModelStateTransitionMessage(Message message) throws Exception {
+ public void processStateTransitionMessage(Message message) throws Exception {
String transitionId = message.getMsgId();
String resourceId = message.getResourceName();
String shardId = message.getPartitionName();
@@ -45,9 +71,8 @@ public void processMultiTopStateModelStateTransitionMessage(Message message) thr
CompletableFuture future = new CompletableFuture<>();
_stateTransitionResultMap.put(transitionId, future);
- _gatewayServiceProcessor.sendStateTransitionMessage(
- _participantManager.getInstanceName(),
- determineTransitionType(resourceId, shardId, toState), message);
+ _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+ getCurrentState(resourceId, shardId), message);
boolean success = future.get();
if (!success) {
@@ -79,6 +104,15 @@ public void handleStateTransitionError(Message message, StateTransitionError err
// TODO: We need a better way than sending the state transition with a toState of ERROR
}
+ /**
+ * Get the instance name of the participant.
+ *
+ * @return participant instance name
+ */
+ public String getInstanceName() {
+ return _participantManager.getInstanceName();
+ }
+
/**
* Completes the state transition with the given transitionId.
*
@@ -94,25 +128,29 @@ public void completeStateTransition(String transitionId, boolean isSuccess) {
private boolean isCurrentStateAlreadyTarget(String resourceId, String shardId,
String targetState) {
- return (_shardStateMap.containsKey(resourceId) && _shardStateMap.get(resourceId)
- .containsKey(shardId) && _shardStateMap.get(resourceId).get(shardId).equals(targetState))
- || targetState.equals(HelixDefinedState.DROPPED.name());
+ return getCurrentState(resourceId, shardId).equals(targetState);
}
- private MessageType determineTransitionType(String resourceId, String shardId,
- String toState) {
- boolean containsShard = _shardStateMap.containsKey(resourceId) && _shardStateMap.get(resourceId)
- .containsKey(shardId);
- if (toState.equals(HelixDefinedState.DROPPED.name())) {
- return containsShard ? MessageType.DELETE : MessageType.ADD;
- } else {
- return containsShard ? MessageType.CHANGE_ROLE : MessageType.ADD;
- }
+ @VisibleForTesting
+ public Map> getShardStateMap() {
+ return _shardStateMap;
+ }
+
+ /**
+ * Get the current state of the shard.
+ *
+ * @param resourceId the resource id
+ * @param shardId the shard id
+ * @return the current state of the shard or DROPPED if it does not exist
+ */
+ public String getCurrentState(String resourceId, String shardId) {
+ return getShardStateMap().getOrDefault(resourceId, Collections.emptyMap())
+ .getOrDefault(shardId, HelixDefinedState.DROPPED.name());
}
private void updateState(String resourceId, String shardId, String state) {
if (state.equals(HelixDefinedState.DROPPED.name())) {
- _shardStateMap.computeIfPresent(resourceId, (k, v) -> {
+ getShardStateMap().computeIfPresent(resourceId, (k, v) -> {
v.remove(shardId);
if (v.isEmpty()) {
return null;
@@ -120,7 +158,7 @@ private void updateState(String resourceId, String shardId, String state) {
return v;
});
} else {
- _shardStateMap.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
+ getShardStateMap().computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
.put(shardId, state);
}
}
@@ -156,7 +194,7 @@ public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, String
* @return the builder
*/
public Builder addMultiTopStateStateModelDefinition(String stateModelDefinitionName) {
- // TODO: Add validation that thi state model definition is a multi-top state model
+ // TODO: Add validation that the state model definition is a multi-top state model
_multiTopStateModelDefinitions.add(stateModelDefinitionName);
return this;
}
@@ -169,7 +207,9 @@ public Builder addMultiTopStateStateModelDefinition(String stateModelDefinitionN
* @param initialShardStateMap the initial shard state map to add
* @return the Builder
*/
- public Builder addInitialShardState(Map> initialShardStateMap) {
+ public Builder setInitialShardState(Map> initialShardStateMap) {
+ // TODO: Add handling for shard states that where never assigned to the participant since
+ // the participant was last online.
// deep copy into the initialShardStateMap into concurrent hash map
initialShardStateMap.forEach((resourceId, shardStateMap) -> {
_initialShardStateMap.put(resourceId, new ConcurrentHashMap<>(shardStateMap));
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index f475232097..85a274156b 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -132,7 +132,7 @@ private void createHelixGatewayParticipant(String clusterName, String instanceNa
// Create and add the participant to the participant map
HelixGatewayParticipant.Builder participantBuilder =
new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName,
- _zkAddress).addInitialShardState(initialShardStateMap);
+ _zkAddress).setInitialShardState(initialShardStateMap);
SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
participantBuilder::addMultiTopStateStateModelDefinition);
_helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>())
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
deleted file mode 100644
index 2ef35820cb..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.helix.gateway.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
-import org.apache.helix.manager.zk.ZKHelixManager;
-
-
-/**
- * A service object for each Helix cluster.
- * This service object manages the Helix participants in the cluster.
- */
-public class HelixGatewayService {
- final private Map> _participantsMap;
-
- final private String _zkAddress;
- private final GatewayServiceManager _gatewayServiceManager;
- private Map> _flagMap;
- public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) {
- _participantsMap = new ConcurrentHashMap<>();
- _zkAddress = zkAddress;
- _gatewayServiceManager = gatewayServiceManager;
- _flagMap = new ConcurrentHashMap<>();
- }
-
- public GatewayServiceManager getClusterManager() {
- return _gatewayServiceManager;
- }
-
- /**
- * Register a participant to the Helix cluster.
- * It creates a HelixParticipantManager and connects to the Helix controller.
- */
- public void registerParticipant() {
- // TODO: create participant manager and add to _participantsMap
- HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
- manager.getStateMachineEngine()
- .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
- try {
- manager.connect();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Deregister a participant from the Helix cluster when app instance is gracefully stopped or connection lost.
- * @param clusterName
- * @param participantName
- */
- public void deregisterParticipant(String clusterName, String participantName) {
- HelixManager manager = _participantsMap.get(clusterName).remove(participantName);
- if (manager != null) {
- manager.disconnect();
- removeChannel(participantName);
- }
- }
-
- public void addChannel() {
- // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>());
- }
-
- public void removeChannel(String instanceName) {
- _flagMap.remove(instanceName);
- }
-
- public AtomicBoolean sendMessage() {
- AtomicBoolean flag = new AtomicBoolean(false);
- return flag;
- }
-
- /**
- * Entry point for receive the state transition response from the participant.
- * It will update in memory state accordingly.
- */
- public void receiveSTResponse() {
- // AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId());
- }
-
- /**
- * Stop the HelixGatewayService.
- * It stops all participants in the cluster.
- */
- public void stop() {
- // TODO: stop all participants
- System.out.println("Stopping Helix Gateway Service");
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
index 5528962b68..855b9c7048 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
@@ -1,7 +1,26 @@
package org.apache.helix.gateway.statemodel;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
@@ -15,17 +34,17 @@ public class HelixGatewayMultiTopStateStateModel extends StateModel {
private static final Logger _logger =
LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
- private final HelixGatewayMultiTopStateStateTransitionProcessor _stateTransitionProcessor;
+ private final HelixStateTransitionProcessor _stateTransitionProcessor;
public HelixGatewayMultiTopStateStateModel(
- HelixGatewayMultiTopStateStateTransitionProcessor stateTransitionProcessor) {
+ HelixStateTransitionProcessor stateTransitionProcessor) {
_stateTransitionProcessor = stateTransitionProcessor;
}
@Transition(to = "*", from = "*")
public void genericStateTransitionHandler(Message message, NotificationContext context)
throws Exception {
- _stateTransitionProcessor.processMultiTopStateModelStateTransitionMessage(message);
+ _stateTransitionProcessor.processStateTransitionMessage(message);
}
@Override
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
index 2bec4c5a19..4a4b2f922c 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
@@ -1,13 +1,32 @@
package org.apache.helix.gateway.statemodel;
-import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.participant.statemachine.StateModelFactory;
public class HelixGatewayMultiTopStateStateModelFactory extends StateModelFactory {
- private final HelixGatewayMultiTopStateStateTransitionProcessor _stateTransitionProcessor;
+ private final HelixStateTransitionProcessor _stateTransitionProcessor;
public HelixGatewayMultiTopStateStateModelFactory(
- HelixGatewayMultiTopStateStateTransitionProcessor stateTransitionProcessor) {
+ HelixStateTransitionProcessor stateTransitionProcessor) {
_stateTransitionProcessor = stateTransitionProcessor;
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
deleted file mode 100644
index 4585ea3780..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.gateway.statemodel;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-
-public class HelixGatewayOnlineOfflineStateModel extends StateModel {
- private boolean _firstTime = true;
- private GatewayServiceManager _gatewayServiceManager;
-
- private String _resourceName;
- private String _partitionKey;
-
- private AtomicBoolean _completed;
-
- public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
- GatewayServiceManager gatewayServiceManager) {
- _resourceName = resourceName;
- _partitionKey = partitionKey;
- _gatewayServiceManager = gatewayServiceManager;
- }
-
- public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
- if (_firstTime) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
- + message.getResourceName() + " processed");
- _firstTime = false;
- }
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
- + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
- }
-
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
- + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
- }
-
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
- + message.getResourceName() + " processed");
- }
-
- private void wait(AtomicBoolean completed) {
- _completed = completed;
- while (true) {
- try {
- if (_completed.get()) {
- break;
- }
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
deleted file mode 100644
index 7550fef510..0000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.gateway.statemodel;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory {
- private GatewayServiceManager _clusterManager;
-
- public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) {
- _clusterManager = clusterManager;
- }
-
- @Override
- public HelixGatewayOnlineOfflineStateModel createNewStateModel(String resourceName,
- String partitionKey) {
- return new HelixGatewayOnlineOfflineStateModel(resourceName, partitionKey, _clusterManager);
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 65128bacb3..b46f9953df 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -23,8 +23,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.constant.MessageType;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.model.Message;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
@@ -35,40 +36,40 @@
public final class StateTransitionMessageTranslateUtil {
-
/**
- * Translate from user sent MessageType to Helix Gateway Service TransitionType.
+ * Determine the transition type based on the current state and the target state.
*
- * @param messageType MessageType
+ * @param currentState current state
+ * @param toState target state
* @return TransitionType
*/
- public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateMessageTypeToTransitionType(
- MessageType messageType) {
- switch (messageType) {
- case ADD:
- return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
- case DELETE:
- return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
- case CHANGE_ROLE:
- return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
- default:
- throw new IllegalArgumentException("Unknown message type: " + messageType);
+ public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateStatesToTransitionType(
+ String currentState, String toState) {
+ boolean isCurrentDropped = HelixDefinedState.DROPPED.name().equals(currentState);
+ boolean isToStateDropped = HelixDefinedState.DROPPED.name().equals(toState);
+
+ if (isToStateDropped && !isCurrentDropped) {
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
+ }
+ if (!isToStateDropped && isCurrentDropped) {
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
}
+ return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
}
/**
* Translate from user sent Message to Helix Gateway Service event.
*
- * @param messageType MessageType
+ * @param currentState current state of the shard
* @param message Message
* @return TransitionMessage
*/
- public static TransitionMessage translateSTMsgToTransitionMessage(MessageType messageType,
+ public static TransitionMessage translateSTMsgToTransitionMessage(String currentState,
Message message) {
return TransitionMessage.newBuilder().addRequest(
HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder()
.setTransitionID(message.getMsgId())
- .setTransitionType(translateMessageTypeToTransitionType(messageType))
+ .setTransitionType(translateStatesToTransitionType(currentState, message.getToState()))
.setResourceID(message.getResourceName()).setShardID(message.getPartitionName())
.setTargetState(message.getToState()).build()).build();
}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
new file mode 100644
index 0000000000..df765c85cc
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -0,0 +1,332 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+public class TestHelixGatewayParticipant extends ZkTestBase {
+ private static final String CLUSTER_NAME = TestHelixGatewayParticipant.class.getSimpleName();
+ private static final int START_NUM_NODE = 2;
+ private static final String TEST_DB = "TestDB";
+ private static final String TEST_STATE_MODEL = "OnlineOffline";
+ private static final String CONTROLLER_PREFIX = "controller";
+ private static final String PARTICIPANT_PREFIX = "participant";
+
+ private ZkHelixClusterVerifier _clusterVerifier;
+ private ClusterControllerManager _controller;
+ private int _nextStartPort = 12000;
+ private final List _participants = Lists.newArrayList();
+ private final Map _pendingMessageMap = new ConcurrentHashMap<>();
+
+ @BeforeClass
+ public void beforeClass() {
+ // Set up the Helix cluster
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ // Start initial participants
+ for (int i = 0; i < START_NUM_NODE; i++) {
+ addParticipant();
+ }
+
+ // Start the controller
+ String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME;
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Enable best possible assignment persistence
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // Clean up by disconnecting the controller and participants
+ _controller.disconnect();
+ for (HelixGatewayParticipant participant : _participants) {
+ participant.disconnect();
+ }
+ }
+
+ /**
+ * Add a participant with a specific initial state map.
+ */
+ private HelixGatewayParticipant addParticipant(String participantName,
+ Map> initialShardMap) {
+ HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
+ new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, CLUSTER_NAME,
+ ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL)
+ .setInitialShardState(initialShardMap).build();
+ _participants.add(participant);
+ return participant;
+ }
+
+ /**
+ * Add a participant with an empty initial state map.
+ */
+ private HelixGatewayParticipant addParticipant() {
+ String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort++;
+ return addParticipant(participantName, Collections.emptyMap());
+ }
+
+ /**
+ * Remove a participant from the cluster.
+ */
+ private void deleteParticipant(HelixGatewayParticipant participant) {
+ participant.disconnect();
+ _participants.remove(participant);
+ }
+
+ /**
+ * Add a participant to the IdealState's preference list.
+ */
+ private void addToPreferenceList(HelixGatewayParticipant participant) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ idealState.getPreferenceLists().values()
+ .forEach(preferenceList -> preferenceList.add(participant.getInstanceName()));
+ idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) + 1));
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState);
+ }
+
+ /**
+ * Remove a participant from the IdealState's preference list.
+ */
+ private void removeFromPreferenceList(HelixGatewayParticipant participant) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ idealState.getPreferenceLists().values()
+ .forEach(preferenceList -> preferenceList.remove(participant.getInstanceName()));
+ idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) - 1));
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState);
+ }
+
+ /**
+ * Create a test database in the cluster with a semi-automatic state model.
+ */
+ private void createDB() {
+ createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB,
+ _participants.stream().map(HelixGatewayParticipant::getInstanceName)
+ .collect(Collectors.toList()), TEST_STATE_MODEL, 1, _participants.size());
+
+ _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<>(
+ _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+ }
+
+ /**
+ * Retrieve a pending message for a specific participant.
+ */
+ private Message getPendingMessage(String instanceName) {
+ return _pendingMessageMap.get(instanceName);
+ }
+
+ /**
+ * Process the pending message for a participant.
+ */
+ private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess) {
+ Message message = _pendingMessageMap.remove(participant.getInstanceName());
+ participant.completeStateTransition(message.getMsgId(), isSuccess);
+ }
+
+ /**
+ * Get the current state of a Helix partition.
+ */
+ private String getHelixCurrentState(String instanceName, String resourceName,
+ String partitionName) {
+ return _gSetupTool.getClusterManagementTool()
+ .getResourceExternalView(CLUSTER_NAME, resourceName).getStateMap(partitionName)
+ .getOrDefault(instanceName, HelixDefinedState.DROPPED.name());
+ }
+
+ /**
+ * Verify that all specified participants have pending messages.
+ */
+ private void verifyPendingMessages(List participants) throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> participants.stream()
+ .allMatch(participant -> getPendingMessage(participant.getInstanceName()) != null),
+ TestHelper.WAIT_DURATION));
+ }
+
+ /**
+ * Verify that the gateway state matches the Helix state for all participants.
+ */
+ private void verifyGatewayStateMatchesHelixState() throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> _participants.stream().allMatch(participant -> {
+ String instanceName = participant.getInstanceName();
+ for (String resourceName : _gSetupTool.getClusterManagementTool()
+ .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String partitionName : _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) {
+ String helixCurrentState =
+ getHelixCurrentState(instanceName, resourceName, partitionName);
+ if (!participant.getCurrentState(resourceName, partitionName).equals(helixCurrentState)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }), TestHelper.WAIT_DURATION));
+ }
+
+ /**
+ * Verify that all partitions for a given instance are in a specific state.
+ */
+ private void verifyHelixPartitionStates(String instanceName, String state) throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (String resourceName : _gSetupTool.getClusterManagementTool()
+ .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String partitionName : _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) {
+ if (!getHelixCurrentState(instanceName, resourceName, partitionName).equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ @Test
+ public void testProcessStateTransitionMessageSuccess() throws Exception {
+ createDB();
+ verifyPendingMessages(_participants);
+
+ // Verify that all pending messages have the toState "ONLINE"
+ for (HelixGatewayParticipant participant : _participants) {
+ Message message = getPendingMessage(participant.getInstanceName());
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getToState(), "ONLINE");
+ }
+
+ // Process all pending messages successfully
+ for (HelixGatewayParticipant participant : _participants) {
+ processPendingMessage(participant, true);
+ }
+
+ // Verify that the cluster converges and all states are "ONLINE"
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionMessageSuccess")
+ public void testProcessStateTransitionMessageFailure() throws Exception {
+ // Add a new participant and include it in the preference list
+ HelixGatewayParticipant participant = addParticipant();
+ addToPreferenceList(participant);
+ verifyPendingMessages(List.of(participant));
+
+ // Verify the pending message has the toState "ONLINE"
+ Message message = getPendingMessage(participant.getInstanceName());
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getToState(), "ONLINE");
+
+ // Process the message with failure
+ processPendingMessage(participant, false);
+
+ // Verify that the cluster converges and states reflect the failure (e.g., "OFFLINE")
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+
+ // Remove the participant from the preference list and delete it
+ removeFromPreferenceList(participant);
+ deleteParticipant(participant);
+ Assert.assertTrue(_clusterVerifier.verify());
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionMessageFailure")
+ public void testProcessStateTransitionAfterReconnect() throws Exception {
+ // Remove the first participant
+ HelixGatewayParticipant participant = _participants.get(0);
+ deleteParticipant(participant);
+
+ // Verify the Helix state transitions to "DROPPED" for the participant
+ verifyHelixPartitionStates(participant.getInstanceName(), "DROPPED");
+
+ // Re-add the participant with its initial state
+ addParticipant(participant.getInstanceName(), participant.getShardStateMap());
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ // Verify the Helix state is "ONLINE"
+ verifyHelixPartitionStates(participant.getInstanceName(), "ONLINE");
+ }
+
+ @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnect")
+ public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() throws Exception {
+ // Remove the first participant and verify state
+ HelixGatewayParticipant participant = _participants.get(0);
+ deleteParticipant(participant);
+ verifyHelixPartitionStates(participant.getInstanceName(), "DROPPED");
+
+ // Remove partition preference and re-add the participant
+ removeFromPreferenceList(participant);
+ HelixGatewayParticipant participantReplacement =
+ addParticipant(participant.getInstanceName(), participant.getShardStateMap());
+ verifyPendingMessages(List.of(participantReplacement));
+
+ // Process the pending message successfully
+ processPendingMessage(participantReplacement, true);
+
+ // Verify that the cluster converges and states are correctly updated to "ONLINE"
+ Assert.assertTrue(_clusterVerifier.verify());
+ verifyGatewayStateMatchesHelixState();
+ }
+
+ public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor {
+ private final Map _pendingMessageMap;
+
+ public MockHelixGatewayServiceProcessor(Map pendingMessageMap) {
+ _pendingMessageMap = pendingMessageMap;
+ }
+
+ @Override
+ public void sendStateTransitionMessage(String instanceName, String currentState,
+ Message message) {
+ _pendingMessageMap.put(instanceName, message);
+ }
+ }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/statemodel/TestHelixGatewayMultiTopStateStateModel.java b/helix-gateway/src/test/java/org/apache/helix/gateway/statemodel/TestHelixGatewayMultiTopStateStateModel.java
new file mode 100644
index 0000000000..5c52efece3
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/statemodel/TestHelixGatewayMultiTopStateStateModel.java
@@ -0,0 +1,4 @@
+package org.apache.helix.gateway.statemodel;
+
+public class TestHelixGatewayMultiTopStateStateModel {
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestHelixGatewayParticipant.java
new file mode 100644
index 0000000000..97b34c568e
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestHelixGatewayParticipant.java
@@ -0,0 +1,67 @@
+package org.apache.helix.gateway.utils;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDefinedState;
+
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+public class TestHelixGatewayParticipant {
+
+ @Test
+ public void testTranslateStatesToTransitionType_DeleteShard() {
+ String currentState = "ONLINE";
+ String toState = HelixDefinedState.DROPPED.name();
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD,
+ "Expected DELETE_SHARD when transitioning to DROPPED state from a non-DROPPED state.");
+ }
+
+ @Test
+ public void testTranslateStatesToTransitionType_AddShard() {
+ String currentState = HelixDefinedState.DROPPED.name();
+ String toState = "ONLINE";
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD,
+ "Expected ADD_SHARD when transitioning from DROPPED state to a non-DROPPED state.");
+ }
+
+ @Test
+ public void testTranslateStatesToTransitionType_ChangeRole() {
+ String currentState = "ONLINE";
+ String toState = "OFFLINE";
+
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
+ StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState);
+
+ Assert.assertEquals(result,
+ HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE,
+ "Expected CHANGE_ROLE when transitioning between non-DROPPED states.");
+ }
+}