Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gs1.1 outbound mesh quota #91

Merged
merged 2 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions test/gossip.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('gossip', () => {
nodeA.log.restore()
})

it('should send piggyback gossip into other sent messages', async function () {
it('should send piggyback control into other sent messages', async function () {
this.timeout(10000)
const nodeA = nodes[0]
const topic = 'Z'
Expand All @@ -79,21 +79,20 @@ describe('gossip', () => {
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String())

// set spy
sinon.spy(nodeB, 'log')
sinon.spy(nodeA, '_piggybackControl')

// manually add control message to be sent to peerB
nodeA.control.set(peerB, { graft: [{ topicID: topic }] })
const graft = { graft: [{ topicID: topic }] }
nodeA.control.set(peerB, graft)

await nodeA.publish(topic, Buffer.from('hey'))

await new Promise((resolve) => nodeA.once('gossipsub:heartbeat', resolve))
expect(nodeB.log.callCount).to.be.gt(1)
expect(nodeA._piggybackControl.callCount).to.be.equal(1)
// expect control message to be sent alongside published message
const call = nodeB.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s')
expect(call).to.not.equal(undefined)
expect(call.args[1]).to.equal(nodeA.peerId.toB58String())
const call = nodeA._piggybackControl.getCalls()[0]
expect(call.args[2].graft).to.deep.equal(graft.graft)

// unset spy
nodeB.log.restore()
nodeA._piggybackControl.restore()
})
})
163 changes: 134 additions & 29 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,157 @@ export class Heartbeat {
// that hasn't been piggybacked since the last heartbeat
this.gossipsub._flush()

/**
* @type {Map<Peer, Array<String>>}
*/
// cache scores throught the heartbeat
const scores = new Map<string, number>()
const getScore = (id: string): number => {
let s = scores.get(id)
if (s === undefined) {
s = this.gossipsub.score.score(id)
scores.set(id, s)
}
return s
}

const tograft = new Map<Peer, string[]>()
const toprune = new Map<Peer, string[]>()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
const prunePeer = (p: Peer): void => {
const id = p.id.toB58String()
this.gossipsub.log(
'HEARTBEAT: Remove mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.prune(id, topic)
// remove peer from mesh
peers.delete(p)
// add to toprune
const topics = toprune.get(p)
if (!topics) {
toprune.set(p, [topic])
} else {
topics.push(topic)
}
}
const graftPeer = (p: Peer): void => {
const id = p.id.toB58String()
this.gossipsub.log(
'HEARTBEAT: Add mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.graft(id, topic)
// add peer to mesh
peers.add(p)
// add to tograft
const topics = tograft.get(p)
if (!topics) {
tograft.set(p, [topic])
} else {
topics.push(topic)
}
}

// drop all peers with negative score
peers.forEach(p => {
const id = p.id.toB58String()
const score = getScore(id)
if (score < 0) {
this.gossipsub.log(
'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s',
id, score, topic
)
prunePeer(p)
}
})

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed)
peersSet.forEach((peer) => {
// add topic peers not already in mesh
if (peers.has(peer)) {
return
}

this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', peer.id.toB58String(), topic)
peers.add(peer)
const peerGrafts = tograft.get(peer)
if (!peerGrafts) {
tograft.set(peer, [topic])
} else {
peerGrafts.push(topic)
}
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
// filter out mesh peers, peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
})

peersSet.forEach(graftPeer)
}

// do we have to many peers?
if (peers.size > constants.GossipsubDhi) {
const idontneed = peers.size - constants.GossipsubD
let peersArray = Array.from(peers)
peersArray = shuffle(peersArray)
peersArray = peersArray.slice(0, idontneed)
// sort by score
peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String()))
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
peersArray = peersArray.slice(0, constants.GossipsubDscore).concat(
shuffle(peersArray.slice(constants.GossipsubDscore))
)

peersArray.forEach((peer) => {
this.gossipsub.log('HEARTBEAT: Remove mesh link to %s in %s', peer.id.toB58String(), topic)
peers.delete(peer)
const peerPrunes = toprune.get(peer)
if (!peerPrunes) {
toprune.set(peer, [topic])
} else {
peerPrunes.push(topic)
// count the outbound peers we are keeping
let outbound = 0
peersArray.slice(0, constants.GossipsubD).forEach(p => {
if (this.gossipsub.outbound.get(p)) {
outbound++
}
})

// if it's less than D_out, bubble up some outbound peers from the random selection
if (outbound < constants.GossipsubDout) {
const rotate = (i: number): void => {
// rotate the peersArray to the right and put the ith peer in the front
const p = peersArray[i]
for (let j = i; j > 0; j--) {
peersArray[j] = peersArray[j - 1]
}
peersArray[0] = p
}

// first bubble up all outbound peers already in the selection to the front
if (outbound > 0) {
let ihave = outbound
for (let i = 1; i < constants.GossipsubD && ihave > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i)
ihave--
}
}
}

// now bubble up enough outbound peers outside the selection to the front
let ineed = constants.GossipsubD - outbound
for (let i = constants.GossipsubD; i < peersArray.length && ineed > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i)
ineed--
}
}
}

