Skip to content

Commit

Permalink
Incorporate comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
zpinto committed Jul 31, 2024
1 parent 6f358ca commit 28e9c64
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 69 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;
Expand All @@ -43,7 +42,8 @@
* for the participant and updates the state of the participant's shards upon successful state
* transitions signaled by remote participant.
*/
public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
public class HelixGatewayParticipant {
public static final String UNASSIGNED_STATE = "UNASSIGNED";
private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
private final HelixManager _participantManager;
private final Map<String, Map<String, String>> _shardStateMap;
Expand All @@ -57,7 +57,6 @@ private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProce
_stateTransitionResultMap = new ConcurrentHashMap<>();
}

@Override
public void processStateTransitionMessage(Message message) throws Exception {
String transitionId = message.getMsgId();
String resourceId = message.getResourceName();
Expand All @@ -84,7 +83,6 @@ public void processStateTransitionMessage(Message message) throws Exception {
}
}

@Override
public void handleStateTransitionError(Message message, StateTransitionError error) {
// Remove the stateTransitionResultMap future for the message
String transitionId = message.getMsgId();
Expand Down Expand Up @@ -144,7 +142,7 @@ public Map<String, Map<String, String>> getShardStateMap() {
*/
public String getCurrentState(String resourceId, String shardId) {
return getShardStateMap().getOrDefault(resourceId, Collections.emptyMap())
.getOrDefault(shardId, HelixDefinedState.DROPPED.name());
.getOrDefault(shardId, UNASSIGNED_STATE);
}

private void updateState(String resourceId, String shardId, String state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/

import org.apache.helix.NotificationContext;
import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
Expand All @@ -34,17 +34,17 @@ public class HelixGatewayMultiTopStateStateModel extends StateModel {
private static final Logger _logger =
LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);

private final HelixStateTransitionProcessor _stateTransitionProcessor;
private final HelixGatewayParticipant _helixGatewayParticipant;

public HelixGatewayMultiTopStateStateModel(
HelixStateTransitionProcessor stateTransitionProcessor) {
_stateTransitionProcessor = stateTransitionProcessor;
HelixGatewayParticipant helixGatewayParticipant) {
_helixGatewayParticipant = helixGatewayParticipant;
}

@Transition(to = "*", from = "*")
public void genericStateTransitionHandler(Message message, NotificationContext context)
throws Exception {
_stateTransitionProcessor.processStateTransitionMessage(message);
_helixGatewayParticipant.processStateTransitionMessage(message);
}

@Override
Expand All @@ -55,6 +55,6 @@ public void reset() {
@Override
public void rollbackOnError(Message message, NotificationContext context,
StateTransitionError error) {
_stateTransitionProcessor.handleStateTransitionError(message, error);
_helixGatewayParticipant.handleStateTransitionError(message, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
* under the License.
*/

import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.participant.statemachine.StateModelFactory;

public class HelixGatewayMultiTopStateStateModelFactory extends StateModelFactory<HelixGatewayMultiTopStateStateModel> {
private final HelixStateTransitionProcessor _stateTransitionProcessor;
private final HelixGatewayParticipant _helixGatewayParticipant;

public HelixGatewayMultiTopStateStateModelFactory(
HelixStateTransitionProcessor stateTransitionProcessor) {
_stateTransitionProcessor = stateTransitionProcessor;
HelixGatewayParticipant helixGatewayParticipant) {
_helixGatewayParticipant = helixGatewayParticipant;
}

@Override
public HelixGatewayMultiTopStateStateModel createNewStateModel(String resourceName,
String partitionKey) {
return new HelixGatewayMultiTopStateStateModel(_stateTransitionProcessor);
return new HelixGatewayMultiTopStateStateModel(_helixGatewayParticipant);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.helix.HelixDefinedState;
import org.apache.helix.gateway.constant.GatewayServiceEventType;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.model.Message;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
Expand All @@ -45,13 +46,13 @@ public final class StateTransitionMessageTranslateUtil {
*/
public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateStatesToTransitionType(
String currentState, String toState) {
boolean isCurrentDropped = HelixDefinedState.DROPPED.name().equals(currentState);
boolean isUnassigned = HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState);
boolean isToStateDropped = HelixDefinedState.DROPPED.name().equals(toState);

if (isToStateDropped && !isCurrentDropped) {
if (isToStateDropped && !isUnassigned) {
return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
}
if (!isToStateDropped && isCurrentDropped) {
if (!isToStateDropped && isUnassigned) {
return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
}
return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.stream.Collectors;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
Expand Down Expand Up @@ -180,7 +179,7 @@ private String getHelixCurrentState(String instanceName, String resourceName,
String shardId) {
return _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, resourceName).getStateMap(shardId)
.getOrDefault(instanceName, HelixDefinedState.DROPPED.name());
.getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE);
}

/**
Expand Down Expand Up @@ -284,8 +283,9 @@ public void testProcessStateTransitionAfterReconnect() throws Exception {
HelixGatewayParticipant participant = _participants.get(0);
deleteParticipant(participant);

// Verify the Helix state transitions to "DROPPED" for the participant
verifyHelixPartitionStates(participant.getInstanceName(), "DROPPED");
// Verify the Helix state transitions to "UNASSIGNED_STATE" for the participant
verifyHelixPartitionStates(participant.getInstanceName(),
HelixGatewayParticipant.UNASSIGNED_STATE);

// Re-add the participant with its initial state
addParticipant(participant.getInstanceName(), participant.getShardStateMap());
Expand All @@ -300,7 +300,8 @@ public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() thr
// Remove the first participant and verify state
HelixGatewayParticipant participant = _participants.get(0);
deleteParticipant(participant);
verifyHelixPartitionStates(participant.getInstanceName(), "DROPPED");
verifyHelixPartitionStates(participant.getInstanceName(),
HelixGatewayParticipant.UNASSIGNED_STATE);

// Remove shard preference and re-add the participant
removeFromPreferenceList(participant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.helix.HelixDefinedState;

import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand All @@ -41,7 +42,7 @@ public void testTranslateStatesToTransitionType_DeleteShard() {

@Test
public void testTranslateStatesToTransitionType_AddShard() {
String currentState = HelixDefinedState.DROPPED.name();
String currentState = HelixGatewayParticipant.UNASSIGNED_STATE;
String toState = "ONLINE";

HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result =
Expand Down

0 comments on commit 28e9c64

Please sign in to comment.