From 2f26707f0caf852bf414a3ef8def4ef3c7241e91 Mon Sep 17 00:00:00 2001 From: "zhouyou.ltx" Date: Wed, 3 Jan 2024 12:17:33 +0800 Subject: [PATCH 1/3] feat(groupStatus): Add "groupStatus" command to the CLI tools #764 Signed-off-by: zhouyou.ltx --- proto/src/main/proto/proxy/proxy.proto | 35 +++++++++++- .../rocketmq/proxy/grpc/ProxyServiceImpl.java | 57 +++++++++++++++++++ .../proxy/grpc/ProxyServiceImplTest.java | 13 +++++ 3 files changed, 104 insertions(+), 1 deletion(-) diff --git a/proto/src/main/proto/proxy/proxy.proto b/proto/src/main/proto/proxy/proxy.proto index 743b43272..00a7979ea 100644 --- a/proto/src/main/proto/proxy/proxy.proto +++ b/proto/src/main/proto/proxy/proxy.proto @@ -136,7 +136,7 @@ message ConsumerClientConnection { message ConsumerClientConnectionReply { Status status = 1; - // Producer client connection + // consumer client connection repeated ConsumerClientConnection connection = 2; } @@ -165,6 +165,38 @@ message RelayReply { Status status = 1; } + + +message ConsumerConnectionRequest { + ProxyRequestContext context = 1; + // Consumer group name + string group = 2; +} + + +message ConsumerSubInfo { + // topic + string topic = 1; + // subExpression + string sub_expression = 2; +} + +message ConsumerGroupCliInfo { + string consume_type = 1; + string message_model = 2; + string consume_from_where = 3; + // consumer client connection + repeated ConsumerClientConnection connection = 4; + // consumer subscription info + repeated ConsumerSubInfo consumer_sub_info = 5; +} +message ConsumerConnectionReply { + Status status = 1; + // consumer group cli info + ConsumerGroupCliInfo consumer_group_cli_info = 2; +} + + service ProxyService { rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {} rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {} @@ -172,4 +204,5 @@ service ProxyService { rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {} rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {} rpc relay(RelayRequest) returns (RelayReply) {} + rpc consumerConnection(ConsumerConnectionRequest) returns (ConsumerConnectionReply) {} } \ No newline at end of file diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java index 97365c9a8..5a52e2d24 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java @@ -21,6 +21,10 @@ import apache.rocketmq.proxy.v1.ConsumerClientConnection; import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply; import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerConnectionReply; +import apache.rocketmq.proxy.v1.ConsumerConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerGroupCliInfo; +import apache.rocketmq.proxy.v1.ConsumerSubInfo; import apache.rocketmq.proxy.v1.ProducerClientConnection; import apache.rocketmq.proxy.v1.ProducerClientConnectionReply; import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest; @@ -52,6 +56,7 @@ import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.slf4j.Logger; public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase { @@ -241,4 +246,56 @@ public void relay(RelayRequest request, StreamObserver responseObser } } } + + @Override + public void consumerConnection(ConsumerConnectionRequest request, + StreamObserver responseObserver) { + ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true); + if (groupInfo == null) { + responseObserver.onNext(ConsumerConnectionReply.newBuilder() + .setStatus(Status + .newBuilder() + .setCode(Code.BAD_REQUEST) + .setMessage("Consumer group not found: " + request.getGroup()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + ConsumerGroupCliInfo.Builder consumerBuilder = ConsumerGroupCliInfo.newBuilder(); + + consumerBuilder + .setConsumeType(groupInfo.getConsumeType().getTypeCN()) + .setMessageModel(groupInfo.getMessageModel().getModeCN()) + .setConsumeFromWhere(groupInfo.getConsumeFromWhere().name()); + + for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) { + String protocolType = ChannelProtocolType.REMOTING.name(); + if (info.getChannel() instanceof GrpcClientChannel) { + protocolType = ChannelProtocolType.GRPC_V2.name(); + } + consumerBuilder.addConnection(ConsumerClientConnection.newBuilder() + .setClientId(info.getClientId()) + .setProtocol(protocolType) + .setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress())) + .setLanguage(info.getLanguage().name()) + .setVersion(MQVersion.getVersionDesc(info.getVersion())) + .setLastUpdateTime(info.getLastUpdateTimestamp()) + .build()); + } + + for (SubscriptionData data : groupInfo.getSubscriptionTable().values()) { + consumerBuilder.addConsumerSubInfo(ConsumerSubInfo.newBuilder() + .setTopic(data.getTopic()) + .setSubExpression(data.getSubString()) + .build()); + } + ConsumerConnectionReply.Builder builder = ConsumerConnectionReply.newBuilder() + .setConsumerGroupCliInfo(consumerBuilder.build()); + + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } + } diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java index 942202a2a..5b0f1322d 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java @@ -18,6 +18,8 @@ package com.automq.rocketmq.proxy.grpc; import apache.rocketmq.common.v1.Code; +import apache.rocketmq.proxy.v1.ConsumerClientConnection; +import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; import apache.rocketmq.proxy.v1.ProxyServiceGrpc; import apache.rocketmq.proxy.v1.Status; import com.automq.rocketmq.common.config.BrokerConfig; @@ -28,6 +30,7 @@ import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.model.message.PutResult; import java.lang.reflect.Field; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -75,4 +78,14 @@ void relay() { Status status = proxyClient.relayMessage(TARGET, messageExt.message()).join(); assertEquals(Code.OK, status.getCode()); } + + + @Test + void ConsumerClientConnection() { + final String groupName = ""; + + ConsumerClientConnectionRequest request = ConsumerClientConnectionRequest.newBuilder().setGroup(groupName).build(); + List list = proxyClient.consumerClientConnection(TARGET, request).join(); + + } } \ No newline at end of file From 757e1e4adbe047964ddfc0480bff6ded9f6520f3 Mon Sep 17 00:00:00 2001 From: "zhouyou.ltx" Date: Wed, 3 Jan 2024 20:09:23 +0800 Subject: [PATCH 2/3] test gpg --- .../automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java index b5d3d7d4f..b0cc075cd 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.proxy.service; import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; @@ -35,6 +36,7 @@ public class ProxyRelayServiceImpl implements ProxyRelayService { @Override public CompletableFuture> processGetConsumerRunningInfo(ProxyContext context, RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { + // throw new UnsupportedOperationException(); } From 3a8d7a505a0a6e06cc83d7655ed3d4de22a6b1ec Mon Sep 17 00:00:00 2001 From: "zhouyou.ltx" Date: Mon, 15 Jan 2024 09:10:25 +0800 Subject: [PATCH 3/3] feat(consumer status monitor) --- .../rocketmq/broker/BrokerController.java | 5 +- .../java/com/automq/rocketmq/cli/MQAdmin.java | 4 +- .../rocketmq/cli/consumer/ConsumerStatus.java | 204 ++++++++++++++++++ proto/src/main/proto/proxy/proxy.proto | 76 ++++--- .../rocketmq/proxy/grpc/ProxyClient.java | 5 + .../rocketmq/proxy/grpc/ProxyServiceImpl.java | 149 ++++++++----- .../proxy/grpc/client/GrpcProxyClient.java | 26 +++ .../proxy/service/DefaultServiceManager.java | 12 +- .../proxy/service/ProxyRelayServiceImpl.java | 49 ++++- .../proxy/grpc/ProxyServiceImplTest.java | 6 +- 10 files changed, 438 insertions(+), 98 deletions(-) create mode 100644 cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumerStatus.java diff --git a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java index d4b233f3a..107df1d69 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java @@ -199,9 +199,12 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { // TODO: Split controller to a separate port ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore); - ProxyServiceImpl proxyService = new ProxyServiceImpl(messageStore, extendMessageService, producerManager, consumerManager); + ProxyServiceImpl proxyService = new ProxyServiceImpl(messageStore, extendMessageService + , producerManager, consumerManager,serviceManager); grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService, proxyService); remotingServer = new RemotingProtocolServer(messagingProcessor); + + ((DefaultServiceManager)serviceManager).setRemotingProtocolServer(this.remotingServer); } @Override diff --git a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java index 6aa408b7c..20326ce32 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java @@ -21,6 +21,7 @@ import com.automq.rocketmq.cli.broker.TerminateNode; import com.automq.rocketmq.cli.consumer.ConsumeMessage; import com.automq.rocketmq.cli.consumer.ConsumerClientConnection; +import com.automq.rocketmq.cli.consumer.ConsumerStatus; import com.automq.rocketmq.cli.consumer.CreateGroup; import com.automq.rocketmq.cli.consumer.DeleteGroup; import com.automq.rocketmq.cli.consumer.DescribeGroup; @@ -62,7 +63,8 @@ ResetConsumeOffset.class, PrintTopicStats.class, ProducerClientConnection.class, - ConsumerClientConnection.class + ConsumerClientConnection.class, + ConsumerStatus.class } ) public class MQAdmin implements Runnable { diff --git a/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumerStatus.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumerStatus.java new file mode 100644 index 000000000..d5567cbcf --- /dev/null +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumerStatus.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.cli.consumer; + +import apache.rocketmq.proxy.v1.ConsumerStatusReply; +import apache.rocketmq.proxy.v1.ConsumerStatusRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; +import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient; +import de.vandermeer.asciitable.AT_Row; +import de.vandermeer.asciitable.AsciiTable; +import de.vandermeer.asciitable.CWC_LongestLine; +import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.PopProcessQueueInfo; +import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import picocli.CommandLine; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.stream.IntStream; + +@CommandLine.Command(name = "consumerStatus", mixinStandardHelpOptions = true, showDefaultValues = true) +public class ConsumerStatus implements Callable { + @CommandLine.ParentCommand + MQAdmin mqAdmin; + + @CommandLine.Option(names = {"-g", "--groupName"}, description = "Group name", required = true) + String groupName; + @CommandLine.Option(names = {"-i", "--clientId"}, description = "clientId", required = false) + String clientId; + @CommandLine.Option(names = {"-s", "--jstack"}, description = "jstack", required = false) + boolean jstackEnable; + + @Override + public Void call() throws Exception { + GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig()); + + ConsumerStatusReply consumerStatusReply + = proxyClient.consumerStatus(mqAdmin.getEndpoint(), + ConsumerStatusRequest.newBuilder().setGroup(groupName) + .setClientId(clientId).setJstackEnable(jstackEnable).build()).get(); + + AsciiTable groupTable = new AsciiTable(); + groupTable.addStrongRule(); + groupTable.addRow("Consumer Group"); + groupTable.addRow("consumeType", "messageModel", "consumeFromWhere"); + groupTable.addRow(consumerStatusReply.getConsumeType(), consumerStatusReply.getMessageModel(), consumerStatusReply.getConsumeFromWhere()); + + + groupTable.addStrongRule(); + groupTable.addRow("Consumer Client"); + groupTable.addRow("CLIENT ID", "PROTOCOL", "VERSION", "ADDRESS", "LANGUAGE"); + groupTable.addRule(); + for (apache.rocketmq.proxy.v1.ConsumerClientConnection connection : consumerStatusReply.getConnectionList()) { + groupTable.addRow(connection.getClientId(), connection.getProtocol(), connection.getVersion(), connection.getAddress(), connection.getLanguage()); + groupTable.addRule(); + } + + //Consumer Subscription + final ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo + .decode(consumerStatusReply.getConsumerRunningInfo().toByteArray(), ConsumerRunningInfo.class); + addGroupInfo(consumerRunningInfo,groupTable); + +// CWC_LongestLine cwc = new CWC_LongestLine(); +// IntStream.range(0, row.getCells().size()).forEach((i) -> { +// cwc.add(10, 0); +// }); +// groupTable.getRenderer().setCWC(cwc); + + String render = groupTable.render(); + System.out.println(render); + return null; + } + + private void addGroupInfo(ConsumerRunningInfo consumerRunningInfo,AsciiTable groupTable) { + groupTable.addStrongRule(); + groupTable.addRow("Consumer Subscription"); + AT_Row row = groupTable.addRow("Topic", "PROTOCOL", "ClassFilter", "SubExpression"); + if (consumerRunningInfo.getSubscriptionSet() != null + && !consumerRunningInfo.getSubscriptionSet().isEmpty()) { + for (SubscriptionData subscriptionData : consumerRunningInfo.getSubscriptionSet()) { + groupTable.addRow(subscriptionData.getTopic(), subscriptionData.isClassFilterMode(), + subscriptionData.getSubString()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer Offset"); + groupTable.addRow("Topic", "Broker Name", "QID", "Consumer Offset"); + if (consumerRunningInfo.getMqTable() != null + && !consumerRunningInfo.getMqTable().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getMqTable().entrySet()) { + groupTable.addRow(entry.getKey().getTopic(), + entry.getKey().getBrokerName(), + entry.getKey().getQueueId(), + entry.getValue().getCommitOffset()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer MQ Detail"); + groupTable.addRow("Topic", "Broker Name", "QID", "Consumer Offset"); + if (consumerRunningInfo.getMqTable() != null + && !consumerRunningInfo.getMqTable().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getMqTable().entrySet()) { + groupTable.addRow(entry.getKey().getTopic(), + entry.getKey().getBrokerName(), + entry.getKey().getQueueId(), + entry.getValue().toString()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer MQ Detail"); + groupTable.addRow("Topic", "Broker Name", "QID", "ProcessQueueInfo"); + if (consumerRunningInfo.getMqTable() != null + && !consumerRunningInfo.getMqTable().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getMqTable().entrySet()) { + groupTable.addRow(entry.getKey().getTopic(), + entry.getKey().getBrokerName(), + entry.getKey().getQueueId(), + entry.getValue().toString()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer Pop Detail"); + groupTable.addRow("Topic", "Broker Name", "QID", "ProcessQueueInfo"); + if (consumerRunningInfo.getMqTable() != null + && !consumerRunningInfo.getMqTable().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getMqPopTable().entrySet()) { + groupTable.addRow(entry.getKey().getTopic(), + entry.getKey().getBrokerName(), + entry.getKey().getQueueId(), + entry.getValue().toString()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer RT&TPS"); + groupTable.addRow("Topic", "Pull RT", "Pull TPS", "Consume RT", "ConsumeOK TPS", "ConsumeFailed TPS", "ConsumeFailedMsgsInHour"); + if (consumerRunningInfo.getStatusTable() != null + && !consumerRunningInfo.getMqTable().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getStatusTable().entrySet()) { + groupTable.addRow(entry.getKey(), + entry.getValue().getPullRT(), + entry.getValue().getPullTPS(), + entry.getValue().getConsumeRT(), + entry.getValue().getConsumeOKTPS(), + entry.getValue().getConsumeFailedTPS(), + entry.getValue().getConsumeFailedMsgs()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("User Consume Info"); + groupTable.addRow("Topic", "Pull RT", "Pull TPS", "Consume RT", "ConsumeOK TPS", "ConsumeFailed TPS", "ConsumeFailedMsgsInHour"); + if (consumerRunningInfo.getUserConsumerInfo() != null + && !consumerRunningInfo.getUserConsumerInfo().entrySet().isEmpty()) { + for (Map.Entry entry : consumerRunningInfo.getUserConsumerInfo().entrySet()) { + groupTable.addRow(entry.getKey(), entry.getValue()); + groupTable.addRule(); + } + } + + groupTable.addStrongRule(); + groupTable.addRow("Consumer jstack"); + if (consumerRunningInfo.getJstack() != null && + !consumerRunningInfo.getJstack().isEmpty()) { + groupTable.addRow(consumerRunningInfo.getJstack()); + } + + } + + private void centralize(AT_Row row) { + row.setTextAlignment(TextAlignment.CENTER); +// row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER))); + } +} diff --git a/proto/src/main/proto/proxy/proxy.proto b/proto/src/main/proto/proxy/proxy.proto index 00a7979ea..4b271f1b5 100644 --- a/proto/src/main/proto/proxy/proxy.proto +++ b/proto/src/main/proto/proxy/proxy.proto @@ -134,12 +134,52 @@ message ConsumerClientConnection { int64 last_update_time = 7; } + + message ConsumerClientConnectionReply { Status status = 1; - // consumer client connection + // Producer client connection + repeated ConsumerClientConnection connection = 2; +} + + + +message ConsumerSubInfo { + // topic + string topic = 1; + // subExpression + string sub_expression = 2; +} + +message ConsumerStatusReply { + Status status = 1; + // consumer connection repeated ConsumerClientConnection connection = 2; + + repeated ConsumerSubInfo consumer_sub_info = 3; + + string consume_type = 4; + + string message_model = 5; + + string consume_from_where = 6; + + bytes consumer_running_info = 7; } +message ConsumerStatusRequest { + ProxyRequestContext context = 1; + // Consumer group name + string group = 2; + + string client_id = 3; + + bool jstack_enable = 4; +} + + + + message TraceContext { // Trace id string trace_id = 1; @@ -165,38 +205,6 @@ message RelayReply { Status status = 1; } - - -message ConsumerConnectionRequest { - ProxyRequestContext context = 1; - // Consumer group name - string group = 2; -} - - -message ConsumerSubInfo { - // topic - string topic = 1; - // subExpression - string sub_expression = 2; -} - -message ConsumerGroupCliInfo { - string consume_type = 1; - string message_model = 2; - string consume_from_where = 3; - // consumer client connection - repeated ConsumerClientConnection connection = 4; - // consumer subscription info - repeated ConsumerSubInfo consumer_sub_info = 5; -} -message ConsumerConnectionReply { - Status status = 1; - // consumer group cli info - ConsumerGroupCliInfo consumer_group_cli_info = 2; -} - - service ProxyService { rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {} rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {} @@ -204,5 +212,5 @@ service ProxyService { rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {} rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {} rpc relay(RelayRequest) returns (RelayReply) {} - rpc consumerConnection(ConsumerConnectionRequest) returns (ConsumerConnectionReply) {} + rpc consumerStatus(ConsumerStatusRequest) returns (ConsumerStatusReply) {} } \ No newline at end of file diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java index 745b0a365..dbe037acc 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java @@ -19,6 +19,8 @@ import apache.rocketmq.proxy.v1.ConsumerClientConnection; import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerStatusReply; +import apache.rocketmq.proxy.v1.ConsumerStatusRequest; import apache.rocketmq.proxy.v1.ProducerClientConnection; import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest; import apache.rocketmq.proxy.v1.QueueStats; @@ -44,5 +46,8 @@ CompletableFuture> producerClientConnection(Strin CompletableFuture> consumerClientConnection(String target, ConsumerClientConnectionRequest request); + CompletableFuture consumerStatus(String target, ConsumerStatusRequest request); + + CompletableFuture relayMessage(String target, FlatMessage message); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java index 5a52e2d24..e5870c609 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java @@ -21,9 +21,8 @@ import apache.rocketmq.proxy.v1.ConsumerClientConnection; import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply; import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; -import apache.rocketmq.proxy.v1.ConsumerConnectionReply; -import apache.rocketmq.proxy.v1.ConsumerConnectionRequest; -import apache.rocketmq.proxy.v1.ConsumerGroupCliInfo; +import apache.rocketmq.proxy.v1.ConsumerStatusReply; +import apache.rocketmq.proxy.v1.ConsumerStatusRequest; import apache.rocketmq.proxy.v1.ConsumerSubInfo; import apache.rocketmq.proxy.v1.ProducerClientConnection; import apache.rocketmq.proxy.v1.ProducerClientConnectionReply; @@ -42,11 +41,13 @@ import com.automq.rocketmq.proxy.service.ExtendMessageService; import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.model.StoreContext; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; import io.grpc.stub.StreamObserver; import io.netty.channel.Channel; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -54,8 +55,16 @@ import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.service.ServiceManager; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.slf4j.Logger; @@ -69,12 +78,16 @@ public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase { private final ProducerManager producerManager; private final ConsumerManager consumerManager; + private final ServiceManager serviceManager; + public ProxyServiceImpl(MessageStore messageStore, ExtendMessageService messageService, - ProducerManager producerManager, ConsumerManager consumerManager) { + ProducerManager producerManager, ConsumerManager consumerManager, + ServiceManager serviceManager) { this.messageStore = messageStore; this.messageService = messageService; this.producerManager = producerManager; this.consumerManager = consumerManager; + this.serviceManager = serviceManager; } @Override @@ -213,6 +226,82 @@ public void consumerClientConnection(ConsumerClientConnectionRequest request, responseObserver.onCompleted(); } + + @Override + public void consumerStatus(ConsumerStatusRequest request, StreamObserver responseObserver) { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(request.getGroup()); + try { + + ConsumerStatusReply.Builder consumerStatusReplyBuilder = ConsumerStatusReply.newBuilder(); + + //1.request to client to get info + GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); + requestHeader.setConsumerGroup(request.getGroup()); + requestHeader.setClientId(request.getClientId()); + requestHeader.setJstackEnable(request.getJstackEnable()); + + ProxyRelayService proxyRelayService = serviceManager.getProxyRelayService(); + CompletableFuture> completableFuture + = proxyRelayService.processGetConsumerRunningInfo(ProxyContext.create(), + RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader), header); + ProxyRelayResult result = completableFuture.get(); + ConsumerRunningInfo consumerRunningInfo = result.getResult(); + consumerStatusReplyBuilder.setConsumerRunningInfo(ByteString.copyFrom(consumerRunningInfo.encode())); + + //2.get consumer group info + ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true); + if (groupInfo == null) { + responseObserver.onNext(ConsumerStatusReply.newBuilder() + .setStatus(Status + .newBuilder() + .setCode(Code.BAD_REQUEST) + .setMessage("Consumer group not found: " + request.getGroup()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + //3.assemble info from response + consumerStatusReplyBuilder + .setConsumeType(groupInfo.getConsumeType().getTypeCN()) + .setMessageModel(groupInfo.getMessageModel().getModeCN()) + .setConsumeFromWhere(groupInfo.getConsumeFromWhere().name()); + + if (groupInfo.getChannelInfoTable() != null && !groupInfo.getChannelInfoTable().isEmpty()) { + for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) { + String protocolType = ChannelProtocolType.REMOTING.name(); + if (info.getChannel() instanceof GrpcClientChannel) { + protocolType = ChannelProtocolType.GRPC_V2.name(); + } + consumerStatusReplyBuilder.addConnection(ConsumerClientConnection.newBuilder() + .setClientId(info.getClientId()) + .setProtocol(protocolType) + .setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress())) + .setLanguage(info.getLanguage().name()) + .setVersion(MQVersion.getVersionDesc(info.getVersion())) + .setLastUpdateTime(info.getLastUpdateTimestamp()) + .build()); + } + } + + if (groupInfo.getSubscriptionTable() != null && !groupInfo.getSubscriptionTable().isEmpty()) { + for (SubscriptionData data : groupInfo.getSubscriptionTable().values()) { + consumerStatusReplyBuilder.addConsumerSubInfo(ConsumerSubInfo.newBuilder() + .setTopic(data.getTopic()) + .setSubExpression(data.getSubString()) + .build()); + } + } + + responseObserver.onNext(consumerStatusReplyBuilder.build()); + responseObserver.onCompleted(); + } catch (Exception e) { + LOGGER.error("Failed to get topic message type for {}", "", e); + } + } + @Override public void relay(RelayRequest request, StreamObserver responseObserver) { switch (request.getCommandCase()) { @@ -246,56 +335,4 @@ public void relay(RelayRequest request, StreamObserver responseObser } } } - - @Override - public void consumerConnection(ConsumerConnectionRequest request, - StreamObserver responseObserver) { - ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true); - if (groupInfo == null) { - responseObserver.onNext(ConsumerConnectionReply.newBuilder() - .setStatus(Status - .newBuilder() - .setCode(Code.BAD_REQUEST) - .setMessage("Consumer group not found: " + request.getGroup()) - .build()) - .build()); - responseObserver.onCompleted(); - return; - } - - ConsumerGroupCliInfo.Builder consumerBuilder = ConsumerGroupCliInfo.newBuilder(); - - consumerBuilder - .setConsumeType(groupInfo.getConsumeType().getTypeCN()) - .setMessageModel(groupInfo.getMessageModel().getModeCN()) - .setConsumeFromWhere(groupInfo.getConsumeFromWhere().name()); - - for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) { - String protocolType = ChannelProtocolType.REMOTING.name(); - if (info.getChannel() instanceof GrpcClientChannel) { - protocolType = ChannelProtocolType.GRPC_V2.name(); - } - consumerBuilder.addConnection(ConsumerClientConnection.newBuilder() - .setClientId(info.getClientId()) - .setProtocol(protocolType) - .setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress())) - .setLanguage(info.getLanguage().name()) - .setVersion(MQVersion.getVersionDesc(info.getVersion())) - .setLastUpdateTime(info.getLastUpdateTimestamp()) - .build()); - } - - for (SubscriptionData data : groupInfo.getSubscriptionTable().values()) { - consumerBuilder.addConsumerSubInfo(ConsumerSubInfo.newBuilder() - .setTopic(data.getTopic()) - .setSubExpression(data.getSubString()) - .build()); - } - ConsumerConnectionReply.Builder builder = ConsumerConnectionReply.newBuilder() - .setConsumerGroupCliInfo(consumerBuilder.build()); - - responseObserver.onNext(builder.build()); - responseObserver.onCompleted(); - } - } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java index c08341bb3..7e4c82695 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java @@ -21,6 +21,8 @@ import apache.rocketmq.proxy.v1.ConsumerClientConnection; import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply; import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerStatusReply; +import apache.rocketmq.proxy.v1.ConsumerStatusRequest; import apache.rocketmq.proxy.v1.ProducerClientConnection; import apache.rocketmq.proxy.v1.ProducerClientConnectionReply; import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest; @@ -207,6 +209,30 @@ public void onFailure(Throwable t) { return future; } + @Override + public CompletableFuture consumerStatus(String target, ConsumerStatusRequest request) { + ProxyServiceGrpc.ProxyServiceFutureStub stub = getOrCreateStubForTarget(target); + + CompletableFuture future = new CompletableFuture<>(); + Futures.addCallback(stub.consumerStatus(request), + new FutureCallback<>() { + @Override + public void onSuccess(ConsumerStatusReply result) { + if (result.getStatus().getCode() == Code.OK) { + future.complete(result); + } else { + future.completeExceptionally(new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, result.getStatus().getMessage())); + } + } + + @Override + public void onFailure(Throwable t) { + future.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + return future; + } + @Override public CompletableFuture relayMessage(String target, FlatMessage message) { ProxyServiceGrpc.ProxyServiceFutureStub stub = getOrCreateStubForTarget(target); diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java index 8d2570e6f..d70ca40b9 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java @@ -19,6 +19,7 @@ import com.automq.rocketmq.common.config.BrokerConfig; import com.automq.rocketmq.metadata.api.ProxyMetadataService; +import com.automq.rocketmq.proxy.remoting.RemotingProtocolServer; import com.automq.rocketmq.store.api.MessageStore; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; @@ -43,6 +44,7 @@ public class DefaultServiceManager implements ServiceManager { private final TransactionService transactionService; private final AdminService adminService; private final DeadLetterService deadLetterService; + private RemotingProtocolServer remotingProtocolServer; public DefaultServiceManager(BrokerConfig config, ProxyMetadataService proxyMetadataService, DeadLetterService deadLetterService, MessageService messageService, @@ -54,7 +56,7 @@ public DefaultServiceManager(BrokerConfig config, ProxyMetadataService proxyMeta this.topicRouteService = new TopicRouteServiceImpl(config, proxyMetadataService); this.producerManager = producerManager; this.consumerManager = consumerManager; - this.proxyRelayService = new ProxyRelayServiceImpl(); + this.proxyRelayService = new ProxyRelayServiceImpl(this.consumerManager,this.remotingProtocolServer); this.transactionService = new TransactionServiceImpl(); this.adminService = new AdminServiceImpl(); } @@ -100,6 +102,14 @@ public AdminService getAdminService() { return adminService; } + public RemotingProtocolServer getRemotingProtocolServer() { + return remotingProtocolServer; + } + + public void setRemotingProtocolServer(RemotingProtocolServer remotingProtocolServer) { + this.remotingProtocolServer = remotingProtocolServer; + } + @Override public void shutdown() throws Exception { topicRouteService.shutdown(); diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java index b0cc075cd..81bd48571 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java @@ -17,15 +17,21 @@ package com.automq.rocketmq.proxy.service; +import java.time.Duration; import java.util.concurrent.CompletableFuture; +import com.automq.rocketmq.proxy.remoting.RemotingProtocolServer; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.relay.RelayData; import org.apache.rocketmq.proxy.service.transaction.TransactionData; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; @@ -33,11 +39,48 @@ import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; public class ProxyRelayServiceImpl implements ProxyRelayService { + + private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis(); + private final ConsumerManager consumerManager; + + private final RemotingProtocolServer remotingProtocolServer; + + public ProxyRelayServiceImpl(ConsumerManager consumerManager, RemotingProtocolServer remotingProtocolServer) { + this.consumerManager = consumerManager; + this.remotingProtocolServer = remotingProtocolServer; + } + @Override public CompletableFuture> processGetConsumerRunningInfo(ProxyContext context, - RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { - // - throw new UnsupportedOperationException(); + RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { + + CompletableFuture> responseFuture = new CompletableFuture<>(); + + //1.find channel of client + ClientChannelInfo clientChannelInfo = consumerManager.findChannel(header.getConsumerGroup(), header.getClientId()); + + //2.invoke client by channel + try { + remotingProtocolServer.invokeToClient(clientChannelInfo.getChannel(), command, DEFAULT_MQ_CLIENT_TIMEOUT) + .thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class); + responseFuture.complete(new ProxyRelayResult<>(response.getCode(), + response.getRemark(), consumerRunningInfo)); + } else { + String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark()); + RuntimeException e = new RuntimeException(errMsg); + responseFuture.completeExceptionally(e); + } + }).exceptionally(t -> { + responseFuture.completeExceptionally(ExceptionUtils.getRealException(t)); + return null; + }); + } catch (Throwable t) { + responseFuture.completeExceptionally(ExceptionUtils.getRealException(t)); + return null; + } + return responseFuture; } @Override diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java index 5b0f1322d..b6f35026e 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java @@ -26,6 +26,7 @@ import com.automq.rocketmq.common.model.FlatMessageExt; import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient; import com.automq.rocketmq.proxy.mock.MockMessageUtil; +import com.automq.rocketmq.proxy.service.DefaultServiceManager; import com.automq.rocketmq.proxy.service.ExtendMessageService; import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.model.message.PutResult; @@ -60,7 +61,8 @@ public void setup() throws NoSuchFieldException, IllegalAccessException { ExtendMessageService messageService = mock(ExtendMessageService.class); ProducerManager producerManager = mock(ProducerManager.class); ConsumerManager consumerManager = mock(ConsumerManager.class); - ProxyServiceImpl server = new ProxyServiceImpl(messageStore, messageService, producerManager, consumerManager); + ProxyServiceImpl server = new ProxyServiceImpl(messageStore, messageService, + producerManager, consumerManager, new DefaultServiceManager(any(),any(),any(),any(),any(),any(),any())); grpcServerRule.getServiceRegistry().addService(server); ProxyServiceGrpc.ProxyServiceFutureStub stub = ProxyServiceGrpc.newFutureStub(grpcServerRule.getChannel()); @@ -81,7 +83,7 @@ void relay() { @Test - void ConsumerClientConnection() { + void consumerConnection() { final String groupName = ""; ConsumerClientConnectionRequest request = ConsumerClientConnectionRequest.newBuilder().setGroup(groupName).build();