Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): support text output #1664

Merged
merged 3 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"mqtt": "^4.3.7",
"ora": "^5.4.1",
"protobufjs": "^7.2.3",
"pump": "^3.0.0",
"readable-stream": "^3.6.0",
Expand Down
2 changes: 1 addition & 1 deletion cli/src/configs/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ const DEFAULT_CONFIG: ConfigModel = {
},
}

const VALID_OUTPUT_MODES: Array<ConfigModel['output']> = ['text', 'json', 'log']
const VALID_OUTPUT_MODES: Array<ConfigModel['output']> = ['text', 'log']

export { USER_HOME_DIR, CONFIG_FILE_PATH, DEFAULT_CONFIG, VALID_OUTPUT_MODES }
1 change: 0 additions & 1 deletion cli/src/configs/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './load'
export * from './init'
2 changes: 1 addition & 1 deletion cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import { conn, benchConn } from './lib/conn'
import { pub, benchPub, simulatePub } from './lib/pub'
import { sub, benchSub } from './lib/sub'
import ls from './lib/ls'
import { initConfig } from './lib/init'
import { version } from '../package.json'
import { initConfig } from './configs'
import state from './state'

export class Commander {
Expand Down
10 changes: 5 additions & 5 deletions cli/src/lib/conn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as mqtt from 'mqtt'
import { Signale, signale, basicLog, benchLog } from '../utils/signale'
import logWrapper, { Signale, basicLog, benchLog, signale, singaleConfig } from '../utils/logWrapper'
import { parseConnectOptions } from '../utils/parse'
import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
Expand Down Expand Up @@ -71,7 +71,7 @@ const benchConn = async (options: BenchConnectOptions) => {

const retryTimesArray = Array(count).fill(0)

const interactive = new Signale({ interactive: true })
const interactiveConn = new Signale({ interactive: true, config: singaleConfig })

benchLog.start.conn(config, count, interval, hostname, port)

Expand All @@ -85,17 +85,17 @@ const benchConn = async (options: BenchConnectOptions) => {

const client = mqtt.connect(opts)

interactive.await('[%d/%d] - Connecting...', connectedCount, count)
interactiveConn.await('[%d/%d] - Connecting...', connectedCount, count)

client.on('connect', () => {
connectedCount += 1
retryTimesArray[i - 1] = 0
if (isNewConnArray[i - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)
interactiveConn.success('[%d/%d] - Connected', connectedCount, count)

if (connectedCount === count) {
const end = Date.now()
signale.info(`Done, total time: ${(end - start) / 1000}s`)
signale.success(`Created ${count} connections in ${(end - start) / 1000}s`)
}
} else {
benchLog.reconnected(connectedCount, count, opts.clientId!)
Expand Down
5 changes: 2 additions & 3 deletions cli/src/configs/init.ts → cli/src/lib/init.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { writeFileSync, mkdirSync } from 'fs'
import { join } from 'path'
import { select, input, password } from '@inquirer/prompts'
import { CONFIG_FILE_PATH, DEFAULT_CONFIG, USER_HOME_DIR } from './common'
import { CONFIG_FILE_PATH, DEFAULT_CONFIG, USER_HOME_DIR } from '../configs/common'

/**
* Generates the content of a configuration INI file based on the provided config object.
Expand Down Expand Up @@ -36,8 +36,7 @@ async function initConfig(): Promise<void> {
message: 'Select MQTTX CLI output mode',
choices: [
{ name: 'Text', value: 'text', description: 'Plain text output' },
{ name: 'JSON', value: 'json', description: 'JSON formatted output' },
{ name: 'Log', value: 'log', description: 'Log file output' },
{ name: 'Log', value: 'log', description: 'Log style output' },
],
default: DEFAULT_CONFIG.output,
})) as ConfigModel['output']
Expand Down
29 changes: 13 additions & 16 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import concat from 'concat-stream'
import { Writable } from 'readable-stream'
import split2 from 'split2'
import { IClientOptions, IClientPublishOptions } from 'mqtt'
import { Signale, signale, basicLog, benchLog, simulateLog } from '../utils/signale'
import logWrapper, { Signale, basicLog, benchLog, simulateLog, singaleConfig, signale } from '../utils/logWrapper'
import { parseConnectOptions, parsePublishOptions, checkTopicExists, checkScenarioExists } from '../utils/parse'
import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
Expand Down Expand Up @@ -58,7 +58,7 @@ const send = (
const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format)
client.publish(topic, publishMessage, pubOpts.opts, (err) => {
if (err) {
signale.warn(err)
logWrapper.warn(err.toString())
} else {
basicLog.published()
}
Expand Down Expand Up @@ -156,7 +156,8 @@ const handleFileRead = (filePath: string) => {
basicLog.fileReadSuccess()
return bufferData
} catch (err) {
signale.error('Failed to read file:', err)
const error = err as Error
logWrapper.fail(`Failed to read file: ${error.toString()}`)
process.exit(1)
}
}
Expand Down Expand Up @@ -262,11 +263,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

const retryTimesArray = Array(count).fill(0)

const interactive = new Signale({ interactive: true })
const simpleInteractive = new Signale({
interactive: true,
config: { displayLabel: false, displayDate: true, displayTimestamp: true },
})
const interactivePub = new Signale({ interactive: true, config: singaleConfig })

if (commandType === 'simulate') {
simulateLog.start.pub(
Expand Down Expand Up @@ -303,27 +300,27 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

const client = mqtt.connect(opts)

interactive.await('[%d/%d] - Connecting...', connectedCount, count)
interactivePub.await('[%d/%d] - Connecting...', connectedCount, count)

client.on('connect', () => {
connectedCount += 1
retryTimesArray[i - 1] = 0
if (isNewConnArray[i - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)
interactivePub.success('[%d/%d] - Connected', connectedCount, count)

setInterval(async () => {
// If the number of messages sent exceeds the limit, exit the process.
if (limit > 0 && total >= limit) {
// Wait for the total number of sent messages to be printed, then exit the process.
await delay(1000)
signale.success(`All ${total} messages have been sent, reaching the limit of ${limit}.`)
logWrapper.success(`All ${total} messages have been sent, reaching the limit of ${limit}.`)
process.exit(0)
}
// If the segmented message has been completely sent, exit the process.
if (splitLimit > 0 && total >= splitLimit) {
// Wait for the total number of sent messages to be printed, then exit the process.
await delay(1000)
signale.success(`All ${total} messages from the ${fileRead} have been successfully sent.`)
logWrapper.success(`All ${total} messages from the ${fileRead} have been successfully sent.`)
process.exit(0)
}
// If not initialized or client is not connected or message count exceeds the limit, do not send messages.
Expand Down Expand Up @@ -359,7 +356,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
if (err) {
signale.warn(err)
logWrapper.warn(err.toString())
} else {
total += 1
rate += 1
Expand All @@ -372,16 +369,16 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

const connEnd = Date.now()

signale.info(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)
signale.success(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)

if (!verbose) {
setInterval(() => {
simpleInteractive.info(`Published total: ${total}, message rate: ${rate}/s`)
interactivePub.log(`Published total: ${total}, message rate: ${rate}/s`)
rate = 0
}, 1000)
} else {
setInterval(() => {
signale.info(`Published total: ${total}, message rate: ${rate}/s`)
logWrapper.log(`Published total: ${total}, message rate: ${rate}/s`)
rate = 0
}, 1000)
}
Expand Down
28 changes: 12 additions & 16 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as mqtt from 'mqtt'
import { Signale, signale, msgLog, basicLog, benchLog } from '../utils/signale'
import logWrapper, { Signale, msgLog, basicLog, benchLog, singaleConfig, signale } from '../utils/logWrapper'
import { parseConnectOptions, parseSubscribeOptions, checkTopicExists } from '../utils/parse'
import delay from '../utils/delay'
import convertPayload from '../utils/convertPayload'
Expand Down Expand Up @@ -41,7 +41,7 @@ const processReceivedMessage = (
const handleDefaultBinaryFile = (format: FormatType | undefined, filePath?: string) => {
if (filePath) {
if ((!format || format !== 'binary') && isSupportedBinaryFormatForMQTT(getPathExtname(filePath))) {
signale.warn('Please use the --format binary option for handling binary files')
logWrapper.warn('Please use the --format binary option for handling binary files')
if (!format) {
return 'binary'
}
Expand Down Expand Up @@ -199,11 +199,7 @@ const benchSub = async (options: BenchSubscribeOptions) => {

const retryTimesArray = Array(count).fill(0)

const interactive = new Signale({ interactive: true })
const simpleInteractive = new Signale({
interactive: true,
config: { displayLabel: false, displayDate: true, displayTimestamp: true },
})
const interactiveSub = new Signale({ interactive: true, config: singaleConfig })

benchLog.start.sub(config, count, interval, hostname, port, topic.join(', '))

Expand All @@ -220,13 +216,13 @@ const benchSub = async (options: BenchSubscribeOptions) => {

const client = mqtt.connect(opts)

interactive.await('[%d/%d] - Connecting...', connectedCount, count)
interactiveSub.await('[%d/%d] - Connecting...', connectedCount, count)

client.on('connect', () => {
connectedCount += 1
retryTimesArray[i - 1] = 0
if (isNewConnArray[i - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)
interactiveSub.success('[%d/%d] - Connected', connectedCount, count)

topic.forEach((t: string, index: number) => {
const { username, clientId } = opts
Expand All @@ -236,19 +232,19 @@ const benchSub = async (options: BenchSubscribeOptions) => {

const subOpts = subOptsArray[index]

interactive.await('[%d/%d] - Subscribing to %s...', connectedCount, count, topicName)
interactiveSub.await('[%d/%d] - Subscribing to %s...', connectedCount, count, topicName)

client.subscribe(topicName, subOpts, (err, result) => {
if (err) {
signale.error(`[${i}/${count}] - Client ID: ${opts.clientId}, ${err}`)
logWrapper.fail(`[${i}/${count}] - Client ID: ${opts.clientId}, ${err}`)
process.exit(1)
} else {
interactive.success('[%d/%d] - Subscribed to %s', connectedCount, count, topicName)
interactiveSub.success('[%d/%d] - Subscribed to %s', connectedCount, count, topicName)
}

result.forEach((sub) => {
if (sub.qos > 2) {
signale.error(
logWrapper.fail(
`[${i}/${count}] - Client ID: ${opts.clientId}, subscription negated to ${sub.topic} with code ${sub.qos}`,
)
process.exit(1)
Expand All @@ -258,21 +254,21 @@ const benchSub = async (options: BenchSubscribeOptions) => {
if (connectedCount === count && topic[topic.length - 1] === t) {
const connEnd = Date.now()

signale.info(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)
signale.success(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)

total = 0

if (!verbose) {
setInterval(() => {
const rate = total - oldTotal
simpleInteractive.info(`Received total: ${total}, rate: ${rate}/s`)
interactiveSub.log(`Received total: ${total}, rate: ${rate}/s`)
oldTotal = total
}, 1000)
} else {
setInterval(() => {
if (total > oldTotal) {
const rate = total - oldTotal
signale.info(`Received total: ${total}, rate: ${rate}/s`)
logWrapper.log(`Received total: ${total}, rate: ${rate}/s`)
}
oldTotal = total
}, 1000)
Expand Down
2 changes: 1 addition & 1 deletion cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ declare global {
}

interface ConfigModel {
output: 'text' | 'json' | 'log'
output: 'text' | 'log'
mqtt: {
host: string
port: number
Expand Down
14 changes: 8 additions & 6 deletions cli/src/utils/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import signale from 'signale'
import {
fileExists,
writeFile,
Expand All @@ -8,6 +7,7 @@ import {
parseYamlOrJson,
isYaml,
} from '../utils/fileUtils'
import logWrapper from './logWrapper'

const defaultPath = `${process.cwd()}/mqttx-cli-config.json`

Expand All @@ -29,7 +29,7 @@ const removeUselessOptions = (
const validateConfig = (commandType: CommandType, filePath: string, config: Config) => {
const data = config[commandType]
if (!data || typeof data !== 'object' || Object.keys(data).length === 0) {
signale.error(`No configuration for ${commandType} found in ${filePath}`)
logWrapper.fail(`No configuration for ${commandType} found in ${filePath}`)
process.exit(1)
}
}
Expand All @@ -54,9 +54,10 @@ const saveConfig = (
}
const content = stringifyToYamlOrJson(data, isYaml(filePath))
writeFile(filePath, content)
signale.success(`Configurations saved to ${filePath}`)
logWrapper.success(`Configurations saved to ${filePath}`)
} catch (error) {
signale.error(error)
const err = error as Error
logWrapper.fail(err.toString())
process.exit(1)
}
}
Expand All @@ -77,11 +78,12 @@ function loadConfig(commandType: CommandType, savePath: boolean | string) {
validateConfig(commandType, filePath, config)
return config[commandType]
} else {
signale.error(`Configuration file ${filePath} not found`)
logWrapper.fail(`Configuration file ${filePath} not found`)
process.exit(1)
}
} catch (error) {
signale.error(error)
const err = error as Error
logWrapper.fail(err.toString())
process.exit(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cli/src/utils/convertPayload.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import chalk from 'chalk'
import { jsonParse, jsonStringify } from './jsonUtils'
import cbor from 'cbor'
import { basicLog } from './signale'
import { basicLog } from './logWrapper'

type Action = 'encode' | 'decode'

Expand Down
13 changes: 8 additions & 5 deletions cli/src/utils/fileUtils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from 'fs'
import path from 'path'
import YAML from 'js-yaml'
import signale from 'signale'
import logWrapper from './logWrapper'

const processPath = (savePath: boolean | string, defaultPath?: string) => {
let filePath = ''
Expand Down Expand Up @@ -37,7 +37,8 @@ const readFile = (filePath: string): Buffer => {
try {
return fs.readFileSync(filePath)
} catch (error) {
signale.error(error)
const err = error as Error
logWrapper.fail(err.toString())
process.exit(1)
}
}
Expand All @@ -46,7 +47,8 @@ const writeFile = (filePath: string, data: string | Buffer): void => {
try {
fs.writeFileSync(filePath, data)
} catch (error) {
signale.error(error)
const err = error as Error
logWrapper.fail(err.toString())
process.exit(1)
}
}
Expand All @@ -55,7 +57,8 @@ const appendFile = (filePath: string, data: string | Buffer, delimiter = '\n'):
try {
fs.appendFileSync(filePath, `${data}${delimiter}`)
} catch (error) {
signale.error(error)
const err = error as Error
logWrapper.fail(err.toString())
process.exit(1)
}
}
Expand Down Expand Up @@ -86,7 +89,7 @@ const createNextNumberedFileName = (filePath: string): string => {
const newFileName = `${baseNameWithoutExt}(${newNumber})${ext}`
return path.join(dir, newFileName)
} catch (err) {
signale.error(`Error: Unable to create a new numbered file name for path '${filePath}'.`)
logWrapper.fail(`Error: Unable to create a new numbered file name for path '${filePath}'.`)
process.exit(1)
}
}
Expand Down
Loading
Loading