Skip to content

Commit

Permalink
feat(cli): add ConsumerClientConnection command (#807)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 7, 2023
1 parent 3afab4f commit 808bb55
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
Expand Down Expand Up @@ -123,7 +124,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService, producerManager);
this.messageService = messageServiceImpl;
this.extendMessageService = messageServiceImpl;
serviceManager = new DefaultServiceManager(brokerConfig, proxyMetadataService, dlqService, messageService, messageStore, producerManager);
ConsumerManager consumerManager = new ConsumerManager(new DefaultServiceManager.ConsumerIdsChangeListenerImpl(), brokerConfig.proxy().channelExpiredTimeout());
serviceManager = new DefaultServiceManager(brokerConfig, proxyMetadataService, dlqService, messageService, messageStore, producerManager, consumerManager);

messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy());

Expand Down Expand Up @@ -194,7 +196,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

// TODO: Split controller to a separate port
ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore);
ProxyServiceImpl proxyService = new ProxyServiceImpl(extendMessageService, producerManager);
ProxyServiceImpl proxyService = new ProxyServiceImpl(extendMessageService, producerManager, consumerManager);
grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService, proxyService);
remotingServer = new RemotingProtocolServer(messagingProcessor);
}
Expand Down
4 changes: 3 additions & 1 deletion cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.rocketmq.cli.broker.DescribeCluster;
import com.automq.rocketmq.cli.broker.TerminateNode;
import com.automq.rocketmq.cli.consumer.ConsumeMessage;
import com.automq.rocketmq.cli.consumer.ConsumerClientConnection;
import com.automq.rocketmq.cli.consumer.CreateGroup;
import com.automq.rocketmq.cli.consumer.DeleteGroup;
import com.automq.rocketmq.cli.consumer.DescribeGroup;
Expand Down Expand Up @@ -60,7 +61,8 @@
TerminateNode.class,
ResetConsumeOffset.class,
PrintTopicStats.class,
ProducerClientConnection.class
ProducerClientConnection.class,
ConsumerClientConnection.class
}
)
public class MQAdmin implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import de.vandermeer.asciitable.AT_Row;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.IntStream;
import picocli.CommandLine;

@CommandLine.Command(name = "consumerClientConnection", mixinStandardHelpOptions = true, showDefaultValues = true)
public class ConsumerClientConnection implements Callable<Void> {
@CommandLine.ParentCommand
MQAdmin mqAdmin;

@CommandLine.Option(names = {"-g", "--groupName"}, description = "Group name", required = true)
String groupName;

@Override
public Void call() throws Exception {
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());

List<apache.rocketmq.proxy.v1.ConsumerClientConnection> connections = proxyClient.consumerClientConnection(mqAdmin.getEndpoint(), ConsumerClientConnectionRequest.newBuilder().setGroup(groupName).build()).get();

AsciiTable groupTable = new AsciiTable();
groupTable.addRule();
AT_Row row = groupTable.addRow("CLIENT ID", "PROTOCOL", "VERSION", "ADDRESS", "LANGUAGE");
centralize(row);
groupTable.addRule();

for (apache.rocketmq.proxy.v1.ConsumerClientConnection connection : connections) {
row = groupTable.addRow(connection.getClientId(), connection.getProtocol(), connection.getVersion(), connection.getAddress(), connection.getLanguage());
centralize(row);
groupTable.addRule();
}

CWC_LongestLine cwc = new CWC_LongestLine();
IntStream.range(0, row.getCells().size()).forEach((i) -> {
cwc.add(10, 0);
});
groupTable.getRenderer().setCWC(cwc);

String render = groupTable.render();
System.out.println(render);
return null;
}

