From 462f5a3ae9762db569abf616703c73777c3a502d Mon Sep 17 00:00:00 2001
From: Red-Asuka <liyd0623@gmail.com>
Date: Wed, 24 Jan 2024 14:41:51 +0800
Subject: [PATCH] feat(cli): add message limit option to bench pub command

---
 cli/src/index.ts          |  6 ++++++
 cli/src/lib/pub.ts        | 37 ++++++++++++++++++++++++++++++-------
 cli/src/types/global.d.ts |  1 +
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/cli/src/index.ts b/cli/src/index.ts
index f63f5a1e1..87c78c435 100755
--- a/cli/src/index.ts
+++ b/cli/src/index.ts
@@ -389,6 +389,12 @@ export class Commander {
       .option('-c, --count <NUMBER>', 'the number of connections', parseNumber, 1000)
       .option('-i, --interval <MILLISECONDS>', 'interval of connecting to the broker', parseNumber, 10)
       .option('-im, --message-interval <MILLISECONDS>', 'interval of publishing messages', parseNumber, 1000)
+      .option(
+        '-L, --limit <NUMBER>',
+        'The maximum number of messages to publish. A value of 0 means no limit on the number of messages',
+        parseNumber,
+        0,
+      )
       .option(
         '-t, --topic <TOPIC>',
         'the message topic, support %u (username), %c (client id), %i (index) variables',
diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts
index a6be8ad32..f0143fa18 100644
--- a/cli/src/lib/pub.ts
+++ b/cli/src/lib/pub.ts
@@ -194,8 +194,19 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
     save && saveConfig('benchPub', options)
   }
 
-  const { count, interval, messageInterval, hostname, port, topic, clientId, message, verbose, maximumReconnectTimes } =
-    options
+  const {
+    count,
+    interval,
+    messageInterval,
+    limit,
+    hostname,
+    port,
+    topic,
+    clientId,
+    message,
+    verbose,
+    maximumReconnectTimes,
+  } = options
 
   checkTopicExists(topic, commandType)
 
@@ -205,8 +216,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
 
   const { username } = connOpts
 
+  let initialized = false
+
   let connectedCount = 0
 
+  let inFlightMessageCount = 0
+
   const isNewConnArray = Array(count).fill(true)
 
   const retryTimesArray = Array(count).fill(0)
@@ -257,10 +272,18 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
         if (isNewConnArray[i - 1]) {
           interactive.success('[%d/%d] - Connected', connectedCount, count)
 
-          setInterval(() => {
-            if (!client.connected) {
+          setInterval(async () => {
+            // If the number of messages sent exceeds the limit, exit the process.
+            if (limit > 0 && total >= limit) {
+              // Wait for the total number of sent messages to be printed, then exit the process.
+              await delay(1000)
+              process.exit(0)
+            }
+            // If not initialized or client is not connected or message count exceeds the limit, do not send messages.
+            if (!initialized || !client.connected || (limit > 0 && total + inFlightMessageCount >= limit)) {
               return
             }
+            inFlightMessageCount += 1
             let publishTopic = topicName
             let publishMessage = message
             if (commandType === 'simulate') {
@@ -272,6 +295,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
               publishMessage = simulationResult.message
             }
             client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
+              inFlightMessageCount -= 1
               if (err) {
                 signale.warn(err)
               } else {
@@ -282,13 +306,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
           }, messageInterval)
 
           if (connectedCount === count) {
+            initialized = true
+
             const connEnd = Date.now()
 
             signale.info(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)
 
-            total = 0
-            rate = 0
-
             if (!verbose) {
               setInterval(() => {
                 simpleInteractive.info(`Published total: ${total}, message rate: ${rate}/s`)
diff --git a/cli/src/types/global.d.ts b/cli/src/types/global.d.ts
index 5650e703c..06ab2b7d9 100644
--- a/cli/src/types/global.d.ts
+++ b/cli/src/types/global.d.ts
@@ -111,6 +111,7 @@ declare global {
     count: number
     interval: number
     messageInterval: number
+    limit: number
     verbose: boolean
   }