From 4a37ae089953e2a8cf39c10915b6b80eac1b0884 Mon Sep 17 00:00:00 2001 From: Orin Eman Date: Thu, 17 Nov 2022 13:52:37 -0800 Subject: [PATCH] fix: issue #729 and use Buffer rather than string for CIRAChannel's sendBuffer. Added test for CIRAChannel writeData() binary path --- src/amt/APFProcessor.test.ts | 26 ++++++------ src/amt/APFProcessor.ts | 57 ++++++++++++++------------- src/amt/CIRAChannel.test.ts | 20 +++++++++- src/amt/CIRAChannel.ts | 36 ++++++----------- src/amt/HttpHandler.ts | 15 ++++++- src/amt/httpHandler.test.ts | 22 ++++++++++- src/utils/redirectInterceptor.test.ts | 2 +- 7 files changed, 110 insertions(+), 68 deletions(-) diff --git a/src/amt/APFProcessor.test.ts b/src/amt/APFProcessor.test.ts index 211f660c3..59c69acd4 100644 --- a/src/amt/APFProcessor.test.ts +++ b/src/amt/APFProcessor.test.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 **********************************************************************/ +import { Buffer } from 'node:buffer' import Common from '../utils/common' import { logger } from '../logging' import APFProcessor, { APFProtocol } from './APFProcessor' @@ -259,9 +260,7 @@ describe('APFProcessor Tests', () => { it('should return 9 if sending entire pending buffer', () => { const fakeCiraChannel: CIRAChannel = { - sendBuffer: { - length: 1000 - }, + sendBuffer: Buffer.alloc(1000), sendcredits: 1000, socket: { write: jest.fn() @@ -287,7 +286,7 @@ describe('APFProcessor Tests', () => { it('should return 9 if sending partial pending buffer', () => { const fakeCiraChannel: CIRAChannel = { - sendBuffer: 'my fake buffer', + sendBuffer: Buffer.from('my fake buffer'), sendcredits: 5, socket: { write: jest.fn() @@ -461,7 +460,7 @@ describe('APFProcessor Tests', () => { const length = 17 const data = '' fakeCiraChannel.closing = 0 - fakeCiraChannel.sendBuffer = 'fake buffer' + fakeCiraChannel.sendBuffer = Buffer.from('fake buffer') fakeCiraChannel.onStateChange = new EventEmitter() const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data) expect(result).toEqual(17) @@ -475,7 +474,7 @@ describe('APFProcessor Tests', () => { const data = '' const readIntSpy = jest.spyOn(Common, 'ReadInt').mockReturnValue(1) fakeCiraChannel.closing = 0 - fakeCiraChannel.sendBuffer = 'fake buffer' + fakeCiraChannel.sendBuffer = Buffer.from('fake buffer') fakeCiraChannel.onStateChange = new EventEmitter() const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data) expect(result).toEqual(17) @@ -1139,13 +1138,16 @@ describe('APFProcessor Tests', () => { }) it('should SendChannelData', () => { - APFProcessor.SendChannelData(fakeCiraSocket, channelid, data) - const dataExpected = + writeSpy = jest.spyOn(fakeCiraSocket, 'write') + APFProcessor.SendChannelData(fakeCiraSocket, channelid, Buffer.from(data)) + const dataExpected = Buffer.from( String.fromCharCode(APFProtocol.CHANNEL_DATA) + - Common.IntToStr(channelid) + - Common.IntToStr(data.length) + - data - expect(writeSpy).toHaveBeenCalledWith(fakeCiraSocket, dataExpected) + Common.IntToStr(channelid) + + Common.IntToStr(data.length) + + data, + 'binary' + ) + expect(writeSpy).toHaveBeenCalledWith(dataExpected) }) it('should SendChannelWindowAdjust', () => { diff --git a/src/amt/APFProcessor.ts b/src/amt/APFProcessor.ts index acca507c5..f635f075e 100644 --- a/src/amt/APFProcessor.ts +++ b/src/amt/APFProcessor.ts @@ -3,9 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 **********************************************************************/ +import { Buffer } from 'node:buffer' import { logger, messages } from '../logging' import Common from '../utils/common' import { CIRASocket } from '../models/models' +import { CIRAChannel } from './CIRAChannel' import { EventEmitter } from 'stream' const KEEPALIVE_INTERVAL = 30 // 30 seconds is typical keepalive interval for AMT CIRA connection @@ -155,19 +157,8 @@ const APFProcessor = { } cirachannel.sendcredits += ByteToAdd logger.silly(`${messages.MPS_WINDOW_ADJUST}, ${RecipientChannel.toString()}, ${ByteToAdd.toString()}, ${cirachannel.sendcredits}`) - if (cirachannel.state === 2 && cirachannel.sendBuffer != null) { - // Compute how much data we can send - if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) { - // Send the entire pending buffer - APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer) - cirachannel.sendcredits -= cirachannel.sendBuffer.length - delete cirachannel.sendBuffer - } else { - // Send a part of the pending buffer - APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits)) - cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits) - cirachannel.sendcredits = 0 - } + if (cirachannel.state === 2) { + APFProcessor.SendPendingData(cirachannel) } return 9 }, @@ -232,19 +223,7 @@ const APFProcessor = { } else { cirachannel.state = 2 // Send any pending data - if (cirachannel.sendBuffer != null) { - if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) { - // Send the entire pending buffer - APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer) - cirachannel.sendcredits -= cirachannel.sendBuffer.length - delete cirachannel.sendBuffer - } else { - // Send a part of the pending buffer - APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits)) - cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits) - cirachannel.sendcredits = 0 - } - } + APFProcessor.SendPendingData(cirachannel) // Indicate the channel is open if (cirachannel.onStateChange) { cirachannel.onStateChange.emit('stateChange', cirachannel.state) @@ -535,8 +514,31 @@ const APFProcessor = { APFProcessor.Write(socket, String.fromCharCode(APFProtocol.CHANNEL_CLOSE) + Common.IntToStr(channelid)) }, - SendChannelData: (socket: CIRASocket, channelid, data): void => { + SendPendingData: (cirachannel: CIRAChannel): void => { + if (cirachannel.sendBuffer != null) { + if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) { + // Send the entire pending buffer + APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer) + cirachannel.sendcredits -= cirachannel.sendBuffer.length + delete cirachannel.sendBuffer + } else { + // Send a part of the pending buffer + APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.subarray(0, cirachannel.sendcredits)) + cirachannel.sendBuffer = cirachannel.sendBuffer.subarray(cirachannel.sendcredits) + cirachannel.sendcredits = 0 + } + } + }, + + SendChannelData: (socket: CIRASocket, channelid, data: Buffer): void => { logger.silly(`${messages.MPS_SEND_CHANNEL_DATA}, ${channelid}`) + const b = Buffer.allocUnsafe(9 + data.length) + b[0] = APFProtocol.CHANNEL_DATA + b.writeUInt32BE(channelid, 1) + b.writeUInt32BE(data.length, 5) + data.copy(b, 9) + socket.write(b) + /* APFProcessor.Write( socket, String.fromCharCode(APFProtocol.CHANNEL_DATA) + @@ -544,6 +546,7 @@ const APFProcessor = { Common.IntToStr(data.length) + data ) + */ }, SendChannelWindowAdjust: (socket: CIRASocket, channelid, bytestoadd): void => { diff --git a/src/amt/CIRAChannel.test.ts b/src/amt/CIRAChannel.test.ts index 945a3fe53..2afa23b2f 100644 --- a/src/amt/CIRAChannel.test.ts +++ b/src/amt/CIRAChannel.test.ts @@ -83,9 +83,27 @@ describe('CIRA Channel', () => { expect(sendChannelSpy).toHaveBeenCalledTimes(1) expect(ciraChannel.sendcredits).toBe(0) }) + it('should write binary data to channel', async () => { + // Tests both the binary data path and appending to the sendBuffer. + // There are enough send credits for the initial sendBuffer only, + // so the string written should appear in sendBuffer. + ciraChannel.state = 2 + ciraChannel.sendcredits = 10 + ciraChannel.sendBuffer = Buffer.alloc(10) + const data = String.fromCharCode(1, 2, 3, 4, 0xC0, 5) + const sendChannelSpy = jest.spyOn(APFProcessor, 'SendChannelData').mockImplementation(() => {}) + + const writePromise = ciraChannel.writeData(data, null) + const responseData = await writePromise + + expect(responseData).toEqual(null) + expect(sendChannelSpy).toHaveBeenCalledTimes(1) + expect(ciraChannel.sendcredits).toBe(0) + expect(ciraChannel.sendBuffer).toEqual(Buffer.from(data, 'binary')) + }) it('should resolve if data does not contain messageId', async () => { ciraChannel.state = 2 - ciraChannel.sendcredits = 116 + ciraChannel.sendcredits = 97 const data = 'KVMR' const params: connectionParams = { guid: '4c4c4544-004b-4210-8033-b6c04f504633', diff --git a/src/amt/CIRAChannel.ts b/src/amt/CIRAChannel.ts index aeabe4452..f9cae7ef0 100644 --- a/src/amt/CIRAChannel.ts +++ b/src/amt/CIRAChannel.ts @@ -4,6 +4,7 @@ **********************************************************************/ // import httpZ, { HttpZResponseModel } from 'http-z' +import { Buffer } from 'node:buffer' import { CIRASocket } from '../models/models' import APFProcessor from './APFProcessor' import { connectionParams, HttpHandler } from './HttpHandler' @@ -21,7 +22,7 @@ export class CIRAChannel { amtCiraWindow: number ciraWindow: number write?: (data: string) => Promise - sendBuffer?: string = null + sendBuffer?: Buffer = null amtchannelid?: number closing?: number onStateChange?: EventEmitter // (state: number) => void @@ -83,34 +84,21 @@ export class CIRAChannel { } else { this.resolve = resolve } - let wsmanRequest: any = data + if (this.state === 0) return reject(new Error('Closed'))// return false + let wsmanRequest: Buffer if (params != null) { // this is an API Call wsmanRequest = this.httpHandler.wrapIt(params, data) + } else { + wsmanRequest = Buffer.from(data, 'binary') } - if (this.state === 0) return reject(new Error('Closed'))// return false - if (this.state === 1 || this.sendcredits === 0 || this.sendBuffer != null) { - if (this.sendBuffer == null) { - this.sendBuffer = wsmanRequest - } else { - this.sendBuffer += wsmanRequest - } - if (messageId == null) { - return resolve(null) - } else { return } + if (this.sendBuffer == null) { + this.sendBuffer = wsmanRequest + } else { + this.sendBuffer = Buffer.concat([this.sendBuffer, wsmanRequest]) } - // Compute how much data we can send - if (wsmanRequest?.length <= this.sendcredits) { - // Send the entire message - APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest) - this.sendcredits -= wsmanRequest.length - if (messageId == null) { - return resolve(null) - } else { return } + if (this.state !== 1 && this.sendcredits !== 0) { + APFProcessor.SendPendingData(this) } - // Send a part of the message - this.sendBuffer = wsmanRequest.substring(this.sendcredits) - APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest.substring(0, this.sendcredits)) - this.sendcredits = 0 if (messageId == null) { resolve(null) } diff --git a/src/amt/HttpHandler.ts b/src/amt/HttpHandler.ts index 60ce3958e..d5ca12739 100644 --- a/src/amt/HttpHandler.ts +++ b/src/amt/HttpHandler.ts @@ -33,7 +33,7 @@ export class HttpHandler { this.parser = new xml2js.Parser({ ignoreAttrs: true, mergeAttrs: false, explicitArray: false, tagNameProcessors: [this.stripPrefix], valueProcessors: [xml2js.processors.parseNumbers, xml2js.processors.parseBooleans] }) } - wrapIt (connectionParams: connectionParams, data: string): string { + wrapIt (connectionParams: connectionParams, data: string): Buffer { try { const url = '/wsman' const action = 'POST' @@ -62,6 +62,7 @@ export class HttpHandler { }) message += `Authorization: ${authorizationRequestHeader}\r\n` } + /* // Use Chunked-Encoding message += Buffer.from([ `Host: ${connectionParams.guid}:${connectionParams.port}`, @@ -73,6 +74,15 @@ export class HttpHandler { '\r\n' ].join('\r\n'), 'utf8') return message + */ + const dataBuffer = Buffer.from(data, 'utf8') + message += `Host: ${connectionParams.guid}:${connectionParams.port}\r\nContent-Length: ${dataBuffer.length}\r\n\r\n` + const buffer = Buffer.concat([Buffer.from(message, 'utf8'), dataBuffer]) + if (dataBuffer.length !== data.length) { + logger.silly(`wrapIt data length mismatch: Buffer.length = ${dataBuffer.length}, string.length = ${data.length}`) + logger.silly(buffer.toString('utf8')) + } + return buffer } catch (err) { logger.error(`${messages.CREATE_HASH_STRING_FAILED}:`, err.message) return null @@ -108,7 +118,8 @@ export class HttpHandler { parseXML (xmlBody: string): any { let wsmanResponse: string - this.parser.parseString(xmlBody, (err, result) => { + const xmlDecoded: string = Buffer.from(xmlBody, 'binary').toString('utf8') + this.parser.parseString(xmlDecoded, (err, result) => { if (err) { logger.error(`${messages.XML_PARSE_FAILED}:`, err) wsmanResponse = null diff --git a/src/amt/httpHandler.test.ts b/src/amt/httpHandler.test.ts index b3d863aff..58cb9bc3b 100644 --- a/src/amt/httpHandler.test.ts +++ b/src/amt/httpHandler.test.ts @@ -95,7 +95,27 @@ it('should return a WSMan request', async () => { password: 'P@ssw0rd' } const result = httpHandler.wrapIt(params, xmlRequestBody) - expect(result).toContain('Authorization') + expect(result.toString()).toContain('Authorization') +}) +it('should properly encode UTF8 characters', async () => { + // À is codepoint 0x00C0, [0xC3, 0x80] when UTF8 encoded... + const xmlRequestBody = 'FooÀbar' + const digestChallenge = { + realm: 'Digest:56ABC7BE224EF620C69EB88F01071DC8', + nonce: 'fVNueyEAAAAAAAAAcO8WqJ8s+WdyFUIY', + stale: 'false', + qop: 'auth' + } + const params: connectionParams = { + guid: '4c4c4544-004b-4210-8033-b6c04f504633', + port: 16992, + digestChallenge: digestChallenge, + username: 'admin', + password: 'P@ssw0rd' + } + const result: Buffer = httpHandler.wrapIt(params, xmlRequestBody) + expect(result.toString('binary')).toContain('Foo' + String.fromCharCode(0xC3, 0x80) + 'bar') + expect(result.toString('binary')).toContain('\r\nContent-Length: 19\r\n') }) it('should return a null when no xml is passed to wrap a WSMan request', async () => { const digestChallenge = { diff --git a/src/utils/redirectInterceptor.test.ts b/src/utils/redirectInterceptor.test.ts index e68006503..53974bc10 100644 --- a/src/utils/redirectInterceptor.test.ts +++ b/src/utils/redirectInterceptor.test.ts @@ -796,7 +796,7 @@ test('handleAuthenticateSession DIGEST with user, pass and digestRealm', () => { r += String.fromCharCode(ws.authCNonce.length) // CNonce Length r += ws.authCNonce // CNonce r += String.fromCharCode(ncs.length) // NonceCount Length - r += ncs // NonceCount // NonceCount + r += ncs // NonceCount r += String.fromCharCode(digest.length) // Response Length r += digest // Response r += String.fromCharCode(amt.digestQOP.length) // QOP Length