Skip to content

Commit

Permalink
feat(cli): add PrintTopicStats command (#802)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 6, 2023
1 parent 8231145 commit 5f0d956
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 4 deletions.
4 changes: 3 additions & 1 deletion cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.automq.rocketmq.cli.topic.DeleteTopic;
import com.automq.rocketmq.cli.topic.DescribeTopic;
import com.automq.rocketmq.cli.topic.ListTopic;
import com.automq.rocketmq.cli.topic.PrintTopicStats;
import com.automq.rocketmq.cli.topic.UpdateTopic;
import picocli.CommandLine;

Expand All @@ -56,7 +57,8 @@
ProduceMessage.class,
ConsumeMessage.class,
TerminateNode.class,
ResetConsumeOffset.class
ResetConsumeOffset.class,
PrintTopicStats.class
}
)
public class MQAdmin implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public Void call() throws Exception {
}

private void centralize(AT_Row row) {
row.getCells().forEach((cell -> {
cell.getContext().setTextAlignment(TextAlignment.CENTER);
}));
row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER)));
}
}
146 changes: 146 additions & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/topic/PrintTopicStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.cli.topic;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.Node;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.StreamStats;
import apache.rocketmq.proxy.v1.TopicStatsRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import de.vandermeer.asciitable.AT_Row;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import picocli.CommandLine;

@CommandLine.Command(name = "printTopicStats", mixinStandardHelpOptions = true, showDefaultValues = true)
public class PrintTopicStats implements Callable<Void> {
@CommandLine.Option(names = {"-t", "--topic"}, description = "Topic name", required = true)
String topicName;

@CommandLine.Option(names = {"-g", "--group"}, description = "Consumer group name", defaultValue = "")
String consumerGroupName;

@CommandLine.Option(names = {"-q", "--queueId"}, description = "Queue id, -1 means all queues of given topic", required = true, defaultValue = "-1")
int queueId;

@CommandLine.ParentCommand
MQAdmin mqAdmin;

@Override
public Void call() throws Exception {
GrpcControllerClient controllerClient = new GrpcControllerClient(new CliClientConfig());
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());

CompletableFuture<Cluster> clusterFuture = controllerClient.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build());
CompletableFuture<Topic> topicFuture = controllerClient.describeTopic(mqAdmin.getEndpoint(), null, topicName);

clusterFuture.thenCombine(topicFuture, (cluster, topic) -> {
if (topic == null) {
System.out.println("Topic " + topicName + " does not exist");
return null;
}

Map<Integer/*node id*/, Node> nodeMap = cluster.getNodesList().stream().collect(Collectors.toMap(node -> node.getId(), node -> node));

long topicId;
List<QueueStats> queueStatsList = new ArrayList<>();
if (queueId == -1) {
List<Integer> nodeList = topic.getAssignmentsList().stream().map(MessageQueueAssignment::getNodeId).distinct().toList();
for (int nodeId : nodeList) {
String nodeAddress = nodeMap.get(nodeId).getAddress();
List<QueueStats> result = proxyClient.getTopicStats(nodeAddress, TopicStatsRequest.newBuilder().setTopic(topicName).setGroup(consumerGroupName).build()).join();
queueStatsList.addAll(result);
}
} else {
Optional<MessageQueueAssignment> optional = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == queueId).findFirst();
if (optional.isEmpty()) {
throw new IllegalArgumentException("Queue " + queueId + " does not exist");
}
String nodeAddress = nodeMap.get(optional.get().getNodeId()).getAddress();
List<QueueStats> result = proxyClient.getTopicStats(nodeAddress, TopicStatsRequest.newBuilder().setTopic(topicName).setGroup(consumerGroupName).build()).join();
queueStatsList.addAll(result);
}
printTopicStats(topic, queueStatsList);
return null;
}).get();
return null;
}

private void printTopicStats(Topic topic, List<QueueStats> queueStatsList) {
System.out.println("Topic Id: " + topic.getTopicId());
System.out.println("Topic Name: " + topic.getName());

int streamCount = queueStatsList.stream().map(QueueStats::getStreamStatsCount).mapToInt(Integer::intValue).sum();
if (streamCount == 0) {
System.out.println("No opened queue found");
return;
}

AsciiTable statsTable = new AsciiTable();
statsTable.addRule();
AT_Row row = statsTable.addRow("TOPIC", "QUEUE ID", "STREAM ID", "STREAM ROLE", "MIN OFFSET", "MAX OFFSET", "CONSUME OFFSET", "DELIVERING COUNT");
centralize(row);
statsTable.addRule();

for (QueueStats queueStats : queueStatsList) {
for (StreamStats streamStats : queueStats.getStreamStatsList()) {
row = statsTable.addRow(topic.getName(), queueStats.getQueueId(), streamStats.getStreamId(), formatStreamRole(streamStats.getRole(), consumerGroupName), streamStats.getMinOffset(), streamStats.getMaxOffset(), streamStats.getConsumeOffset(), "-");
centralize(row);
statsTable.addRule();
}
}

CWC_LongestLine cwc = new CWC_LongestLine();
IntStream.range(0, row.getCells().size()).forEach((i) -> cwc.add(10, 0));
statsTable.getRenderer().setCWC(cwc);

String render = statsTable.render();
System.out.println(render);
}

private void centralize(AT_Row row) {
row.getCells().forEach((cell -> cell.getContext().setTextAlignment(TextAlignment.CENTER)));
}

private String formatStreamRole(StreamRole role, String consumerGroup) {
return switch (role) {
case STREAM_ROLE_DATA -> "DATA";
case STREAM_ROLE_OPS -> "OPERATION";
case STREAM_ROLE_SNAPSHOT -> "SNAPSHOT";
case STREAM_ROLE_RETRY -> "RETRY for " + consumerGroup;
default -> "UNKNOWN";
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package com.automq.rocketmq.proxy.grpc;

import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.TopicStatsRequest;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface ProxyClient extends Closeable {
CompletableFuture<Void> resetConsumeOffset(String target, ResetConsumeOffsetRequest request);

CompletableFuture<Void> resetConsumeOffsetByTimestamp(String target, ResetConsumeOffsetByTimestampRequest request);

CompletableFuture<List<QueueStats>> getTopicStats(String target, TopicStatsRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ProxyServiceGrpc;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetReply;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.TopicStatsReply;
import apache.rocketmq.proxy.v1.TopicStatsRequest;
import com.automq.rocketmq.common.config.GrpcClientConfig;
import com.automq.rocketmq.proxy.grpc.ProxyClient;
import com.google.common.base.Strings;
Expand All @@ -34,6 +37,7 @@
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -118,6 +122,30 @@ public void onFailure(Throwable t) {
return cf;
}

@Override
public CompletableFuture<List<QueueStats>> getTopicStats(String target, TopicStatsRequest request) {
ProxyServiceGrpc.ProxyServiceFutureStub stub = getOrCreateStubForTarget(target);

CompletableFuture<List<QueueStats>> future = new CompletableFuture<>();
Futures.addCallback(stub.topicStats(request),
new FutureCallback<>() {
@Override
public void onSuccess(TopicStatsReply result) {
if (result.getStatus().getCode() == Code.OK) {
future.complete(result.getQueueStatsList());
} else {
future.completeExceptionally(new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, result.getStatus().getMessage()));
}
}

@Override
public void onFailure(Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubs.entrySet()) {
Expand Down

0 comments on commit 5f0d956

Please sign in to comment.