diff --git a/cli/src/lib/sub.ts b/cli/src/lib/sub.ts index 049170977..7415fd007 100644 --- a/cli/src/lib/sub.ts +++ b/cli/src/lib/sub.ts @@ -214,8 +214,6 @@ const benchSub = async (options: BenchSubscribeOptions) => { const connOpts = parseConnectOptions(options, 'sub') - let connectedCount = 0 - const subOptsArray = parseSubscribeOptions(options) const isNewConnArray = Array(count).fill(true) @@ -230,9 +228,11 @@ const benchSub = async (options: BenchSubscribeOptions) => { let total = 0 let oldTotal = 0 - let isLogged = false + let connectedCount = 0 let subscribedCount = 0 + const allSuccessfulSubs: mqtt.ISubscriptionGrant[] = [] + const failedSubs: { clientId: string; subItem: mqtt.ISubscriptionGrant }[] = [] for (let i = 1; i <= count; i++) { ;((i: number, connOpts: mqtt.IClientOptions) => { @@ -244,64 +244,78 @@ const benchSub = async (options: BenchSubscribeOptions) => { interactiveSub.await('[%d/%d] - Connecting...', connectedCount, count) - client.on('connect', () => { + client.on('connect', async () => { connectedCount += 1 retryTimesArray[i - 1] = 0 if (isNewConnArray[i - 1]) { interactiveSub.success('[%d/%d] - Connected', connectedCount, count) - topic.forEach((t: string, index: number) => { - const { username, clientId } = opts + if (count === connectedCount) { + const connEnd = Date.now() + signale.success(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`) + } + + const subscribePromises = topic.map((t: string, index: number) => { + return new Promise((resolve, reject) => { + const { username, clientId } = opts + let topicName = t.replaceAll('%i', i.toString()).replaceAll('%c', clientId!) + username && (topicName = topicName.replaceAll('%u', username)) + const subOpts = subOptsArray[index] + client.subscribe(topicName, subOpts, (err, result) => { + if (err) { + logWrapper.fail(`[${i}/${count}] - Client ID: ${opts.clientId}, ${err}`) + return reject(err) + } + result.forEach((sub) => { + if (sub.qos > 2) { + failedSubs.push({ + clientId: opts.clientId!, + subItem: sub, + }) + } else { + allSuccessfulSubs.push(sub) + } + subscribedCount += 1 + }) + resolve() + }) + }) + }) - let topicName = t.replaceAll('%i', i.toString()).replaceAll('%c', clientId!) - username && (topicName = topicName.replaceAll('%u', username)) + try { + await Promise.all(subscribePromises) - const subOpts = subOptsArray[index] + if (connectedCount === count && subscribedCount === count * topic.length && !isLogged) { + if (allSuccessfulSubs.length > 0) { + const uniqueSuccessfulSubs = Array.from(new Set(allSuccessfulSubs.map((t) => t.topic))) + logWrapper.success(`All connections subscribed to: ${uniqueSuccessfulSubs.join(', ')}`) + } - interactiveSub.await('[%d/%d] - Subscribing to %s...', connectedCount, count, topicName) + if (failedSubs.length > 0) { + failedSubs.forEach((sub) => { + basicLog.subscriptionNegated(sub.subItem, sub.clientId) + }) + } - client.subscribe(topicName, subOpts, (err, result) => { - if (err) { - logWrapper.fail(`[${i}/${count}] - Client ID: ${opts.clientId}, ${err}`) + if (allSuccessfulSubs.length === 0) { process.exit(1) } - result.forEach((sub) => { - if (sub.qos > 2) { - logWrapper.fail( - `[${i}/${count}] - Client ID: ${opts.clientId}, subscription negated to ${sub.topic} with code ${sub.qos}`, - ) - process.exit(1) - } - }) - - interactiveSub.success('[%d/%d] - Subscribed to %s', connectedCount, count, topicName) - subscribedCount += 1 - - if (connectedCount === count && subscribedCount === count * topic.length && !isLogged) { - const connEnd = Date.now() - signale.success(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`) - total = 0 - isLogged = true - - const intervalFunc = () => { - const rate = total - oldTotal - interactiveSub.log(`Received total: ${total}, rate: ${rate}/s`) - oldTotal = total - } - - const verboseIntervalFunc = () => { - if (total > oldTotal) { - const rate = total - oldTotal - logWrapper.log(`Received total: ${total}, rate: ${rate}/s`) - } - oldTotal = total - } + total = 0 + isLogged = true - setInterval(verbose ? verboseIntervalFunc : intervalFunc, 1000) + const logRate = () => { + const rate = total - oldTotal + const logMethod = verbose ? logWrapper.log : interactiveSub.log + logMethod(`Received total: ${total}, rate: ${rate}/s`) + oldTotal = total } - }) - }) + + setInterval(logRate, 1000) + } + } catch (error) { + process.exit(1) + } } else { benchLog.reconnected(connectedCount, count, opts.clientId!) } diff --git a/cli/src/utils/logWrapper.ts b/cli/src/utils/logWrapper.ts index cd9c4c474..c4874ed5b 100644 --- a/cli/src/utils/logWrapper.ts +++ b/cli/src/utils/logWrapper.ts @@ -68,8 +68,13 @@ const basicLog = { connected: () => logWrapper.success('Connected'), subscribing: (t: string) => logWrapper.await(`Subscribing to ${t}...`), subscribed: (t: string) => logWrapper.success(`Subscribed to ${t}`), - subscriptionNegated: (sub: { topic: string; qos: number }) => - logWrapper.fail(`Subscription negated to "${sub.topic}" with code ${sub.qos}`), + subscriptionNegated: (sub: { topic: string; qos: number }, clientId?: string) => { + let errorLog = `Subscription negated to "${sub.topic}" with code ${sub.qos}` + if (clientId) { + errorLog = `Client ID: ${clientId}, ${errorLog}` + } + logWrapper.fail(errorLog) + }, publishing: () => logWrapper.await('Message publishing...'), published: () => logWrapper.success('Message published'), enterToPublish: () => logWrapper.success('Connected, press Enter to publish, press Ctrl+C to exit'),