diff --git a/cli/src/index.ts b/cli/src/index.ts index f63f5a1e1..87c78c435 100755 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -389,6 +389,12 @@ export class Commander { .option('-c, --count ', 'the number of connections', parseNumber, 1000) .option('-i, --interval ', 'interval of connecting to the broker', parseNumber, 10) .option('-im, --message-interval ', 'interval of publishing messages', parseNumber, 1000) + .option( + '-L, --limit ', + 'The maximum number of messages to publish. A value of 0 means no limit on the number of messages', + parseNumber, + 0, + ) .option( '-t, --topic ', 'the message topic, support %u (username), %c (client id), %i (index) variables', diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index a6be8ad32..f0143fa18 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -194,8 +194,19 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | save && saveConfig('benchPub', options) } - const { count, interval, messageInterval, hostname, port, topic, clientId, message, verbose, maximumReconnectTimes } = - options + const { + count, + interval, + messageInterval, + limit, + hostname, + port, + topic, + clientId, + message, + verbose, + maximumReconnectTimes, + } = options checkTopicExists(topic, commandType) @@ -205,8 +216,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | const { username } = connOpts + let initialized = false + let connectedCount = 0 + let inFlightMessageCount = 0 + const isNewConnArray = Array(count).fill(true) const retryTimesArray = Array(count).fill(0) @@ -257,10 +272,18 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | if (isNewConnArray[i - 1]) { interactive.success('[%d/%d] - Connected', connectedCount, count) - setInterval(() => { - if (!client.connected) { + setInterval(async () => { + // If the number of messages sent exceeds the limit, exit the process. + if (limit > 0 && total >= limit) { + // Wait for the total number of sent messages to be printed, then exit the process. + await delay(1000) + process.exit(0) + } + // If not initialized or client is not connected or message count exceeds the limit, do not send messages. + if (!initialized || !client.connected || (limit > 0 && total + inFlightMessageCount >= limit)) { return } + inFlightMessageCount += 1 let publishTopic = topicName let publishMessage = message if (commandType === 'simulate') { @@ -272,6 +295,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | publishMessage = simulationResult.message } client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => { + inFlightMessageCount -= 1 if (err) { signale.warn(err) } else { @@ -282,13 +306,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | }, messageInterval) if (connectedCount === count) { + initialized = true + const connEnd = Date.now() signale.info(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`) - total = 0 - rate = 0 - if (!verbose) { setInterval(() => { simpleInteractive.info(`Published total: ${total}, message rate: ${rate}/s`) diff --git a/cli/src/types/global.d.ts b/cli/src/types/global.d.ts index 5650e703c..06ab2b7d9 100644 --- a/cli/src/types/global.d.ts +++ b/cli/src/types/global.d.ts @@ -111,6 +111,7 @@ declare global { count: number interval: number messageInterval: number + limit: number verbose: boolean }