From e0bf45af3ec85986d15ce3d05cb62637bc761a3e Mon Sep 17 00:00:00 2001 From: Chad Nehemiah Date: Tue, 21 Mar 2023 08:46:14 -0500 Subject: [PATCH] deps(dev): Upgrade aegir to 38.1.7 (#257) Updates aegir and removes superfluous typescript dep --------- Co-authored-by: achingbrain --- package.json | 6 ++-- src/alloc-unsafe-browser.ts | 2 +- src/alloc-unsafe.ts | 2 +- src/decode.ts | 9 +++-- src/encode.ts | 2 +- src/mplex.ts | 24 +++++++------- src/stream.ts | 4 +-- test/fixtures/utils.ts | 10 ++++-- test/restrict-size.spec.ts | 6 ++-- test/stream.spec.ts | 65 ++++++++++++++++++++----------------- 10 files changed, 73 insertions(+), 57 deletions(-) diff --git a/package.json b/package.json index c239e5c286..ae05b95d99 100644 --- a/package.json +++ b/package.json @@ -165,9 +165,10 @@ "devDependencies": { "@libp2p/interface-stream-muxer-compliance-tests": "^6.0.0", "@types/varint": "^6.0.0", - "aegir": "^37.2.0", + "aegir": "^38.1.7", "cborg": "^1.8.1", "delay": "^5.0.0", + "eslint-plugin-etc": "^2.0.2", "iso-random-stream": "^2.0.2", "it-all": "^2.0.0", "it-drain": "^2.0.0", @@ -176,8 +177,7 @@ "it-pipe": "^2.0.3", "it-to-buffer": "^3.0.0", "p-defer": "^4.0.0", - "random-int": "^3.0.0", - "typescript": "^5.0.2" + "random-int": "^3.0.0" }, "browser": { "./dist/src/alloc-unsafe.js": "./dist/src/alloc-unsafe-browser.js" diff --git a/src/alloc-unsafe-browser.ts b/src/alloc-unsafe-browser.ts index d53332ea9f..111defb5c6 100644 --- a/src/alloc-unsafe-browser.ts +++ b/src/alloc-unsafe-browser.ts @@ -1,3 +1,3 @@ -export function allocUnsafe (size: number) { +export function allocUnsafe (size: number): Uint8Array { return new Uint8Array(size) } diff --git a/src/alloc-unsafe.ts b/src/alloc-unsafe.ts index a8caa3c0c6..5387f4153c 100644 --- a/src/alloc-unsafe.ts +++ b/src/alloc-unsafe.ts @@ -1,3 +1,3 @@ -export function allocUnsafe (size: number) { +export function allocUnsafe (size: number): Buffer { return Buffer.allocUnsafe(size) } diff --git a/src/decode.ts b/src/decode.ts index 94d58123dc..b6dcb8cb43 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -25,7 +25,7 @@ export class Decoder { this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize } - write (chunk: Uint8Array) { + write (chunk: Uint8Array): Message[] { if (chunk == null || chunk.length === 0) { return [] } @@ -109,7 +109,12 @@ export class Decoder { const MSB = 0x80 const REST = 0x7F -function readVarInt (buf: Uint8ArrayList, offset: number = 0) { +export interface ReadVarIntResult { + value: number + offset: number +} + +function readVarInt (buf: Uint8ArrayList, offset: number = 0): ReadVarIntResult { let res = 0 let shift = 0 let counter = offset diff --git a/src/encode.ts b/src/encode.ts index 7b9f55a332..bd79b377bd 100644 --- a/src/encode.ts +++ b/src/encode.ts @@ -56,7 +56,7 @@ const encoder = new Encoder() /** * Encode and yield one or more messages */ -export async function * encode (source: Source, minSendBytes: number = 0) { +export async function * encode (source: Source, minSendBytes: number = 0): AsyncGenerator { if (minSendBytes == null || minSendBytes === 0) { // just send the messages for await (const messages of source) { diff --git a/src/mplex.ts b/src/mplex.ts index 9b5caa8b53..c2e8c444c8 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -22,7 +22,7 @@ const MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION = 1024 const MAX_STREAM_BUFFER_SIZE = 1024 * 1024 * 4 // 4MB const DISCONNECT_THRESHOLD = 5 -function printMessage (msg: Message) { +function printMessage (msg: Message): any { const output: any = { ...msg, type: `${MessageTypeNames[msg.type]} (${msg.type})` @@ -101,7 +101,7 @@ export class MplexStreamMuxer implements StreamMuxer { /** * Returns a Map of streams and their ids */ - get streams () { + get streams (): Stream[] { // Inbound and Outbound streams may have the same ids, so we need to make those unique const streams: Stream[] = [] for (const stream of this._streams.initiators.values()) { @@ -135,9 +135,9 @@ export class MplexStreamMuxer implements StreamMuxer { if (this.closeController.signal.aborted) return if (err != null) { - this.streams.forEach(s => s.abort(err)) + this.streams.forEach(s => { s.abort(err) }) } else { - this.streams.forEach(s => s.close()) + this.streams.forEach(s => { s.close() }) } this.closeController.abort() } @@ -145,13 +145,13 @@ export class MplexStreamMuxer implements StreamMuxer { /** * Called whenever an inbound stream is created */ - _newReceiverStream (options: { id: number, name: string }) { + _newReceiverStream (options: { id: number, name: string }): MplexStream { const { id, name } = options const registry = this._streams.receivers return this._newStream({ id, name, type: 'receiver', registry }) } - _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { + _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }): MplexStream { const { id, name, type, registry } = options log('new %s stream %s', type, id) @@ -164,7 +164,7 @@ export class MplexStreamMuxer implements StreamMuxer { throw new Error(`${type} stream ${id} already exists!`) } - const send = (msg: Message) => { + const send = (msg: Message): void => { if (log.enabled) { log.trace('%s stream %s send', type, id, printMessage(msg)) } @@ -172,7 +172,7 @@ export class MplexStreamMuxer implements StreamMuxer { this._source.push(msg) } - const onEnd = () => { + const onEnd = (): void => { log('%s stream with id %s and protocol %s ended', type, id, stream.stat.protocol) registry.delete(id) @@ -190,7 +190,7 @@ export class MplexStreamMuxer implements StreamMuxer { * Creates a sink with an abortable source. Incoming messages will * also have their size restricted. All messages will be varint decoded. */ - _createSink () { + _createSink (): Sink { const sink: Sink = async source => { // see: https://github.com/jacobheun/any-signal/pull/18 const abortSignals = [this.closeController.signal] @@ -222,8 +222,8 @@ export class MplexStreamMuxer implements StreamMuxer { * Creates a source that restricts outgoing message sizes * and varint encodes them */ - _createSource () { - const onEnd = (err?: Error) => { + _createSource (): any { + const onEnd = (err?: Error): void => { this.close(err) } const source = pushableV({ @@ -238,7 +238,7 @@ export class MplexStreamMuxer implements StreamMuxer { }) } - async _handleIncoming (message: Message) { + async _handleIncoming (message: Message): Promise { const { id, type } = message if (log.enabled) { diff --git a/src/stream.ts b/src/stream.ts index 7ee1510c9d..949e57434c 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -47,7 +47,7 @@ export function createStream (options: Options): MplexStream { open: Date.now() } - const onSourceEnd = (err?: Error) => { + const onSourceEnd = (err?: Error): void => { if (sourceEnded) { return } @@ -68,7 +68,7 @@ export function createStream (options: Options): MplexStream { } } - const onSinkEnd = (err?: Error) => { + const onSinkEnd = (err?: Error): void => { if (sinkEnded) { return } diff --git a/test/fixtures/utils.ts b/test/fixtures/utils.ts index a3f10dfed1..bfab13a04b 100644 --- a/test/fixtures/utils.ts +++ b/test/fixtures/utils.ts @@ -1,10 +1,16 @@ import { Message, MessageTypes } from '../../src/message-types.js' -export function messageWithBytes (msg: Message) { +export type MessageWithBytes = { + [k in keyof Message]: Message[k] +} & { + data: Uint8Array +} + +export function messageWithBytes (msg: Message): Message | MessageWithBytes { if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { return { ...msg, - data: msg.data.slice() // convert Uint8ArrayList to Buffer + data: msg.data.slice() // convert Uint8ArrayList to Uint8Array } } diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts index 7f5ebfa1c1..fc6637f5e3 100644 --- a/test/restrict-size.spec.ts +++ b/test/restrict-size.spec.ts @@ -33,7 +33,7 @@ describe('restrict size', () => { (source) => each(source, chunk => { output.push(chunk) }), - async (source) => await drain(source) + async (source) => { await drain(source) } ) } catch (err: any) { expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG') @@ -90,7 +90,7 @@ describe('restrict size', () => { (source) => each(source, chunk => { output.push(chunk) }), - async (source) => await drain(source) + async (source) => { await drain(source) } ) } catch (err: any) { expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG') @@ -113,7 +113,7 @@ describe('restrict size', () => { (source) => each(source, chunk => { output.push(chunk) }), - async (source) => await drain(source) + async (source) => { await drain(source) } ) } catch (err: any) { expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG') diff --git a/test/stream.spec.ts b/test/stream.spec.ts index f57e8c07b8..f7d19e90e6 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -17,11 +17,11 @@ import type { Message } from '../src/message-types.js' import type { MplexStream } from '../src/mplex.js' import { Uint8ArrayList } from 'uint8arraylist' -function randomInput (min = 1, max = 100) { +function randomInput (min = 1, max = 100): Uint8ArrayList[] { return Array.from(Array(randomInt(min, max)), () => new Uint8ArrayList(randomBytes(randomInt(1, 128)))) } -function expectMsgType (actual: keyof typeof MessageTypeNames, expected: keyof typeof MessageTypeNames) { +function expectMsgType (actual: keyof typeof MessageTypeNames, expected: keyof typeof MessageTypeNames): void { expect(MessageTypeNames[actual]).to.equal(MessageTypeNames[expected]) } @@ -33,7 +33,7 @@ function echoedMessage (message: Message): Message { return bufferToMessage(message.data.slice()) } -function expectMessages (messages: Message[], codes: Array) { +function expectMessages (messages: Message[], codes: Array): void { messages.slice(0, codes.length).forEach((msg, index) => { expect(msg).to.have.property('type', codes[index]) @@ -43,11 +43,11 @@ function expectMessages (messages: Message[], codes: Array) { - return expectMessages(messages.slice(0, codes.length).map(echoedMessage), codes) +function expectEchoedMessages (messages: Message[], codes: Array): void { + expectMessages(messages.slice(0, codes.length).map(echoedMessage), codes) } -const msgToBuffer = (msg: Message) => { +const msgToBuffer = (msg: Message): Uint8ArrayList => { const m: any = { ...msg } @@ -65,12 +65,17 @@ interface onMessage { (msg: Message, initator: MplexStream, receiver: MplexStream): void } -async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiverMessage?: onMessage) { +export interface StreamPair { + initiatorMessages: Message[] + receiverMessages: Message[] +} + +async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiverMessage?: onMessage): Promise { const receiverMessages: Message[] = [] const initiatorMessages: Message[] = [] const id = 5 - const mockInitiatorSend = (msg: Message) => { + const mockInitiatorSend = (msg: Message): void => { initiatorMessages.push(msg) if (onInitiatorMessage != null) { @@ -79,7 +84,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver receiver.sourcePush(msgToBuffer(msg)) } - const mockReceiverSend = (msg: Message) => { + const mockReceiverSend = (msg: Message): void => { receiverMessages.push(msg) if (onReceiverMessage != null) { @@ -99,7 +104,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver // when the initiator sends a CLOSE message, we call close if (msg.type === MessageTypes.CLOSE_INITIATOR) { - void receiver.closeRead() + receiver.closeRead() } // when the initiator sends a RESET message, we call close @@ -119,7 +124,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver // when the receiver sends a CLOSE message, we call close if (msg.type === MessageTypes.CLOSE_RECEIVER) { - void initiator.close() + initiator.close() } // when the receiver sends a RESET message, we call close @@ -142,7 +147,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver describe('stream', () => { it('should initiate stream with NEW_STREAM message', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const stream = createStream({ id, send: mockSend }) const input = randomInput() @@ -156,7 +161,7 @@ describe('stream', () => { it('should initiate named stream with NEW_STREAM message', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = `STREAM${Date.now()}` const stream = createStream({ id, name, send: mockSend }) @@ -171,7 +176,7 @@ describe('stream', () => { it('should end a stream when it is aborted', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = `STREAM${Date.now()}` const deferred = defer() @@ -186,7 +191,7 @@ describe('stream', () => { it('should end a stream when it is reset', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = `STREAM${Date.now()}` const deferred = defer() @@ -201,7 +206,7 @@ describe('stream', () => { it('should send data with MESSAGE_INITIATOR messages if stream initiator', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) @@ -222,7 +227,7 @@ describe('stream', () => { it('should send data with MESSAGE_RECEIVER messages if stream receiver', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) @@ -243,7 +248,7 @@ describe('stream', () => { it('should close stream with CLOSE_INITIATOR message if stream initiator', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) @@ -260,7 +265,7 @@ describe('stream', () => { it('should close stream with CLOSE_RECEIVER message if stream receiver', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) @@ -277,7 +282,7 @@ describe('stream', () => { it('should reset stream on error with RESET_INITIATOR message if stream initiator', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) @@ -302,7 +307,7 @@ describe('stream', () => { it('should reset stream on error with RESET_RECEIVER message if stream receiver', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = id.toString() const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) @@ -371,7 +376,7 @@ describe('stream', () => { messages++ if (messages === maxMsgs) { - return initiator.abort(error) + initiator.abort(error) } }) @@ -401,7 +406,7 @@ describe('stream', () => { messages++ if (messages === maxMsgs) { - return recipient.abort(error) + recipient.abort(error) } }) @@ -490,7 +495,7 @@ describe('stream', () => { }) it('should call onEnd only when both sides have closed', async () => { - const send = (msg: Message) => { + const send = (msg: Message): void => { if (msg.type === MessageTypes.CLOSE_INITIATOR) { // simulate remote closing connection stream.closeRead() @@ -501,7 +506,7 @@ describe('stream', () => { const id = randomInt(1000) const name = id.toString() const deferred = defer() - const onEnd = (err?: any) => err != null ? deferred.reject(err) : deferred.resolve() + const onEnd = (err?: any): void => { err != null ? deferred.reject(err) : deferred.resolve() } const stream = createStream({ id, name, send, onEnd }) const input = randomInput() @@ -515,12 +520,12 @@ describe('stream', () => { }) it('should call onEnd with error for local error', async () => { - const send = () => { + const send = (): void => { throw new Error(`Local boom ${Date.now()}`) } const id = randomInt(1000) const deferred = defer() - const onEnd = (err?: any) => err != null ? deferred.reject(err) : deferred.resolve() + const onEnd = (err?: any): void => { err != null ? deferred.reject(err) : deferred.resolve() } const stream = createStream({ id, send, onEnd }) const input = randomInput() @@ -536,7 +541,7 @@ describe('stream', () => { it('should split writes larger than max message size', async () => { const messages: Message[] = [] - const send = (msg: Message) => { + const send = (msg: Message): void => { if (msg.type === MessageTypes.CLOSE_INITIATOR) { stream.closeRead() } else if (msg.type === MessageTypes.MESSAGE_INITIATOR) { @@ -561,7 +566,7 @@ describe('stream', () => { }) it('should error on double-sink', async () => { - const send = () => {} + const send = (): void => {} const id = randomInt(1000) const stream = createStream({ id, send }) @@ -575,7 +580,7 @@ describe('stream', () => { it('should chunk really big messages', async () => { const msgs: Message[] = [] - const mockSend = (msg: Message) => msgs.push(msg) + const mockSend = (msg: Message): void => { msgs.push(msg) } const id = randomInt(1000) const name = `STREAM${Date.now()}` const maxMsgSize = 10