Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gs1.1 pruning backoff #95

Merged
merged 2 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ts/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ export const GossipsubPrunePeers = 16
*/
export const GossipsubPruneBackoff = minute

/**
* GossipsubPruneBackoffTicks is the number of heartbeat ticks for attempting to prune expired
* backoff timers.
*/
export const GossipsubPruneBackoffTicks = 15

/**
* GossipsubConnectors controls the number of active connection attempts for peers obtained through PX.
*/
Expand Down
19 changes: 15 additions & 4 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export class Heartbeat {
* @returns {void}
*/
_heartbeat (): void {
this.gossipsub.heartbeatTicks++

// cache scores throught the heartbeat
const scores = new Map<string, number>()
const getScore = (id: string): number => {
Expand All @@ -85,6 +87,9 @@ export class Heartbeat {
const tograft = new Map<Peer, string[]>()
const toprune = new Map<Peer, string[]>()

// clean up expired backoffs
this.gossipsub._clearBackoff()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
Expand All @@ -96,6 +101,8 @@ export class Heartbeat {
)
// update peer score
this.gossipsub.score.prune(id, topic)
// add prune backoff record
this.gossipsub._addBackoff(id, topic)
// remove peer from mesh
peers.delete(p)
// add to toprune
Expand Down Expand Up @@ -140,10 +147,12 @@ export class Heartbeat {

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const backoff = this.gossipsub.backoff.get(topic)
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
// filter out mesh peers, peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
const id = p.id.toB58String()
// filter out mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand Down Expand Up @@ -217,9 +226,11 @@ export class Heartbeat {
// if it's less than D_out, select some peers with outbound connections and graft them
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter our current mesh peers and peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
const id = p.id.toB58String()
// filter our current mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand Down
148 changes: 132 additions & 16 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class Gossipsub extends BasicPubsub {
lastpub: Map<string, number>
gossip: Map<Peer, ControlIHave[]>
control: Map<Peer, ControlMessage>
backoff: Map<string, Map<string, number>>
outbound: Map<Peer, boolean>
score: PeerScore
heartbeatTicks: number
_connectionManager: ConnectionManager
_options: GossipOptions

Expand Down Expand Up @@ -138,6 +140,11 @@ class Gossipsub extends BasicPubsub {
*/
this.control = new Map()

/**
* Prune backoff map
*/
this.backoff = new Map()

/**
* Connection direction cache, marks peers with outbound connections
*
Expand All @@ -161,6 +168,12 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeat = new Heartbeat(this)

/**
* Number of heartbeats since the beginning of time
* This allows us to amortize some resource cleanup -- eg: backoff cleanup
*/
this.heartbeatTicks = 0

/**
* Connection manager
*/
Expand Down Expand Up @@ -224,7 +237,7 @@ class Gossipsub extends BasicPubsub {
this.gossip.delete(peer)
// Remove from control mapping
this.control.delete(peer)
// Remove from outbound tracking
// Remove from backoff mapping
this.outbound.delete(peer)

// Remove from peer scoring
Expand Down Expand Up @@ -397,6 +410,7 @@ class Gossipsub extends BasicPubsub {
const prune: string[] = []
const id = peer.id.toB58String()
const score = this.score.score(id)
const now = this._now()

graft.forEach(({ topicID }) => {
if (!topicID) {
Expand All @@ -413,6 +427,24 @@ class Gossipsub extends BasicPubsub {
return
}

// make sure we are not backing off that peer
const expire = this.backoff.get(topicID)?.get(id)
if (typeof expire === 'number' && now < expire) {
this.log('GRAFT: ignoring backed off peer %s', id)
// add behavioral penalty
this.score.addPenalty(id, 1)
// check the flood cutoff -- is the GRAFT coming too fast?
const floodCutoff = expire + constants.GossipsubGraftFloodThreshold - constants.GossipsubPruneBackoff
if (now < floodCutoff) {
// extra penalty
this.score.addPenalty(id, 1)
}
// refresh the backoff
this._addBackoff(id, topicID)
prune.push(topicID)
return
}

// check the score
if (score < 0) {
// we don't GRAFT peers with negative score
Expand All @@ -422,6 +454,8 @@ class Gossipsub extends BasicPubsub {
)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune.push(topicID)
// add/refresh backoff so that we don't reGRAFT too early even if the score decays
this._addBackoff(id, topicID)
return
}

Expand All @@ -430,6 +464,7 @@ class Gossipsub extends BasicPubsub {
// mesh takeover attacks combined with love bombing
if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) {
prune.push(topicID)
this._addBackoff(id, topicID)
return
}

Expand All @@ -442,13 +477,7 @@ class Gossipsub extends BasicPubsub {
return
}

const buildCtrlPruneMsg = (topic: string) => {
return {
topicID: topic
}
}

return prune.map(buildCtrlPruneMsg)
return prune.map(topic => this._makePrune(id, topic))
}

/**
Expand All @@ -458,7 +487,8 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_handlePrune (peer: Peer, prune: ControlPrune[]): void {
prune.forEach(({ topicID }) => {
const id = peer.id.toB58String()
prune.forEach(({ topicID, backoff }) => {
if (!topicID) {
return
}
Expand All @@ -467,6 +497,65 @@ class Gossipsub extends BasicPubsub {
this.log('PRUNE: Remove mesh link to %s in %s', peer.id.toB58String(), topicID)
peers.delete(peer)
peer.topics.delete(topicID)
// is there a backoff specified by the peer? if so obey it
if (typeof backoff === 'number' && backoff > 0) {
this._doAddBackoff(id, topicID, backoff * 1000)
} else {
this._addBackoff(id, topicID)
}
}
})
}

/**
* Add standard backoff log for a peer in a topic
* @param {string} id
* @param {string} topic
* @returns {void}
*/
_addBackoff (id: string, topic: string): void {
this._doAddBackoff(id, topic, constants.GossipsubPruneBackoff)
}

/**
* Add backoff expiry interval for a peer in a topic
* @param {string} id
* @param {string} topic
* @param {number} interval backoff duration in milliseconds
* @returns {void}
*/
_doAddBackoff (id: string, topic: string, interval: number): void {
let backoff = this.backoff.get(topic)
if (!backoff) {
backoff = new Map()
this.backoff.set(topic, backoff)
}
const expire = this._now() + interval
const existingExpire = backoff.get(id) || 0
if (existingExpire < expire) {
backoff.set(id, expire)
}
}

/**
* Clear expired backoff expiries
* @returns {void}
*/
_clearBackoff (): void {
// we only clear once every GossipsubPruneBackoffTicks ticks to avoid iterating over the maps too much
if (this.heartbeatTicks % constants.GossipsubPruneBackoffTicks !== 0) {
return
}

const now = this._now()
this.backoff.forEach((backoff, topic) => {
backoff.forEach((expire, id) => {
if (expire < now) {
backoff.delete(id)
}
})
if (backoff.size === 0) {
this.backoff.delete(topic)
}
})
}
Expand Down Expand Up @@ -498,6 +587,7 @@ class Gossipsub extends BasicPubsub {
this.lastpub = new Map()
this.gossip = new Map()
this.control = new Map()
this.backoff = new Map()
this.outbound = new Map()
}

Expand Down Expand Up @@ -693,9 +783,9 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_sendPrune (peer: Peer, topic: string): void {
const prune = [{
topicID: topic
}]
const prune = [
this._makePrune(peer.id.toB58String(), topic)
]

const out = createGossipRpc([], { prune })
this._sendRpc(peer, out)
Expand Down Expand Up @@ -755,20 +845,22 @@ class Gossipsub extends BasicPubsub {
*/
_sendGraftPrune (tograft: Map<Peer, string[]>, toprune: Map<Peer, string[]>): void {
for (const [p, topics] of tograft) {
const id = p.id.toB58String()
const graft = topics.map((topicID) => ({ topicID }))
let prune: ControlPrune[] = []
// If a peer also has prunes, process them now
const pruneMsg = toprune.get(p)
if (pruneMsg) {
prune = pruneMsg.map((topicID) => ({ topicID }))
const pruning = toprune.get(p)
if (pruning) {
prune = pruning.map((topicID) => this._makePrune(id, topicID))
toprune.delete(p)
}

const outRpc = createGossipRpc([], { graft, prune })
this._sendRpc(p, outRpc)
}
for (const [p, topics] of toprune) {
const prune = topics.map((topicID) => ({ topicID }))
const id = p.id.toB58String()
const prune = topics.map((topicID) => this._makePrune(id, topicID))
const outRpc = createGossipRpc([], { prune })
this._sendRpc(p, outRpc)
}
Expand Down Expand Up @@ -877,6 +969,30 @@ class Gossipsub extends BasicPubsub {
_now (): number {
return Date.now()
}

/**
* Make a PRUNE control message for a peer in a topic
* @param {string} id
* @param {string} topic
* @returns {ControlPrune}
*/
_makePrune (id: string, topic: string): ControlPrune {
if (this.peers.get(id)!.protocols.includes(constants.GossipsubIDv10)) {
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
return {
topicID: topic,
peers: []
}
}
// backoff is measured in seconds
// GossipsubPruneBackoff is measured in milliseconds
const backoff = constants.GossipsubPruneBackoff / 1000
return {
topicID: topic,
peers: [],
backoff: backoff
}
}
}

export = Gossipsub
7 changes: 7 additions & 0 deletions ts/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ export interface ControlGraft {
*/
export interface ControlPrune {
topicID?: string
peers: PeerInfo[]
backoff?: number
}

export interface PeerInfo {
peerID?: Buffer
signedPeerRecord?: Buffer
}

/**
Expand Down
7 changes: 7 additions & 0 deletions ts/message/rpc.proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,12 @@ message RPC {

message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
}

message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}
}`