Skip to content

Commit

Permalink
Implement GatewayServiceManager (#2844)
Browse files Browse the repository at this point in the history
Implement GatewayServiceManager
  • Loading branch information
xyuanlu authored Jul 25, 2024
1 parent fc4a2e1 commit 3795062
Show file tree
Hide file tree
Showing 14 changed files with 468 additions and 145 deletions.
5 changes: 5 additions & 0 deletions helix-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.helix.gateway.constant;

public enum GatewayServiceEventType {
CONNECT, // init connection to gateway service
UPDATE, // update state transition result
DISCONNECT // shutdown connection to gateway service.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.apache.helix.gateway.grpcservice;

import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.util.PerKeyLockRegistry;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;


/**
* Helix Gateway Service GRPC UI implementation.
*/
public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
implements HelixGatewayServiceProcessor {

// Map to store the observer for each instance
private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>();
// A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed.
private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>();

private final GatewayServiceManager _manager;

// A fine grain lock register on instance level
private final PerKeyLockRegistry _lockRegistry;

public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
_manager = manager;
_lockRegistry = new PerKeyLockRegistry();
}

/**
* Grpc service end pint.
* Application instances Report the state of the shard or result of transition request to the gateway service.
* @param responseObserver
* @return
*/
@Override
public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report(
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) {

return new StreamObserver<ShardStateMessage>() {

@Override
public void onNext(ShardStateMessage request) {
if (request.hasShardState()) {
ShardState shardState = request.getShardState();
updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver);
}
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
}

@Override
public void onError(Throwable t) {
onClientClose(responseObserver);
}

@Override
public void onCompleted() {
onClientClose(responseObserver);
}
};
}

/**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway service.
* @param instanceName
* @return
*/
@Override
public boolean sendStateTransitionMessage(String instanceName) {
StreamObserver<TransitionMessage> observer;
observer = _observerMap.get(instanceName);
if (observer != null) {
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
}
return true;
}

private void updateObserver(String instanceName, String clusterName,
StreamObserver<TransitionMessage> streamObserver) {
_lockRegistry.withLock(instanceName, () -> {
_observerMap.put(instanceName, streamObserver);
_reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName));
});
}

private void onClientClose(
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) {
String instanceName;
String clusterName;
Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
clusterName = instanceInfo.getRight();
instanceName = instanceInfo.getLeft();

if (instanceName == null || clusterName == null) {
// TODO: log error;
return;
}
GatewayServiceEvent event =
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName);
_manager.newGatewayServiceEvent(event);
_lockRegistry.withLock(instanceName, () -> {
_reversedObserverMap.remove(responseObserver);
_observerMap.remove(instanceName);
_lockRegistry.removeLock(instanceName);
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.apache.helix.gateway.service;

import java.util.List;
import java.util.Map;
import org.apache.helix.gateway.constant.GatewayServiceEventType;


/**
* Event representing message reported by clients to Helix Gateway Service.
*/
public class GatewayServiceEvent {
// event type
private GatewayServiceEventType _eventType;
// event data
private String _clusterName;
private String _instanceName;
// A map where client reports the state of each shard upon connection
private Map<String, Map<String, String>> _shardStateMap;
// result for state transition request
private List<StateTransitionResult> _stateTransitionResult;

public static class StateTransitionResult {
private String stateTransitionId;
private String stateTransitionStatus;
private String shardState;

public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) {
this.stateTransitionId = stateTransitionId;
this.stateTransitionStatus = stateTransitionStatus;
this.shardState = shardState;
}

public String getStateTransitionId() {
return stateTransitionId;
}
public String getStateTransitionStatus() {
return stateTransitionStatus;
}
public String getShardState() {
return shardState;
}
}

private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName,
Map<String, Map<String, String>> shardStateMap, List<StateTransitionResult> stateTransitionStatusMap) {
_eventType = eventType;
_clusterName = clusterName;
_instanceName = instanceName;
_shardStateMap = shardStateMap;
_stateTransitionResult = stateTransitionStatusMap;
}

public GatewayServiceEventType getEventType() {
return _eventType;
}
public String getClusterName() {
return _clusterName;
}
public String getInstanceName() {
return _instanceName;
}
public Map<String, Map<String, String>> getShardStateMap() {
return _shardStateMap;
}
public List<StateTransitionResult> getStateTransitionResult() {
return _stateTransitionResult;
}


public static class GateWayServiceEventBuilder {
private GatewayServiceEventType _eventType;
private String _clusterName;
private String _instanceName;
private Map<String, Map<String, String>> _shardStateMap;
private List<StateTransitionResult> _stateTransitionResult;

public GateWayServiceEventBuilder(GatewayServiceEventType eventType) {
this._eventType = eventType;
}

public GateWayServiceEventBuilder setClusterName(String clusterName) {
this._clusterName = clusterName;
return this;
}

public GateWayServiceEventBuilder setParticipantName(String instanceName) {
this._instanceName = instanceName;
return this;
}

public GateWayServiceEventBuilder setShardStateMap(Map<String, Map<String, String>> shardStateMap) {
this._shardStateMap = shardStateMap;
return this;
}

public GateWayServiceEventBuilder setStateTransitionStatusMap(
List<StateTransitionResult> stateTransitionStatusMap) {
this._stateTransitionResult = stateTransitionStatusMap;
return this;
}

public GatewayServiceEvent build() {
return new GatewayServiceEvent(_eventType, _clusterName, _instanceName, _shardStateMap, _stateTransitionResult);
}
}
}
Loading

0 comments on commit 3795062

Please sign in to comment.