From d4f1779b68e658211e7a50ba446ec479bb413d2b Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 17 Jun 2022 08:50:42 +0100 Subject: [PATCH] fix: limit stream concurrency (#77) Pass `maxInboundStreams` and `maxOutboundStreams` options to registrar --- package.json | 8 ++++---- src/index.ts | 13 +++++++++++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index c2e18a7e5d..2a58770e4f 100644 --- a/package.json +++ b/package.json @@ -174,6 +174,10 @@ "dependencies": { "@libp2p/components": "^2.0.0", "@libp2p/crypto": "^1.0.0", + "@libp2p/interface-connection": "^2.0.0", + "@libp2p/interface-peer-id": "^1.0.2", + "@libp2p/interface-pubsub": "^1.0.3", + "@libp2p/interface-registrar": "^2.0.0", "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^2.0.0", "@libp2p/peer-collections": "^1.0.0", @@ -191,10 +195,6 @@ "uint8arrays": "^3.0.0" }, "devDependencies": { - "@libp2p/interface-connection": "^2.0.0", - "@libp2p/interface-peer-id": "^1.0.2", - "@libp2p/interface-pubsub": "^1.0.1", - "@libp2p/interface-registrar": "^2.0.0", "@libp2p/peer-id-factory": "^1.0.0", "aegir": "^37.2.0", "delay": "^5.0.0", diff --git a/src/index.ts b/src/index.ts index a8d0f521ed..5f481d175d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -65,6 +65,8 @@ export abstract class PubSubBaseProtocol extends EventEmi private _registrarTopologyIds: string[] | undefined protected enabled: boolean + private readonly maxInboundStreams: number + private readonly maxOutboundStreams: number constructor (props: PubSubInit) { super() @@ -74,7 +76,9 @@ export abstract class PubSubBaseProtocol extends EventEmi globalSignaturePolicy = 'StrictSign', canRelayMessage = false, emitSelf = false, - messageProcessingConcurrency = 10 + messageProcessingConcurrency = 10, + maxInboundStreams = 1, + maxOutboundStreams = 1 } = props this.multicodecs = ensureArray(multicodecs) @@ -88,6 +92,8 @@ export abstract class PubSubBaseProtocol extends EventEmi this.emitSelf = emitSelf this.topicValidators = new Map() this.queue = new Queue({ concurrency: messageProcessingConcurrency }) + this.maxInboundStreams = maxInboundStreams + this.maxOutboundStreams = maxOutboundStreams this._onIncomingStream = this._onIncomingStream.bind(this) this._onPeerConnected = this._onPeerConnected.bind(this) @@ -115,7 +121,10 @@ export abstract class PubSubBaseProtocol extends EventEmi const registrar = this.components.getRegistrar() // Incoming streams // Called after a peer dials us - await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream))) + await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream, { + maxInboundStreams: this.maxInboundStreams, + maxOutboundStreams: this.maxOutboundStreams + }))) // register protocol with topology // Topology callbacks called on connection manager changes