Skip to content

Commit

Permalink
refactor: refactor cmd line in cli (#744)
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Nov 28, 2023
1 parent f73366f commit 2bac396
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static Date toDate(Timestamp timestamp) {
return calendar.getTime();
}

static void alignCentral(AT_Row row) {
public static void alignCentral(AT_Row row) {
for (AT_Cell cell : row.getCells()) {
cell.getContext().setTextAlignment(TextAlignment.CENTER);
}
Expand Down
32 changes: 30 additions & 2 deletions cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@

package com.automq.rocketmq.cli;

import com.automq.rocketmq.cli.broker.DescribeCluster;
import com.automq.rocketmq.cli.broker.TerminateNode;
import com.automq.rocketmq.cli.consumer.ConsumeMessage;
import com.automq.rocketmq.cli.consumer.CreateGroup;
import com.automq.rocketmq.cli.consumer.DeleteGroup;
import com.automq.rocketmq.cli.consumer.DescribeGroup;
import com.automq.rocketmq.cli.consumer.ListGroup;
import com.automq.rocketmq.cli.consumer.UpdateGroup;
import com.automq.rocketmq.cli.producer.ProduceMessage;
import com.automq.rocketmq.cli.stream.DescribeStream;
import com.automq.rocketmq.cli.topic.CreateTopic;
import com.automq.rocketmq.cli.topic.DeleteTopic;
import com.automq.rocketmq.cli.topic.DescribeTopic;
import com.automq.rocketmq.cli.topic.ListTopic;
import com.automq.rocketmq.cli.topic.UpdateTopic;
import picocli.CommandLine;

@CommandLine.Command(name = "mqadmin",
mixinStandardHelpOptions = true,
version = "S3RocketMQ 1.0",
description = "Command line tools for S3RocketMQ",
version = "AutoMQ for RocketMQ 1.0",
description = "Command line tools for AutoMQ for RocketMQ",
showDefaultValues = true,
subcommands = {
DescribeCluster.class,
Expand Down Expand Up @@ -55,6 +70,19 @@ public class MQAdmin implements Runnable {
@CommandLine.Option(names = {"-s", "--secret-key"}, description = "The authentication secret key")
String secretKey = "";

public String getEndpoint() {
return endpoint;
}


public String getAccessKey() {
return accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void run() {
throw new CommandLine.ParameterException(spec.commandLine(), "Missing required subcommand");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.broker;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -35,7 +38,7 @@ public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
DescribeClusterRequest request = DescribeClusterRequest.newBuilder()
.build();
Cluster cluster = client.describeCluster(mqAdmin.endpoint, request).join();
Cluster cluster = client.describeCluster(mqAdmin.getEndpoint(), request).join();
ConsoleHelper.printCluster(cluster);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.broker;

import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import io.grpc.stub.StreamObserver;
Expand All @@ -41,7 +43,7 @@ public Void call() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
TerminateNodeRequest request = TerminateNodeRequest.newBuilder().setNodeId(nodeId).build();
client.terminateNode(mqAdmin.endpoint, request, new StreamObserver<>() {
client.terminateNode(mqAdmin.getEndpoint(), request, new StreamObserver<>() {
@Override
public void onNext(TerminateNodeReply value) {
switch (value.getStatus().getCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.exception.ControllerException;
Expand Down Expand Up @@ -179,7 +181,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();

CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.endpoint, request);
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.getEndpoint(), request);
groupCf = groupCf.exceptionally(throwable -> {
Throwable t = CliUtils.getRealException(throwable);
if (t instanceof ControllerException controllerException) {
Expand All @@ -200,10 +202,10 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
}
private SimpleConsumer prepareConsumer(ClientServiceProvider provider, String consumerGroup) {
StaticSessionCredentialsProvider staticSessionCredentialsProvider =
new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey);
new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey());

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(mqAdmin.endpoint)
.setEndpoints(mqAdmin.getEndpoint())
.setCredentialProvider(staticSessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(10))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -53,7 +55,7 @@ public Void call() throws Exception {
.setSubMode(subMode)
.build();

CreateGroupReply groupReply = client.createGroup(mqAdmin.endpoint, request).join();
CreateGroupReply groupReply = client.createGroup(mqAdmin.getEndpoint(), request).join();
System.out.println("Group created: " + groupReply.getGroupId());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -34,7 +36,7 @@ public class DeleteGroup implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
client.deleteGroup(mqAdmin.endpoint, id)
client.deleteGroup(mqAdmin.getEndpoint(), id)
.thenRun(() -> {
System.out.println("Deleted group whose group-id=" + id);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.ConsumerGroup;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import de.vandermeer.asciitable.AT_Row;
Expand All @@ -40,7 +42,7 @@ public class DescribeGroup implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
ConsumerGroup group = client.describeGroup(mqAdmin.endpoint, groupName)
ConsumerGroup group = client.describeGroup(mqAdmin.getEndpoint(), groupName)
.join();
if (null == group) {
System.err.printf("Group '%s' is not found%n%n", groupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.ListGroupReply;
import apache.rocketmq.controller.v1.ListGroupRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import de.vandermeer.asciitable.AT_Row;
Expand Down Expand Up @@ -49,7 +52,7 @@ public Void call() throws Exception {
ConsoleHelper.alignCentral(row);
listGroups.addRule();

client.listGroups(mqAdmin.endpoint, request, new StreamObserver<>() {
client.listGroups(mqAdmin.getEndpoint(), request, new StreamObserver<>() {
@Override
public void onNext(ListGroupReply value) {
ConsumerGroup group = value.getGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
Expand All @@ -24,6 +24,8 @@
import apache.rocketmq.controller.v1.Node;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -57,9 +59,9 @@ public Void call() throws Exception {
// TODO: support retrying when failed because of cluster's state change at the same time
GrpcControllerClient controllerClient = new GrpcControllerClient(new CliClientConfig());
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());
CompletableFuture<Cluster> clusterCf = controllerClient.describeCluster(mqAdmin.endpoint, DescribeClusterRequest.newBuilder().build());
CompletableFuture<Cluster> clusterCf = controllerClient.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build());

CompletableFuture<Topic> topicCf = controllerClient.describeTopic(mqAdmin.endpoint, null, topicName);
CompletableFuture<Topic> topicCf = controllerClient.describeTopic(mqAdmin.getEndpoint(), null, topicName);
clusterCf.thenCombine(topicCf, Pair::of)
.thenComposeAsync(pair -> {
Cluster cluster = pair.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -66,7 +68,7 @@ public Void call() throws Exception {
builder.setGroupType(groupType);
}

client.updateGroup(mqAdmin.endpoint, builder.build()).join();
client.updateGroup(mqAdmin.getEndpoint(), builder.build()).join();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.producer;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.exception.ControllerException;
Expand Down Expand Up @@ -141,7 +143,7 @@ private void prepareTopics() throws IOException, ControllerException {
.setAcceptTypes(AcceptTypes.newBuilder().addTypes(messageType).build())
.build();

CompletableFuture<Long> topicCf = client.createTopic(mqAdmin.endpoint, request);
CompletableFuture<Long> topicCf = client.createTopic(mqAdmin.getEndpoint(), request);

topicCf = topicCf.exceptionally(throwable -> {
Throwable t = CliUtils.getRealException(throwable);
Expand All @@ -162,10 +164,10 @@ private void prepareTopics() throws IOException, ControllerException {

private Producer prepareProducer(ClientServiceProvider provider) {
StaticSessionCredentialsProvider staticSessionCredentialsProvider =
new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey);
new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey());

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(mqAdmin.endpoint)
.setEndpoints(mqAdmin.getEndpoint())
.setCredentialProvider(staticSessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(10))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.stream;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.DescribeStreamReply;
import apache.rocketmq.controller.v1.DescribeStreamRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -40,7 +43,7 @@ public Void call() throws Exception {
DescribeStreamRequest request = DescribeStreamRequest.newBuilder()
.setStreamId(streamId)
.build();
DescribeStreamReply reply = client.describeStream(mqAdmin.endpoint, request).join();
DescribeStreamReply reply = client.describeStream(mqAdmin.getEndpoint(), request).join();
if (reply.getStatus().getCode() == Code.OK) {
ConsoleHelper.printStream(reply.getStream(), reply.getRangesList());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.topic;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.common.util.DurationUtil;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -68,7 +70,7 @@ public Void call() throws Exception {
.setRetentionHours((int) retentionHours)
.build();

Long topicId = client.createTopic(mqAdmin.endpoint, request).join();
Long topicId = client.createTopic(mqAdmin.getEndpoint(), request).join();
System.out.println("Topic created: " + topicId);
client.close();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.topic;

import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -34,7 +36,7 @@ public class DeleteTopic implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
client.deleteTopic(mqAdmin.endpoint, id)
client.deleteTopic(mqAdmin.getEndpoint(), id)
.thenRun(() -> {
System.out.println("Deleted topic whose topic-id=" + id);
})
Expand Down
Loading

0 comments on commit 2bac396

Please sign in to comment.