Skip to content

Commit

Permalink
fix(cli): fix sub error logic with multi topics
Browse files Browse the repository at this point in the history
  • Loading branch information
ysfscream authored and Red-Asuka committed May 29, 2024
1 parent 19d4e69 commit 64c35fe
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
4 changes: 0 additions & 4 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ const send = (
}
})

client.on('close', () => {
basicLog.close()
})

client.on('disconnect', (packet: IDisconnectPacket) => {
basicLog.disconnect(packet)
})
Expand Down
63 changes: 44 additions & 19 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,35 +75,60 @@ const sub = (options: SubscribeOptions) => {

!outputModeClean && basicLog.connecting(loadOptions, connOpts.hostname!, connOpts.port, options.topic.join(', '))

client.on('connect', () => {
!outputModeClean && basicLog.connected()
const subscribeToTopics = async () => {
if (!outputModeClean) basicLog.connected()

retryTimes = 0

const subOptsArray = parseSubscribeOptions(options)

const { topic } = options

topic.forEach((t: string, index: number) => {
if (!outputModeClean) {
topic.forEach((t: string) => basicLog.subscribing(t))
}

const subscribePromises = topic.map((t: string, index: number) => {
const subOpts = subOptsArray[index]
return new Promise<{ successfulSubs: mqtt.ISubscriptionGrant[]; failedSubs: mqtt.ISubscriptionGrant[] }>(
(resolve, reject) => {
client.subscribe(t, subOpts, (err, result) => {
if (err) {
if (!outputModeClean) basicLog.error(err)
return reject(err)
}

!outputModeClean && basicLog.subscribing(t)
const successfulSubs: mqtt.ISubscriptionGrant[] = []
const failedSubs: mqtt.ISubscriptionGrant[] = []

client.subscribe(t, subOpts, (err, result) => {
if (err) {
!outputModeClean && basicLog.error(err)
process.exit(1)
}
result.forEach((sub) => {
if (sub.qos > 2) {
!outputModeClean && basicLog.subscriptionNegated(sub)
process.exit(1)
}
})
!outputModeClean && basicLog.subscribed(t)
})
result.forEach((sub) => {
if (sub.qos > 2) {
failedSubs.push(sub)
if (!outputModeClean) basicLog.subscriptionNegated(sub)
} else {
successfulSubs.push(sub)
if (!outputModeClean) basicLog.subscribed(sub.topic)
}
})

resolve({ successfulSubs, failedSubs })
})
},
)
})
})

try {
const results = await Promise.all(subscribePromises)
const allSuccessfulSubs = results.flatMap((r) => r.successfulSubs)

if (allSuccessfulSubs.length === 0) {
process.exit(1)
}
} catch (error) {
process.exit(1)
}
}

client.on('connect', subscribeToTopics)

client.on('message', (topic, payload, packet) => {
const { format, protobufPath, protobufMessageName, fileSave, fileWrite, delimiter } = options
Expand Down
2 changes: 1 addition & 1 deletion cli/src/utils/logWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const basicLog = {
subscribing: (t: string) => logWrapper.await(`Subscribing to ${t}...`),
subscribed: (t: string) => logWrapper.success(`Subscribed to ${t}`),
subscriptionNegated: (sub: { topic: string; qos: number }) =>
logWrapper.fail(`Subscription negated to ${sub.topic} with code ${sub.qos}`),
logWrapper.fail(`Subscription negated to "${sub.topic}" with code ${sub.qos}`),
publishing: () => logWrapper.await('Message publishing...'),
published: () => logWrapper.success('Message published'),
enterToPublish: () => logWrapper.success('Connected, press Enter to publish, press Ctrl+C to exit'),
Expand Down

0 comments on commit 64c35fe

Please sign in to comment.