// prune the excess peers
peersArray.slice(0, constants.GossipsubD).forEach(prunePeer)
}

// do we have enough outbound peers?
if (peers.size >= constants.GossipsubDlo) {
// count the outbound peers we have
let outbound = 0
peers.forEach(p => {
if (this.gossipsub.outbound.get(p)) {
outbound++
}
})

// if it's less than D_out, select some peers with outbound connections and graft them
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter our current mesh peers and peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
}).forEach(graftPeer)
}
}

this.gossipsub._emitGossip(topic, peers)
Expand Down
70 changes: 63 additions & 7 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class Gossipsub extends BasicPubsub {
lastpub: Map<string, number>
gossip: Map<Peer, ControlIHave[]>
control: Map<Peer, ControlMessage>
outbound: Map<Peer, boolean>
score: PeerScore
_connectionManager: ConnectionManager
_options: GossipOptions

public static multicodec: string = constants.GossipsubIDv10
Expand Down Expand Up @@ -136,6 +138,13 @@ class Gossipsub extends BasicPubsub {
*/
this.control = new Map()

/**
* Connection direction cache, marks peers with outbound connections
*
* @type {Map<Peer, boolean>}
*/
this.outbound = new Map()

/**
* Use the overriden mesgIdFn or the default one.
*/
Expand All @@ -152,6 +161,11 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeat = new Heartbeat(this)

/**
* Connection manager
*/
this._connectionManager = connectionManager

/**
* Peer score tracking
*/
Expand All @@ -170,6 +184,18 @@ class Gossipsub extends BasicPubsub {
// Add to peer scoring
this.score.addPeer(peerId.toB58String())

// track the connection direction
let outbound = false
for (const c of this._connectionManager.getAll(peerId)) {
if (c.stat.direction === 'outbound') {
if (Array.from(c.registry.values()).some(rvalue => protocols.includes(rvalue.protocol))) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't great, not sure what the best way to do this is.
Trying to ensure that gossipsub is one of the protocols of one of the streams on the connection (considering that there may be multiple connections for a peer, not all containing a gossipsub stream)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

humm, yeah!
Maybe the connection could provide an utility API to check if it has open streams using a given protocol.
I will add to my TODO list to consider and implement it. Meanwhile, we can go this way

outbound = true
break
}
}
}
this.outbound.set(p, outbound)

return p
}

Expand Down Expand Up @@ -198,6 +224,8 @@ class Gossipsub extends BasicPubsub {
this.gossip.delete(peer)
// Remove from control mapping
this.control.delete(peer)
// Remove from outbound tracking
this.outbound.delete(peer)

// Remove from peer scoring
this.score.removePeer(peer.id.toB58String())
Expand Down Expand Up @@ -367,20 +395,47 @@ class Gossipsub extends BasicPubsub {
*/
_handleGraft (peer: Peer, graft: ControlGraft[]): ControlPrune[] | undefined {
const prune: string[] = []
const id = peer.id.toB58String()
const score = this.score.score(id)

graft.forEach(({ topicID }) => {
if (!topicID) {
return
}
const peers = this.mesh.get(topicID)
if (!peers) {
const peersInMesh = this.mesh.get(topicID)
if (!peersInMesh) {
// spam hardening: ignore GRAFTs for unknown topics
return
}

// check if peer is already in the mesh; if so do nothing
if (peersInMesh.has(peer)) {
return
}

// check the score
if (score < 0) {
// we don't GRAFT peers with negative score
this.log(
'GRAFT: ignoring peer %s with negative score: score=%d, topic=%s',
id, score, topicID
)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune.push(topicID)
} else {
this.log('GRAFT: Add mesh link from %s in %s', peer.id.toB58String(), topicID)
peers.add(peer)
peer.topics.add(topicID)
this.mesh.set(topicID, peers)
return
}

// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) {
prune.push(topicID)
return
}

this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
peersInMesh.add(peer)
peer.topics.add(topicID)
})

if (!prune.length) {
Expand Down Expand Up @@ -443,6 +498,7 @@ class Gossipsub extends BasicPubsub {
this.lastpub = new Map()
this.gossip = new Map()
this.control = new Map()
this.outbound = new Map()
}

/**
Expand Down
8 changes: 6 additions & 2 deletions ts/score/peerScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ const log = debug('libp2p:gossipsub:score')
interface Connection {
remoteAddr: Multiaddr
remotePeer: PeerId
stat: {
direction: 'inbound' | 'outbound'
}
registry: Map<string, {protocol: string}>
}

export interface ConnectionManager {
getAll(id: string): Connection[]
getAll(peerId: PeerId): Connection[]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a mistake that got through, fixed here and below

// eslint-disable-next-line @typescript-eslint/ban-types
on(evt: string, fn: Function): void
// eslint-disable-next-line @typescript-eslint/ban-types
Expand Down Expand Up @@ -518,7 +522,7 @@ export class PeerScore {
* @returns {Array<string>}
*/
_getIPs (id: string): string[] {
return this._connectionManager.getAll(id)
return this._connectionManager.getAll(PeerId.createFromB58String(id))
.map(c => c.remoteAddr.toOptions().host)
}

Expand Down