From 47bc4a71a3036e8bf66c4bd5a5663f4347423d5d Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 18 Jun 2020 17:22:35 -0500 Subject: [PATCH 1/2] feat: add outbound mesh quota --- ts/heartbeat.ts | 163 ++++++++++++++++++++++++++++++++++-------- ts/index.ts | 70 ++++++++++++++++-- ts/score/peerScore.ts | 8 ++- 3 files changed, 203 insertions(+), 38 deletions(-) diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index 38acbaf8..1389ed2f 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -75,52 +75,157 @@ export class Heartbeat { // that hasn't been piggybacked since the last heartbeat this.gossipsub._flush() - /** - * @type {Map>} - */ + // cache scores throught the heartbeat + const scores = new Map() + const getScore = (id: string): number => { + let s = scores.get(id) + if (s === undefined) { + s = this.gossipsub.score.score(id) + scores.set(id, s) + } + return s + } + const tograft = new Map() const toprune = new Map() // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { + // prune/graft helper functions (defined per topic) + const prunePeer = (p: Peer): void => { + const id = p.id.toB58String() + this.gossipsub.log( + 'HEARTBEAT: Remove mesh link to %s in %s', + id, topic + ) + // update peer score + this.gossipsub.score.prune(id, topic) + // remove peer from mesh + peers.delete(p) + // add to toprune + const topics = toprune.get(p) + if (!topics) { + toprune.set(p, [topic]) + } else { + topics.push(topic) + } + } + const graftPeer = (p: Peer): void => { + const id = p.id.toB58String() + this.gossipsub.log( + 'HEARTBEAT: Add mesh link to %s in %s', + id, topic + ) + // update peer score + this.gossipsub.score.graft(id, topic) + // add peer to mesh + peers.add(p) + // add to tograft + const topics = tograft.get(p) + if (!topics) { + tograft.set(p, [topic]) + } else { + topics.push(topic) + } + } + + // drop all peers with negative score + peers.forEach(p => { + const id = p.id.toB58String() + const score = getScore(id) + if (score < 0) { + this.gossipsub.log( + 'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s', + id, score, topic + ) + prunePeer(p) + } + }) + // do we have enough peers? if (peers.size < constants.GossipsubDlo) { const ineed = constants.GossipsubD - peers.size - const peersSet = getGossipPeers(this.gossipsub, topic, ineed) - peersSet.forEach((peer) => { - // add topic peers not already in mesh - if (peers.has(peer)) { - return - } - - this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', peer.id.toB58String(), topic) - peers.add(peer) - const peerGrafts = tograft.get(peer) - if (!peerGrafts) { - tograft.set(peer, [topic]) - } else { - peerGrafts.push(topic) - } + const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => { + // filter out mesh peers, peers with negative score + return !peers.has(p) && getScore(p.id.toB58String()) >= 0 }) + + peersSet.forEach(graftPeer) } // do we have to many peers? if (peers.size > constants.GossipsubDhi) { - const idontneed = peers.size - constants.GossipsubD let peersArray = Array.from(peers) - peersArray = shuffle(peersArray) - peersArray = peersArray.slice(0, idontneed) + // sort by score + peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String())) + // We keep the first D_score peers by score and the remaining up to D randomly + // under the constraint that we keep D_out peers in the mesh (if we have that many) + peersArray = peersArray.slice(0, constants.GossipsubDscore).concat( + shuffle(peersArray.slice(constants.GossipsubDscore)) + ) - peersArray.forEach((peer) => { - this.gossipsub.log('HEARTBEAT: Remove mesh link to %s in %s', peer.id.toB58String(), topic) - peers.delete(peer) - const peerPrunes = toprune.get(peer) - if (!peerPrunes) { - toprune.set(peer, [topic]) - } else { - peerPrunes.push(topic) + // count the outbound peers we are keeping + let outbound = 0 + peersArray.slice(0, constants.GossipsubD).forEach(p => { + if (this.gossipsub.outbound.get(p)) { + outbound++ } }) + + // if it's less than D_out, bubble up some outbound peers from the random selection + if (outbound < constants.GossipsubDout) { + const rotate = (i: number): void => { + // rotate the peersArray to the right and put the ith peer in the front + const p = peersArray[i] + for (let j = i; j > 0; j--) { + peersArray[j] = peersArray[j - 1] + } + peersArray[0] = p + } + + // first bubble up all outbound peers already in the selection to the front + if (outbound > 0) { + let ihave = outbound + for (let i = 1; i < constants.GossipsubD && ihave > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i) + ihave-- + } + } + } + + // now bubble up enough outbound peers outside the selection to the front + let ineed = constants.GossipsubD - outbound + for (let i = constants.GossipsubD; i < peersArray.length && ineed > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i) + ineed-- + } + } + } + + // prune the excess peers + peersArray.slice(0, constants.GossipsubD).forEach(prunePeer) + } + + // do we have enough outbound peers? + if (peers.size >= constants.GossipsubDlo) { + // count the outbound peers we have + let outbound = 0 + peers.forEach(p => { + if (this.gossipsub.outbound.get(p)) { + outbound++ + } + }) + + // if it's less than D_out, select some peers with outbound connections and graft them + if (outbound < constants.GossipsubDout) { + const ineed = constants.GossipsubDout - outbound + getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => { + // filter our current mesh peers and peers with negative score + return !peers.has(p) && getScore(p.id.toB58String()) >= 0 + }).forEach(graftPeer) + } } this.gossipsub._emitGossip(topic, peers) diff --git a/ts/index.ts b/ts/index.ts index 1c8eb221..db69d324 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -42,7 +42,9 @@ class Gossipsub extends BasicPubsub { lastpub: Map gossip: Map control: Map + outbound: Map score: PeerScore + _connectionManager: ConnectionManager _options: GossipOptions public static multicodec: string = constants.GossipsubIDv10 @@ -136,6 +138,13 @@ class Gossipsub extends BasicPubsub { */ this.control = new Map() + /** + * Connection direction cache, marks peers with outbound connections + * + * @type {Map} + */ + this.outbound = new Map() + /** * Use the overriden mesgIdFn or the default one. */ @@ -152,6 +161,11 @@ class Gossipsub extends BasicPubsub { */ this.heartbeat = new Heartbeat(this) + /** + * Connection manager + */ + this._connectionManager = connectionManager + /** * Peer score tracking */ @@ -170,6 +184,18 @@ class Gossipsub extends BasicPubsub { // Add to peer scoring this.score.addPeer(peerId.toB58String()) + // track the connection direction + let outbound = false + for (const c of this._connectionManager.getAll(peerId)) { + if (c.stat.direction === 'outbound') { + if (Array.from(c.registry.values()).some(rvalue => protocols.includes(rvalue.protocol))) { + outbound = true + break + } + } + } + this.outbound.set(p, outbound) + return p } @@ -198,6 +224,8 @@ class Gossipsub extends BasicPubsub { this.gossip.delete(peer) // Remove from control mapping this.control.delete(peer) + // Remove from outbound tracking + this.outbound.delete(peer) // Remove from peer scoring this.score.removePeer(peer.id.toB58String()) @@ -367,20 +395,47 @@ class Gossipsub extends BasicPubsub { */ _handleGraft (peer: Peer, graft: ControlGraft[]): ControlPrune[] | undefined { const prune: string[] = [] + const id = peer.id.toB58String() + const score = this.score.score(id) graft.forEach(({ topicID }) => { if (!topicID) { return } - const peers = this.mesh.get(topicID) - if (!peers) { + const peersInMesh = this.mesh.get(topicID) + if (!peersInMesh) { + // spam hardening: ignore GRAFTs for unknown topics + return + } + + // check if peer is already in the mesh; if so do nothing + if (peersInMesh.has(peer)) { + return + } + + // check the score + if (score < 0) { + // we don't GRAFT peers with negative score + this.log( + 'GRAFT: ignoring peer %s with negative score: score=%d, topic=%s', + id, score, topicID + ) + // we do send them PRUNE however, because it's a matter of protocol correctness prune.push(topicID) - } else { - this.log('GRAFT: Add mesh link from %s in %s', peer.id.toB58String(), topicID) - peers.add(peer) - peer.topics.add(topicID) - this.mesh.set(topicID, peers) + return + } + + // check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts + // from peers with outbound connections; this is a defensive check to restrict potential + // mesh takeover attacks combined with love bombing + if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) { + prune.push(topicID) + return } + + this.log('GRAFT: Add mesh link from %s in %s', id, topicID) + peersInMesh.add(peer) + peer.topics.add(topicID) }) if (!prune.length) { @@ -443,6 +498,7 @@ class Gossipsub extends BasicPubsub { this.lastpub = new Map() this.gossip = new Map() this.control = new Map() + this.outbound = new Map() } /** diff --git a/ts/score/peerScore.ts b/ts/score/peerScore.ts index c8d209cc..6ac4ad80 100644 --- a/ts/score/peerScore.ts +++ b/ts/score/peerScore.ts @@ -14,10 +14,14 @@ const log = debug('libp2p:gossipsub:score') interface Connection { remoteAddr: Multiaddr remotePeer: PeerId + stat: { + direction: 'inbound' | 'outbound' + } + registry: Map } export interface ConnectionManager { - getAll(id: string): Connection[] + getAll(peerId: PeerId): Connection[] // eslint-disable-next-line @typescript-eslint/ban-types on(evt: string, fn: Function): void // eslint-disable-next-line @typescript-eslint/ban-types @@ -518,7 +522,7 @@ export class PeerScore { * @returns {Array} */ _getIPs (id: string): string[] { - return this._connectionManager.getAll(id) + return this._connectionManager.getAll(PeerId.createFromB58String(id)) .map(c => c.remoteAddr.toOptions().host) } From 5c6df19b290b145ba78c74c81e61872fc5959c7b Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 18 Jun 2020 17:22:53 -0500 Subject: [PATCH 2/2] chore: tweak test --- test/gossip.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/gossip.js b/test/gossip.js index 7b0c2eb6..2ab9aa78 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -60,7 +60,7 @@ describe('gossip', () => { nodeA.log.restore() }) - it('should send piggyback gossip into other sent messages', async function () { + it('should send piggyback control into other sent messages', async function () { this.timeout(10000) const nodeA = nodes[0] const topic = 'Z' @@ -79,21 +79,20 @@ describe('gossip', () => { const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String()) // set spy - sinon.spy(nodeB, 'log') + sinon.spy(nodeA, '_piggybackControl') // manually add control message to be sent to peerB - nodeA.control.set(peerB, { graft: [{ topicID: topic }] }) + const graft = { graft: [{ topicID: topic }] } + nodeA.control.set(peerB, graft) await nodeA.publish(topic, Buffer.from('hey')) - await new Promise((resolve) => nodeA.once('gossipsub:heartbeat', resolve)) - expect(nodeB.log.callCount).to.be.gt(1) + expect(nodeA._piggybackControl.callCount).to.be.equal(1) // expect control message to be sent alongside published message - const call = nodeB.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s') - expect(call).to.not.equal(undefined) - expect(call.args[1]).to.equal(nodeA.peerId.toB58String()) + const call = nodeA._piggybackControl.getCalls()[0] + expect(call.args[2].graft).to.deep.equal(graft.graft) // unset spy - nodeB.log.restore() + nodeA._piggybackControl.restore() }) })