Skip to content

Commit

Permalink
feat(cli): add message limit option to bench pub command
Browse files Browse the repository at this point in the history
  • Loading branch information
Red-Asuka authored and ysfscream committed Jan 24, 2024
1 parent cb1d412 commit 462f5a3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
6 changes: 6 additions & 0 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ export class Commander {
.option('-c, --count <NUMBER>', 'the number of connections', parseNumber, 1000)
.option('-i, --interval <MILLISECONDS>', 'interval of connecting to the broker', parseNumber, 10)
.option('-im, --message-interval <MILLISECONDS>', 'interval of publishing messages', parseNumber, 1000)
.option(
'-L, --limit <NUMBER>',
'The maximum number of messages to publish. A value of 0 means no limit on the number of messages',
parseNumber,
0,
)
.option(
'-t, --topic <TOPIC>',
'the message topic, support %u (username), %c (client id), %i (index) variables',
Expand Down
37 changes: 30 additions & 7 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,19 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
save && saveConfig('benchPub', options)
}

const { count, interval, messageInterval, hostname, port, topic, clientId, message, verbose, maximumReconnectTimes } =
options
const {
count,
interval,
messageInterval,
limit,
hostname,
port,
topic,
clientId,
message,
verbose,
maximumReconnectTimes,
} = options

checkTopicExists(topic, commandType)

Expand All @@ -205,8 +216,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

const { username } = connOpts

let initialized = false

let connectedCount = 0

let inFlightMessageCount = 0

const isNewConnArray = Array(count).fill(true)

const retryTimesArray = Array(count).fill(0)
Expand Down Expand Up @@ -257,10 +272,18 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
if (isNewConnArray[i - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)

setInterval(() => {
if (!client.connected) {
setInterval(async () => {
// If the number of messages sent exceeds the limit, exit the process.
if (limit > 0 && total >= limit) {
// Wait for the total number of sent messages to be printed, then exit the process.
await delay(1000)
process.exit(0)
}
// If not initialized or client is not connected or message count exceeds the limit, do not send messages.
if (!initialized || !client.connected || (limit > 0 && total + inFlightMessageCount >= limit)) {
return
}
inFlightMessageCount += 1
let publishTopic = topicName
let publishMessage = message
if (commandType === 'simulate') {
Expand All @@ -272,6 +295,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
publishMessage = simulationResult.message
}
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
if (err) {
signale.warn(err)
} else {
Expand All @@ -282,13 +306,12 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
}, messageInterval)

if (connectedCount === count) {
initialized = true

const connEnd = Date.now()

signale.info(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)

total = 0
rate = 0

if (!verbose) {
setInterval(() => {
simpleInteractive.info(`Published total: ${total}, message rate: ${rate}/s`)
Expand Down
1 change: 1 addition & 0 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ declare global {
count: number
interval: number
messageInterval: number
limit: number
verbose: boolean
}

Expand Down

0 comments on commit 462f5a3

Please sign in to comment.