feat(cli): add avro support #1735

Merged: 3 commits merged on Aug 23, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions cli/package.json
@@ -22,6 +22,7 @@
   },
   "dependencies": {
     "@inquirer/prompts": "^5.0.3",
+    "avsc": "^5.7.7",
     "cbor": "^9.0.1",
     "chalk": "~4.1.2",
     "cli-table3": "^0.6.3",
10 changes: 10 additions & 0 deletions cli/src/index.ts
@@ -245,6 +245,11 @@ export class Commander {
         '-Pmn, --protobuf-message-name <NAME>',
         'the name of the protobuf message type (must exist in the .proto file)',
       )
+      .option(
+        '-Ap, --avsc-path <PATH>',
+        'the path to the .avsc file that defines the avro schema for AVRO decoding',
+        parseFileRead,
+      )
       .option('--debug', 'enable debug mode for MQTT.js', false)
       .allowUnknownOption(false)
       .action(pub)
@@ -367,6 +372,11 @@ export class Commander {
         '-Pmn, --protobuf-message-name <NAME>',
         'the name of the protobuf message type (must exist in the .proto file)',
       )
+      .option(
+        '-Ap, --avsc-path <PATH>',
+        'the path to the .avsc file that defines the avro schema for AVRO decoding',
+        parseFileRead,
+      )
       .option('--debug', 'enable debug mode for MQTT.js', false)
       .allowUnknownOption(false)
       .action(sub)
41 changes: 22 additions & 19 deletions cli/src/lib/pub.ts
@@ -13,6 +13,7 @@ import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleC
 import { handleLoadOptions, handleSaveOptions } from '../utils/options'
 import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse'
 import { serializeProtobufToBuffer } from '../utils/protobuf'
+import { serializeAvroToBuffer } from '../utils/avro'
 import { loadSimulator } from '../utils/simulate'

 /**
@@ -25,35 +26,39 @@ import { loadSimulator } from '../utils/simulate'
  * Flow:
  * Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message
  * @param {string | Buffer} message - The message to be processed.
- * @param {string} [protobufPath] - The path to the protobuf definition.
- * @param {string} [protobufMessageName] - The name of the protobuf message.
+ * @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
  * @param {FormatType} [format] - The format to convert the message to.
  * @returns {Buffer | string} - The processed message.
  */
 const processPublishMessage = (
   message: string | Buffer,
-  protobufPath?: string,
-  protobufMessageName?: string,
+  schemaOptions: SchemaOptions,
   format?: FormatType,
 ): Buffer | string => {
Red-Asuka (Member) commented on Aug 20, 2024:

I believe that schemaOptions should be an optional parameter, and that schemaOptions and format should be combined into a single options object.

The code could be refactored like this:

const processReceivedMessage = (
  payload: Buffer,
  options?: {
    format?: "base64" | "json" | "hex" | "cbor" | "binary"
    schema?: 'protobuf' | 'avro'
    protobufPath?: string
    protobufMessageName?: string
    avscPath?: string
  },
): string | Buffer => {

The rationale is that these parameters are all related to processing the payload and are optional. By merging them into a single options object, we eliminate concerns about argument order and make the code more intuitive and easier to maintain.
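
To make the suggestion above concrete, here is a minimal sketch of a single optional options object. It is not code from this PR: `FlatProcessOptions`, `describeProcessing`, and the `./sensor.avsc` path are hypothetical, and the function only lists the steps it would run instead of touching a real payload.

```typescript
// Hypothetical flat options object, modelled on the suggestion above.
type FlatProcessOptions = {
  format?: 'base64' | 'json' | 'hex' | 'cbor' | 'binary'
  schema?: 'protobuf' | 'avro'
  protobufPath?: string
  protobufMessageName?: string
  avscPath?: string
}

// Lists the steps that would run, in receive order:
// schema decoding first, then format conversion.
const describeProcessing = (options: FlatProcessOptions = {}): string[] => {
  const steps: string[] = []
  if (options.schema === 'protobuf') {
    steps.push(`protobuf:${options.protobufPath}#${options.protobufMessageName}`)
  } else if (options.schema === 'avro') {
    steps.push(`avro:${options.avscPath}`)
  }
  if (options.format) {
    steps.push(`format:${options.format}`)
  }
  return steps
}

// Call sites name each argument, so argument order no longer matters.
const none = describeProcessing()
const avroHex = describeProcessing({ format: 'hex', schema: 'avro', avscPath: './sensor.avsc' })

// The flat shape also compiles when a required path is missing,
// so this case would need a runtime check.
const incomplete = describeProcessing({ schema: 'avro' })
```

The last call shows the trade-off: the flat object is convenient, but the compiler cannot tell that `avscPath` is required once `schema` is `'avro'`.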

Collaborator (Author) replied:

Thank you for reviewing my code. However, I see the design of this parameter differently.

  1. format and schemaOptions serve different purposes within the processing pipeline (they are used in different functions).
  2. Merging them into one parameter would bypass the restriction enforced by the SchemaOptions type, which I believe is unsafe.

I do understand your suggestion to simplify this function's signature. Maybe we can find a middle ground that retains safety while simplifying the signature.
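
One possible middle ground, sketched here as an assumption and not as what the PR ended up doing, is a single options object whose schema part stays a discriminated union. `ProcessOptions` and `describeSchema` are hypothetical names; the `SchemaOptions` union mirrors the one this PR adds to cli/src/types/global.d.ts.

```typescript
// Mirrors the SchemaOptions union added in cli/src/types/global.d.ts.
type SchemaOptions =
  | { type: 'protobuf'; protobufPath: string; protobufMessageName: string }
  | { type: 'avro'; avscPath: string }
  | { type: 'none' }

// Hypothetical combined options: one object, but the schema part keeps its union type.
interface ProcessOptions {
  format?: string
  schema?: SchemaOptions
}

const describeSchema = (options: ProcessOptions = {}): string => {
  const schema: SchemaOptions = options.schema ?? { type: 'none' }
  // The switch narrows on `type`, so each branch only sees its own fields.
  switch (schema.type) {
    case 'protobuf':
      return `protobuf:${schema.protobufPath}#${schema.protobufMessageName}`
    case 'avro':
      return `avro:${schema.avscPath}`
    case 'none':
      return 'none'
  }
}

const withAvro = describeSchema({ format: 'json', schema: { type: 'avro', avscPath: './sensor.avsc' } })
const withDefaults = describeSchema()

// Would not compile, because avscPath is required for the avro variant:
// describeSchema({ schema: { type: 'avro' } })
```

With this shape the call site still passes one named object, while an avro schema without `avscPath` is rejected at compile time.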

-  const convertMessageFormat = (msg: string | Buffer): Buffer | string => {
+  const convertMessageFormat = (msg: string | Buffer): string | Buffer => {
     if (!format) {
       return msg
     }
     const bufferMsg = Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString())
     return convertPayload(bufferMsg, format, 'encode')
   }

-  const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => {
-    if (protobufPath && protobufMessageName) {
-      return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
+  const serializeWithSchema = (msg: string | Buffer): string | Buffer => {
+    switch (schemaOptions.type) {
+      case 'none':
+        return msg
+
+      case 'protobuf':
+        return serializeProtobufToBuffer(msg, schemaOptions.protobufPath, schemaOptions.protobufMessageName)
+
+      case 'avro':
+        return serializeAvroToBuffer(msg, schemaOptions.avscPath)
     }
-    return msg
   }

-  const pipeline = [convertMessageFormat, serializeProtobufMessage]
+  const pipeline = [convertMessageFormat, serializeWithSchema]

-  return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer
+  return pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message) as Buffer
 }

 const send = (
@@ -62,8 +67,7 @@ const send = (
   pubOpts: {
     topic: string
     message: string | Buffer
-    protobufPath: string | undefined
-    protobufMessageName: string | undefined
+    schemaOptions: SchemaOptions
     format: FormatType | undefined
     opts: IClientPublishOptions
   },
@@ -77,9 +81,9 @@ const send = (
   client.on('connect', () => {
     retryTimes = 0
     basicLog.connected()
-    const { topic, message, protobufPath, protobufMessageName, format } = pubOpts
+    const { topic, message, schemaOptions, format } = pubOpts
     basicLog.publishing()
-    const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format)
+    const publishMessage = processPublishMessage(message, schemaOptions, format)
     client.publish(topic, publishMessage, pubOpts.opts, (err) => {
       if (err) {
         basicLog.error(err)
@@ -127,8 +131,7 @@ const multiSend = (
   pubOpts: {
     topic: string
     message: string | Buffer
-    protobufPath: string | undefined
-    protobufMessageName: string | undefined
+    schemaOptions: SchemaOptions
     format: FormatType | undefined
     opts: IClientPublishOptions
   },
@@ -143,10 +146,10 @@ const multiSend = (
   })
   let count = 0
   sender._write = (line, _enc, cb) => {
-    const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts
+    const { topic, opts, schemaOptions, format } = pubOpts
     count++
     let omitTopic = opts.properties?.topicAlias && count >= 2
-    const publishMessage = processPublishMessage(line.trim(), protobufPath, protobufMessageName, format)
+    const publishMessage = processPublishMessage(line.trim(), schemaOptions, format)
     client.publish(omitTopic ? '' : topic, publishMessage, opts, cb)
   }

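
The publish path in cli/src/lib/pub.ts chains its steps with `reduce`, running format conversion first and schema serialization second. The sketch below isolates that pattern under simplifying assumptions: it works on plain strings instead of `string | Buffer`, and `toHex`, `tagAsSchema`, and `runPipeline` are hypothetical stand-ins for `convertMessageFormat`, `serializeWithSchema`, and the inline `reduce` call.

```typescript
type Transformer = (msg: string) => string

// Stand-in for format conversion: hex-encode each character (single-byte characters only).
const toHex: Transformer = (msg) =>
  msg
    .split('')
    .map((ch) => ('0' + ch.charCodeAt(0).toString(16)).slice(-2))
    .join('')

// Stand-in for schema serialization: wrap the payload in a visible tag.
const tagAsSchema: Transformer = (msg) => `schema(${msg})`

// Same shape as the diff: each transformer receives the previous step's result.
const runPipeline = (message: string, pipeline: Transformer[]): string =>
  pipeline.reduce((msg, transformer) => transformer(msg), message)

// Publish order: format conversion, then schema serialization.
const published = runPipeline('hi', [toHex, tagAsSchema])

// An empty pipeline returns the message unchanged.
const untouched = runPipeline('hi', [])
```

Because the steps are just array entries, the receive path can reuse the same pattern with the order reversed, which is what cli/src/lib/sub.ts does with `[deserializeWithSchema, convertMessageFormat]`.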
65 changes: 51 additions & 14 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { writeFile, appendFile, getPathExtname, createNextNumberedFileName } fro
import { deserializeBufferToProtobuf } from '../utils/protobuf'
import isSupportedBinaryFormatForMQTT from '../utils/binaryFormats'
import * as Debug from 'debug'
import { deserializeBufferToAvro } from '../utils/avro'

/**
*
Expand All @@ -17,27 +18,45 @@ import * as Debug from 'debug'
* Flow:
* payload -> [Protobuf Deserialization] -> [Format Conversion] -> Processed Message
ysfscream marked this conversation as resolved.
Show resolved Hide resolved
* @param payload - The message payload to be processed.
* @param protobufPath - The path to the Protobuf definition file.
* @param protobufMessageName - The name of the Protobuf message.
* @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
* @param format - The format of the payload.
* @returns The processed message as a string or Buffer.
*/
const processReceivedMessage = (
payload: Buffer,
protobufPath?: string,
protobufMessageName?: string,
schemaOptions: SchemaOptions,
format?: FormatType,
): string | Buffer => {
let message: string | Buffer = payload
const pipeline = [
(msg: Buffer) =>
protobufPath && protobufMessageName
? deserializeBufferToProtobuf(msg, protobufPath, protobufMessageName, format)
: msg,
(msg: Buffer) => (format ? convertPayload(msg, format, 'decode') : msg),
]

message = pipeline.reduce((msg, transformer) => transformer(msg), message)
const convertMessageFormat = (msg: string | Buffer): string | Buffer => {
if (!format) {
return msg
}
return convertPayload(msg, format, 'decode')
}

const deserializeWithSchema = (msg: string | Buffer): string | Buffer => {
switch (schemaOptions.type) {
case 'none':
return msg

case 'protobuf':
return deserializeBufferToProtobuf(
payload,
schemaOptions.protobufPath,
schemaOptions.protobufMessageName,
format,
)

case 'avro':
return deserializeBufferToAvro(payload, schemaOptions.avscPath, format)
}
}
ysfscream marked this conversation as resolved.
Show resolved Hide resolved

const pipeline = [deserializeWithSchema, convertMessageFormat]

message = pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message)

if (Buffer.isBuffer(message) && format !== 'binary') {
message = message.toString('utf-8')
@@ -139,11 +158,29 @@ const sub = (options: SubscribeOptions) => {
   client.on('connect', subscribeToTopics)

   client.on('message', (topic, payload, packet) => {
-    const { format, protobufPath, protobufMessageName, fileSave, fileWrite, delimiter } = options
+    const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options

+    let schemaOptions: SchemaOptions
+    if (protobufPath && protobufMessageName) {
+      schemaOptions = {
+        type: 'protobuf',
+        protobufPath,
+        protobufMessageName,
+      }
+    } else if (avscPath) {
+      schemaOptions = {
+        type: 'avro',
+        avscPath,
+      }
+    } else {
+      schemaOptions = {
+        type: 'none',
+      }
+    }
+
     const msgData: MsgItem[] = []

-    const receivedMessage = processReceivedMessage(payload, protobufPath, protobufMessageName, format)
+    const receivedMessage = processReceivedMessage(payload, schemaOptions, format)

     const savePath = fileSave ? createNextNumberedFileName(fileSave) : fileWrite
     if (savePath) {
21 changes: 20 additions & 1 deletion cli/src/types/global.d.ts
@@ -56,6 +56,23 @@ declare global {
     debug?: boolean
   }

+  interface ProtobufSchemaOptions {
+    type: 'protobuf'
+    protobufPath: string
+    protobufMessageName: string
+  }
+
+  interface AvroSchemaOptions {
+    type: 'avro'
+    avscPath: string
+  }
+
+  interface NoSchema {
+    type: 'none'
+  }
+
+  type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions | NoSchema
+
   interface PublishOptions extends ConnectOptions {
     topic: string
     message: string | Buffer
@@ -76,6 +93,7 @@ declare global {
     connUserProperties?: Record<string, string | string[]>
     protobufPath?: string
     protobufMessageName?: string
+    avscPath?: string
     format?: FormatType
   }

@@ -92,11 +110,12 @@ declare global {
     fileWrite?: string
     fileSave?: string
     delimiter?: string
-    format?: FormatType
     outputMode?: OutputMode
     verbose: boolean
     protobufPath?: string
     protobufMessageName?: string
+    avscPath?: string
+    format?: FormatType
   }

   type OmitConnectOptions = Omit<ConnectOptions, 'debug'>
59 changes: 59 additions & 0 deletions cli/src/utils/avro.ts
@@ -0,0 +1,59 @@
+import * as avro from 'avsc'
+import * as fs from 'fs'
+import logWrapper from './logWrapper'
+
+const schemaCache: { [key: string]: avro.Type } = {}
+
+const getAvroType = (schemaPath: string): avro.Type => {
+  // first search from cache
+  if (schemaCache[schemaPath]) {
+    return schemaCache[schemaPath]
+  }
+
+  try {
+    const schemaStr = fs.readFileSync(schemaPath, 'utf-8')
+    const type = avro.Type.forSchema(JSON.parse(schemaStr))
+
+    // cache the parsed schema
+    schemaCache[schemaPath] = type
+
+    return type
+  } catch (err: unknown) {
+    logWrapper.fail(`Unable to load avro schema from ${schemaPath}: ${(err as Error).message}`)
+    process.exit(1)
+  }
+}
+
+export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: string): Buffer => {
+  const type: avro.Type = getAvroType(avscSchemaPath)
+
+  let rawMessage = raw.toString('utf-8')
+
+  try {
+    const serializedMessage = type.toBuffer(JSON.parse(rawMessage))
+    return Buffer.from(serializedMessage)
+  } catch (err: unknown) {
+    logWrapper.fail(`Unable to serialize message to avro buffer: ${err}`)
+    process.exit(1)
+  }
+}
+
+export const deserializeBufferToAvro = (
+  payload: Buffer,
+  avscSchemaPath: string,
+  needFormat?: FormatType,
+): string | Buffer => {
+  const type: avro.Type = getAvroType(avscSchemaPath)
+
+  try {
+    const message = type.fromBuffer(payload)
+
+    if (needFormat) {
+      return Buffer.from(JSON.stringify(message))
+    }
+    return JSON.stringify(message)
+  } catch (err: unknown) {
+    logWrapper.fail(`Unable to deserialize avro encoded buffer: ${(err as Error).message}`)
+    process.exit(1)
+  }
+}
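
`getAvroType` memoizes parsed schemas by file path so that the file is read and parsed once per path. The sketch below shows the same idea with a `Map` in place of a plain object; it is an illustration, not the PR's code. `createCachedLoader` and `getSchema` are hypothetical names, and the loader callback stands in for the `fs.readFileSync` plus `avro.Type.forSchema` step so that the example runs without the avsc dependency.

```typescript
// Generic memoizing loader; `load` stands in for reading and parsing a schema file.
const createCachedLoader = <T>(load: (path: string) => T) => {
  const cache = new Map<string, T>()
  return (path: string): T => {
    if (!cache.has(path)) {
      cache.set(path, load(path))
    }
    return cache.get(path) as T
  }
}

let loadCount = 0
const getSchema = createCachedLoader((path: string) => {
  loadCount += 1
  // Placeholder for a parsed schema object.
  return { path, fields: ['id', 'temperature'] }
})

const first = getSchema('./sensor.avsc')
const second = getSchema('./sensor.avsc') // served from the cache, no second load

// A Map has no inherited keys, so a path such as 'constructor' is an ordinary cache miss.
const odd = getSchema('constructor')
```

One reason to consider a `Map` here is that a plain object also exposes inherited properties such as `constructor`, so a lookup with that key is truthy before anything has been cached.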
21 changes: 20 additions & 1 deletion cli/src/utils/parse.ts
@@ -349,6 +349,7 @@ const parsePublishOptions = (options: PublishOptions) => {
     contentType,
     protobufPath,
     protobufMessageName,
+    avscPath,
     format,
   } = options

@@ -358,6 +359,24 @@ const parsePublishOptions = (options: PublishOptions) => {
     dup,
   }

+  let schemaOptions: SchemaOptions
+  if (protobufPath && protobufMessageName) {
+    schemaOptions = {
+      type: 'protobuf',
+      protobufPath,
+      protobufMessageName,
+    }
+  } else if (avscPath) {
+    schemaOptions = {
+      type: 'avro',
+      avscPath,
+    }
+  } else {
+    schemaOptions = {
+      type: 'none',
+    }
+  }
+
   if (options.mqttVersion === 5) {
     const properties = {
       payloadFormatIndicator,
@@ -375,7 +394,7 @@ const parsePublishOptions = (options: PublishOptions) => {
     )
   }

-  return { topic, message, protobufPath, protobufMessageName, format, opts: publishOptions }
+  return { topic, message, schemaOptions, format, opts: publishOptions }
 }

 const parseSubscribeOptions = (options: SubscribeOptions) => {
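
The same if/else chain that builds `schemaOptions` appears in both cli/src/utils/parse.ts and cli/src/lib/sub.ts. A shared helper is one way to keep the two in sync; the sketch below is only a suggestion and is not part of this PR. `SchemaFlags` and `buildSchemaOptions` are hypothetical names, and the `SchemaOptions` union is repeated from cli/src/types/global.d.ts so that the block stands alone.

```typescript
// Repeated from cli/src/types/global.d.ts so that this sketch is self-contained.
type SchemaOptions =
  | { type: 'protobuf'; protobufPath: string; protobufMessageName: string }
  | { type: 'avro'; avscPath: string }
  | { type: 'none' }

// Hypothetical subset of the CLI options that influence schema selection.
interface SchemaFlags {
  protobufPath?: string
  protobufMessageName?: string
  avscPath?: string
}

// Same precedence as the diff: protobuf (both flags set), then avro, otherwise none.
const buildSchemaOptions = ({ protobufPath, protobufMessageName, avscPath }: SchemaFlags): SchemaOptions => {
  if (protobufPath && protobufMessageName) {
    return { type: 'protobuf', protobufPath, protobufMessageName }
  }
  if (avscPath) {
    return { type: 'avro', avscPath }
  }
  return { type: 'none' }
}

const noSchema = buildSchemaOptions({})
const avroOnly = buildSchemaOptions({ avscPath: './sensor.avsc' })
// Protobuf wins when both kinds of flags are supplied.
const both = buildSchemaOptions({
  protobufPath: './sensor.proto',
  protobufMessageName: 'Sensor',
  avscPath: './sensor.avsc',
})
```

Note that a lone `protobufPath` without `protobufMessageName` falls through to the avro or none branch, which matches the behaviour of the chain in the diff.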
4 changes: 2 additions & 2 deletions cli/src/utils/protobuf.ts
@@ -32,7 +32,7 @@ export const deserializeBufferToProtobuf = (
   protobufPath: string,
   protobufMessageName: string,
   needFormat: FormatType | undefined,
-): any => {
+): string | Buffer => {
   try {
     const root = protobuf.loadSync(protobufPath)
     const Message = root.lookupType(protobufMessageName)
@@ -45,7 +45,7 @@ export const deserializeBufferToProtobuf = (
     if (needFormat) {
       return Buffer.from(JSON.stringify(MessageData.toJSON()))
     }
-    return MessageData
+    return JSON.stringify(MessageData.toJSON())
   } catch (error: unknown) {
     let err = transformPBJSError(error as Error)
     logWrapper.fail(err.message.split('\n')[0])