diff --git a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java index 5fdfd3cce..4382f96f4 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java @@ -32,6 +32,7 @@ import com.automq.rocketmq.cli.topic.DeleteTopic; import com.automq.rocketmq.cli.topic.DescribeTopic; import com.automq.rocketmq.cli.topic.ListTopic; +import com.automq.rocketmq.cli.topic.PrintTopicStats; import com.automq.rocketmq.cli.topic.UpdateTopic; import picocli.CommandLine; @@ -56,7 +57,8 @@ ProduceMessage.class, ConsumeMessage.class, TerminateNode.class, - ResetConsumeOffset.class + ResetConsumeOffset.class, + PrintTopicStats.class } ) public class MQAdmin implements Runnable { diff --git a/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java index 0255d8c9e..dd8c9148f 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java @@ -86,8 +86,6 @@ public Void call() throws Exception { } private void centralize(AT_Row row) { - row.getCells().forEach((cell -> { - cell.getContext().setTextAlignment(TextAlignment.CENTER); - })); + row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER))); } } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/topic/PrintTopicStats.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/PrintTopicStats.java new file mode 100644 index 000000000..2714b375a --- /dev/null +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/PrintTopicStats.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.cli.topic; + +import apache.rocketmq.controller.v1.Cluster; +import apache.rocketmq.controller.v1.DescribeClusterRequest; +import apache.rocketmq.controller.v1.MessageQueueAssignment; +import apache.rocketmq.controller.v1.Node; +import apache.rocketmq.controller.v1.StreamRole; +import apache.rocketmq.controller.v1.Topic; +import apache.rocketmq.proxy.v1.QueueStats; +import apache.rocketmq.proxy.v1.StreamStats; +import apache.rocketmq.proxy.v1.TopicStatsRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; +import com.automq.rocketmq.controller.client.GrpcControllerClient; +import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient; +import de.vandermeer.asciitable.AT_Row; +import de.vandermeer.asciitable.AsciiTable; +import de.vandermeer.asciitable.CWC_LongestLine; +import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import picocli.CommandLine; + +@CommandLine.Command(name = "printTopicStats", mixinStandardHelpOptions = true, showDefaultValues = true) +public class PrintTopicStats implements Callable { + @CommandLine.Option(names = {"-t", "--topic"}, description = "Topic name", required = true) + String topicName; + + @CommandLine.Option(names = {"-g", "--group"}, description = "Consumer group name", defaultValue = "") + String consumerGroupName; + + @CommandLine.Option(names = {"-q", "--queueId"}, description = "Queue id, -1 means all queues of given topic", required = true, defaultValue = "-1") + int queueId; + + @CommandLine.ParentCommand + MQAdmin mqAdmin; + + @Override + public Void call() throws Exception { + GrpcControllerClient controllerClient = new GrpcControllerClient(new CliClientConfig()); + GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig()); + + CompletableFuture clusterFuture = controllerClient.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build()); + CompletableFuture topicFuture = controllerClient.describeTopic(mqAdmin.getEndpoint(), null, topicName); + + clusterFuture.thenCombine(topicFuture, (cluster, topic) -> { + if (topic == null) { + System.out.println("Topic " + topicName + " does not exist"); + return null; + } + + Map nodeMap = cluster.getNodesList().stream().collect(Collectors.toMap(node -> node.getId(), node -> node)); + + long topicId; + List queueStatsList = new ArrayList<>(); + if (queueId == -1) { + List nodeList = topic.getAssignmentsList().stream().map(MessageQueueAssignment::getNodeId).distinct().toList(); + for (int nodeId : nodeList) { + String nodeAddress = nodeMap.get(nodeId).getAddress(); + List result = proxyClient.getTopicStats(nodeAddress, TopicStatsRequest.newBuilder().setTopic(topicName).setGroup(consumerGroupName).build()).join(); + queueStatsList.addAll(result); + } + } else { + Optional optional = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == queueId).findFirst(); + if (optional.isEmpty()) { + throw new IllegalArgumentException("Queue " + queueId + " does not exist"); + } + String nodeAddress = nodeMap.get(optional.get().getNodeId()).getAddress(); + List result = proxyClient.getTopicStats(nodeAddress, TopicStatsRequest.newBuilder().setTopic(topicName).setGroup(consumerGroupName).build()).join(); + queueStatsList.addAll(result); + } + printTopicStats(topic, queueStatsList); + return null; + }).get(); + return null; + } + + private void printTopicStats(Topic topic, List queueStatsList) { + System.out.println("Topic Id: " + topic.getTopicId()); + System.out.println("Topic Name: " + topic.getName()); + + int streamCount = queueStatsList.stream().map(QueueStats::getStreamStatsCount).mapToInt(Integer::intValue).sum(); + if (streamCount == 0) { + System.out.println("No opened queue found"); + return; + } + + AsciiTable statsTable = new AsciiTable(); + statsTable.addRule(); + AT_Row row = statsTable.addRow("TOPIC", "QUEUE ID", "STREAM ID", "STREAM ROLE", "MIN OFFSET", "MAX OFFSET", "CONSUME OFFSET", "DELIVERING COUNT"); + centralize(row); + statsTable.addRule(); + + for (QueueStats queueStats : queueStatsList) { + for (StreamStats streamStats : queueStats.getStreamStatsList()) { + row = statsTable.addRow(topic.getName(), queueStats.getQueueId(), streamStats.getStreamId(), formatStreamRole(streamStats.getRole(), consumerGroupName), streamStats.getMinOffset(), streamStats.getMaxOffset(), streamStats.getConsumeOffset(), "-"); + centralize(row); + statsTable.addRule(); + } + } + + CWC_LongestLine cwc = new CWC_LongestLine(); + IntStream.range(0, row.getCells().size()).forEach((i) -> cwc.add(10, 0)); + statsTable.getRenderer().setCWC(cwc); + + String render = statsTable.render(); + System.out.println(render); + } + + private void centralize(AT_Row row) { + row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER))); + } + + private String formatStreamRole(StreamRole role, String consumerGroup) { + return switch (role) { + case STREAM_ROLE_DATA -> "DATA"; + case STREAM_ROLE_OPS -> "OPERATION"; + case STREAM_ROLE_SNAPSHOT -> "SNAPSHOT"; + case STREAM_ROLE_RETRY -> "RETRY for " + consumerGroup; + default -> "UNKNOWN"; + }; + } +} diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java index 7bd3becab..42be24afe 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyClient.java @@ -17,12 +17,18 @@ package com.automq.rocketmq.proxy.grpc; +import apache.rocketmq.proxy.v1.QueueStats; import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest; import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest; +import apache.rocketmq.proxy.v1.TopicStatsRequest; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; public interface ProxyClient extends Closeable { CompletableFuture resetConsumeOffset(String target, ResetConsumeOffsetRequest request); + CompletableFuture resetConsumeOffsetByTimestamp(String target, ResetConsumeOffsetByTimestampRequest request); + + CompletableFuture> getTopicStats(String target, TopicStatsRequest request); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java index b4e4856a0..cebcaaaca 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java @@ -19,9 +19,12 @@ import apache.rocketmq.common.v1.Code; import apache.rocketmq.proxy.v1.ProxyServiceGrpc; +import apache.rocketmq.proxy.v1.QueueStats; import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest; import apache.rocketmq.proxy.v1.ResetConsumeOffsetReply; import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest; +import apache.rocketmq.proxy.v1.TopicStatsReply; +import apache.rocketmq.proxy.v1.TopicStatsRequest; import com.automq.rocketmq.common.config.GrpcClientConfig; import com.automq.rocketmq.proxy.grpc.ProxyClient; import com.google.common.base.Strings; @@ -34,6 +37,7 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.ManagedChannel; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -118,6 +122,30 @@ public void onFailure(Throwable t) { return cf; } + @Override + public CompletableFuture> getTopicStats(String target, TopicStatsRequest request) { + ProxyServiceGrpc.ProxyServiceFutureStub stub = getOrCreateStubForTarget(target); + + CompletableFuture> future = new CompletableFuture<>(); + Futures.addCallback(stub.topicStats(request), + new FutureCallback<>() { + @Override + public void onSuccess(TopicStatsReply result) { + if (result.getStatus().getCode() == Code.OK) { + future.complete(result.getQueueStatsList()); + } else { + future.completeExceptionally(new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, result.getStatus().getMessage())); + } + } + + @Override + public void onFailure(Throwable t) { + future.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + return future; + } + @Override public void close() throws IOException { for (Map.Entry entry : stubs.entrySet()) {