private void centralize(AT_Row row) {
row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ProducerClientConnection implements Callable<Void> {
public Void call() throws Exception {
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());

List<apache.rocketmq.proxy.v1.ProducerClientConnection> connections = proxyClient.producerClientConnection(mqAdmin.getEndpoint(), ProducerClientConnectionRequest.newBuilder().setProductionGroup(groupName).build()).get();
List<apache.rocketmq.proxy.v1.ProducerClientConnection> connections = proxyClient.producerClientConnection(mqAdmin.getEndpoint(), ProducerClientConnectionRequest.newBuilder().setGroup(groupName).build()).get();

AsciiTable groupTable = new AsciiTable();
groupTable.addRule();
Expand Down
34 changes: 30 additions & 4 deletions proto/src/main/proto/proxy/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ message TopicStatsReply {

message ProducerClientConnectionRequest {
ProxyRequestContext context = 1;
// Producer client id
string production_group = 2;
// Producer group name
string group = 2;
}

message ProducerClientConnection {
Expand All @@ -103,8 +103,6 @@ message ProducerClientConnection {
string version = 4;
// Client address
string address = 5;
// Connected time
int64 connected_time = 6;
// Last update time
int64 last_update_time = 7;
}
Expand All @@ -115,9 +113,37 @@ message ProducerClientConnectionReply {
repeated ProducerClientConnection connection = 2;
}

message ConsumerClientConnectionRequest {
ProxyRequestContext context = 1;
// Consumer group name
string group = 2;
}

message ConsumerClientConnection {
// Producer client id
string client_id = 1;
// Protocol
string protocol = 2;
// Language
string language = 3;
// Version
string version = 4;
// Client address
string address = 5;
// Last update time
int64 last_update_time = 7;
}

message ConsumerClientConnectionReply {
Status status = 1;
// Producer client connection
repeated ConsumerClientConnection connection = 2;
}

service ProxyService {
rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {}
rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {}
rpc topicStats(TopicStatsRequest) returns (TopicStatsReply) {}
rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {}
rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.automq.rocketmq.proxy.grpc;

import apache.rocketmq.proxy.v1.ConsumerClientConnection;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
import apache.rocketmq.proxy.v1.ProducerClientConnection;
import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest;
import apache.rocketmq.proxy.v1.QueueStats;
Expand All @@ -36,4 +38,7 @@ public interface ProxyClient extends Closeable {

CompletableFuture<List<ProducerClientConnection>> producerClientConnection(String target,
ProducerClientConnectionRequest request);

CompletableFuture<List<ConsumerClientConnection>> consumerClientConnection(String target,
ConsumerClientConnectionRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package com.automq.rocketmq.proxy.grpc;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ConsumerClientConnection;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
import apache.rocketmq.proxy.v1.ProducerClientConnection;
import apache.rocketmq.proxy.v1.ProducerClientConnectionReply;
import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest;
Expand All @@ -36,6 +39,8 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.utils.NetworkUtil;
Expand All @@ -49,10 +54,13 @@ public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase {
private final ExtendMessageService messageService;

private final ProducerManager producerManager;
private final ConsumerManager consumerManager;

public ProxyServiceImpl(ExtendMessageService messageService, ProducerManager producerManager) {
public ProxyServiceImpl(ExtendMessageService messageService, ProducerManager producerManager,
ConsumerManager consumerManager) {
this.messageService = messageService;
this.producerManager = producerManager;
this.consumerManager = consumerManager;
}

@Override
Expand Down Expand Up @@ -126,13 +134,13 @@ public void topicStats(TopicStatsRequest request, StreamObserver<TopicStatsReply
@Override
public void producerClientConnection(ProducerClientConnectionRequest request,
StreamObserver<ProducerClientConnectionReply> responseObserver) {
ConcurrentHashMap<Channel, ClientChannelInfo> map = producerManager.getGroupChannelTable().get(request.getProductionGroup());
ConcurrentHashMap<Channel, ClientChannelInfo> map = producerManager.getGroupChannelTable().get(request.getGroup());
if (map == null) {
responseObserver.onNext(ProducerClientConnectionReply.newBuilder()
.setStatus(Status
.newBuilder()
.setCode(Code.BAD_REQUEST)
.setMessage("Producer group not found: " + request.getProductionGroup())
.setMessage("Producer group not found: " + request.getGroup())
.build())
.build());
responseObserver.onCompleted();
Expand All @@ -156,4 +164,38 @@ public void producerClientConnection(ProducerClientConnectionRequest request,
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

@Override
public void consumerClientConnection(ConsumerClientConnectionRequest request,
StreamObserver<ConsumerClientConnectionReply> responseObserver) {
ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true);
if (groupInfo == null) {
responseObserver.onNext(ConsumerClientConnectionReply.newBuilder()
.setStatus(Status
.newBuilder()
.setCode(Code.BAD_REQUEST)
.setMessage("Consumer group not found: " + request.getGroup())
.build())
.build());
responseObserver.onCompleted();
return;
}
ConsumerClientConnectionReply.Builder builder = ConsumerClientConnectionReply.newBuilder();
for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) {
String protocolType = ChannelProtocolType.REMOTING.name();
if (info.getChannel() instanceof GrpcClientChannel) {
protocolType = ChannelProtocolType.GRPC_V2.name();
}
builder.addConnection(ConsumerClientConnection.newBuilder()
.setClientId(info.getClientId())
.setProtocol(protocolType)
.setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress()))
.setLanguage(info.getLanguage().name())
.setVersion(MQVersion.getVersionDesc(info.getVersion()))
.setLastUpdateTime(info.getLastUpdateTimestamp())
.build());
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package com.automq.rocketmq.proxy.grpc.client;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ConsumerClientConnection;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply;
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
import apache.rocketmq.proxy.v1.ProducerClientConnection;
import apache.rocketmq.proxy.v1.ProducerClientConnectionReply;
import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest;
Expand Down Expand Up @@ -174,6 +177,31 @@ public void onFailure(Throwable t) {
return future;
}

@Override
public CompletableFuture<List<ConsumerClientConnection>> consumerClientConnection(String target,
ConsumerClientConnectionRequest request) {
ProxyServiceGrpc.ProxyServiceFutureStub stub = getOrCreateStubForTarget(target);

CompletableFuture<List<ConsumerClientConnection>> future = new CompletableFuture<>();
Futures.addCallback(stub.consumerClientConnection(request),
new FutureCallback<>() {
@Override
public void onSuccess(ConsumerClientConnectionReply result) {
if (result.getStatus().getCode() == Code.OK) {
future.complete(result.getConnectionList());
} else {
future.completeExceptionally(new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, result.getStatus().getMessage()));
}
}

@Override
public void onFailure(Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public class DefaultServiceManager implements ServiceManager {

public DefaultServiceManager(BrokerConfig config, ProxyMetadataService proxyMetadataService,
DeadLetterService deadLetterService, MessageService messageService,
MessageStore messageStore, ProducerManager producerManager) {
MessageStore messageStore, ProducerManager producerManager, ConsumerManager consumerManager) {
this.metadataService = proxyMetadataService;
this.deadLetterService = deadLetterService;
this.resourceMetadataService = new ResourceMetadataService(proxyMetadataService);
this.messageService = messageService;
this.topicRouteService = new TopicRouteServiceImpl(config, proxyMetadataService);
this.producerManager = producerManager;
this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), config.proxy().channelExpiredTimeout());
this.consumerManager = consumerManager;
this.proxyRelayService = new ProxyRelayServiceImpl();
this.transactionService = new TransactionServiceImpl();
this.adminService = new AdminServiceImpl();
Expand Down Expand Up @@ -109,7 +109,7 @@ public void shutdown() throws Exception {
public void start() throws Exception {
}

protected static class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
public static class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
// TODO: implement this to support consumer group change notification
Expand Down

0 comments on commit 808bb55

Please sign in to comment.