Skip to content

Commit

Permalink
feat: async peerstore
Browse files Browse the repository at this point in the history
Refactors interfaces and classes used by `libp2p-interfaces` to use the async peer store from libp2p/js-libp2p#1058

Fixes a memory leak where peer data (multiaddrs, protocols, etc) is never evicted from memory.

BREAKING CHANGE: peerstore methods and pubsub start/stop are now all async
  • Loading branch information
achingbrain committed Jan 15, 2022
1 parent 38ee6e1 commit 5c3491c
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 37 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
node-version: ${{ matrix.node }}
- run: npm install
- run: npm run prebuild
- run: npx aegir test -t node --cov --bail
- run: npx aegir test -t node --cov --bail -- --exit
- uses: codecov/codecov-action@v1
test-chrome:
needs: check
Expand All @@ -46,7 +46,7 @@ jobs:
node-version: lts/*
- run: npm install
- run: npm run prebuild
- run: npx aegir test -t browser -t webworker --bail
- run: npx aegir test -t browser -t webworker --bail -- --exit
test-firefox:
needs: check
runs-on: ubuntu-latest
Expand All @@ -57,4 +57,4 @@ jobs:
node-version: lts/*
- run: npm install
- run: npm run prebuild
- run: npx aegir test -t browser -t webworker --bail -- --browser firefox
- run: npx aegir test -t browser -t webworker --bail -- --browser firefox -- --exit
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
"denque": "^1.5.0",
"err-code": "^3.0.1",
"it-pipe": "^1.1.0",
"libp2p-interfaces": "^2.0.1",
"libp2p-interfaces": "^4.0.4",
"peer-id": "^0.16.0",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^4.1.1",
"@chainsafe/as-sha256": "^0.2.4",
"@dapplion/benchmark": "^0.1.6",
"@types/chai": "^4.2.3",
"@types/mocha": "^8.2.2",
"@typescript-eslint/eslint-plugin": "^3.0.2",
Expand All @@ -71,9 +72,9 @@
"eslint-plugin-promise": "^4.2.1",
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^1.0.0",
"libp2p": "^0.35.0",
"libp2p-floodsub": "^0.28.0",
"libp2p-interfaces-compliance-tests": "^2.0.3",
"libp2p": "libp2p/js-libp2p#feat/async-peerstore",
"libp2p-floodsub": "^0.29.0",
"libp2p-interfaces-compliance-tests": "^4.0.6",
"libp2p-mplex": "^0.10.3",
"libp2p-websockets": "^0.16.1",
"lodash": "^4.17.15",
Expand All @@ -85,7 +86,6 @@
"promisify-es6": "^1.0.3",
"sinon": "^11.1.1",
"time-cache": "^0.3.0",
"@dapplion/benchmark": "^0.1.6",
"typescript": "4.0.x",
"util": "^0.12.3"
},
Expand Down
4 changes: 3 additions & 1 deletion test/go-gossipsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
* These tests were translated from:
* https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub_test.go
*/
const { expect } = require('chai')
const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const delay = require('delay')
const errcode = require('err-code')
const sinon = require('sinon')
Expand Down
3 changes: 1 addition & 2 deletions test/time-cache.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ describe("SimpleTimeCache", () => {
sandbox.restore()
})


it("should delete items after 1sec", () => {
timeCache.put("aFirst")
timeCache.put("bFirst")
Expand All @@ -25,7 +24,7 @@ describe("SimpleTimeCache", () => {
expect(timeCache.has("bFirst")).to.be.true
expect(timeCache.has("cFirst")).to.be.true

sandbox.clock.tick(validityMs)
sandbox.clock.tick(validityMs + 1)

timeCache.put("aSecond")
timeCache.put("bSecond")
Expand Down
4 changes: 2 additions & 2 deletions test/utils/create-gossipsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ const {
*/
async function startNode(gs) {
await gs._libp2p.start()
gs.start()
await gs.start()
}

/**
* Stop node - gossipsub + libp2p
*/
async function stopNode(gs) {
await gs._libp2p.stop()
gs.stop()
await gs.stop()
}

async function connectGossipsub (gs1, gs2) {
Expand Down
2 changes: 1 addition & 1 deletion test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const createFloodsubNode = async (libp2p, shouldStart = false, options) => {

if (shouldStart) {
await libp2p.start()
fs.start()
await fs.start()
}

return fs
Expand Down
46 changes: 23 additions & 23 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class Gossipsub extends Pubsub {
async _processRpc (id: string, peerStreams: PeerStreams, rpc: RPC): Promise<boolean> {
if (await super._processRpc(id, peerStreams, rpc)) {
if (rpc.control) {
this._processRpcControlMessage(id, rpc.control)
await this._processRpcControlMessage(id, rpc.control)
}
return true
}
Expand All @@ -434,14 +434,14 @@ class Gossipsub extends Pubsub {
* @param {RPC.IControlMessage} controlMsg
* @returns {void}
*/
_processRpcControlMessage (id: string, controlMsg: RPC.IControlMessage): void {
async _processRpcControlMessage (id: string, controlMsg: RPC.IControlMessage): Promise<void> {
if (!controlMsg) {
return
}

const iwant = controlMsg.ihave ? this._handleIHave(id, controlMsg.ihave) : []
const ihave = controlMsg.iwant ? this._handleIWant(id, controlMsg.iwant) : []
const prune = controlMsg.graft ? this._handleGraft(id, controlMsg.graft) : []
const prune = controlMsg.graft ? await this._handleGraft(id, controlMsg.graft) : []
controlMsg.prune && this._handlePrune(id, controlMsg.prune)

if (!iwant.length && !ihave.length && !prune.length) {
Expand Down Expand Up @@ -682,9 +682,9 @@ class Gossipsub extends Pubsub {
* Handles Graft messages
* @param {string} id peer id
* @param {Array<RPC.IControlGraft>} graft
* @return {Array<RPC.IControlPrune>}
* @return {Promise<RPC.IControlPrune[]>}
*/
_handleGraft (id: string, graft: RPC.IControlGraft[]): RPC.IControlPrune[] {
async _handleGraft (id: string, graft: RPC.IControlGraft[]): Promise<RPC.IControlPrune[]> {
const prune: string[] = []
const score = this.score.score(id)
const now = this._now()
Expand Down Expand Up @@ -771,7 +771,7 @@ class Gossipsub extends Pubsub {
return []
}

return prune.map(topic => this._makePrune(id, topic, doPX))
return Promise.all(prune.map(topic => this._makePrune(id, topic, doPX)))
}

/**
Expand Down Expand Up @@ -969,10 +969,10 @@ class Gossipsub extends Pubsub {
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
* @override
* @returns {void}
* @returns {Promise<void>}
*/
start (): void {
super.start()
async start (): Promise<void> {
await super.start()
this.heartbeat.start()
this.score.start()
// connect to direct peers
Expand All @@ -986,10 +986,10 @@ class Gossipsub extends Pubsub {
/**
* Unmounts the gossipsub protocol and shuts down every connection
* @override
* @returns {void}
* @returns {Promise<void>}
*/
stop (): void {
super.stop()
async stop (): Promise<void> {
await super.stop()
this.heartbeat.stop()
this.score.stop()

Expand Down Expand Up @@ -1244,11 +1244,11 @@ class Gossipsub extends Pubsub {
* Sends a PRUNE message to a peer
* @param {string} id peer id
* @param {string} topic
* @returns {void}
* @returns {Promise<void>}
*/
_sendPrune (id: string, topic: string): void {
async _sendPrune (id: string, topic: string): Promise<void> {
const prune = [
this._makePrune(id, topic, this._options.doPX)
await this._makePrune(id, topic, this._options.doPX)
]

const out = createGossipRpc([], { prune })
Expand Down Expand Up @@ -1311,23 +1311,23 @@ class Gossipsub extends Pubsub {
* @param {Map<string, Array<string>>} tograft peer id => topic[]
* @param {Map<string, Array<string>>} toprune peer id => topic[]
*/
_sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>, noPX: Map<string, boolean>): void {
async _sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>, noPX: Map<string, boolean>): Promise<void> {
const doPX = this._options.doPX
for (const [id, topics] of tograft) {
const graft = topics.map((topicID) => ({ topicID }))
let prune: RPC.IControlPrune[] = []
// If a peer also has prunes, process them now
const pruning = toprune.get(id)
if (pruning) {
prune = pruning.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
prune = await Promise.all(pruning.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id))))
toprune.delete(id)
}

const outRpc = createGossipRpc([], { graft, prune })
this._sendRpc(id, outRpc)
}
for (const [id, topics] of toprune) {
const prune = topics.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
const prune = await Promise.all(topics.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id))))
const outRpc = createGossipRpc([], { prune })
this._sendRpc(id, outRpc)
}
Expand Down Expand Up @@ -1448,9 +1448,9 @@ class Gossipsub extends Pubsub {
* @param {string} id
* @param {string} topic
* @param {boolean} doPX
* @returns {RPC.IControlPrune}
* @returns {Promise<RPC.IControlPrune>}
*/
_makePrune (id: string, topic: string, doPX: boolean): RPC.IControlPrune {
async _makePrune (id: string, topic: string, doPX: boolean): Promise<RPC.IControlPrune> {
if (this.peers.get(id)!.protocol === constants.GossipsubIDv10) {
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
return {
Expand All @@ -1467,17 +1467,17 @@ class Gossipsub extends Pubsub {
const peers = getGossipPeers(this, topic, constants.GossipsubPrunePeers, (xid: string): boolean => {
return xid !== id && this.score.score(xid) >= 0
})
peers.forEach(p => {
for (const p of peers) {
// see if we have a signed record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through PX anyways
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p
const peerId = PeerId.createFromB58String(p)
px.push({
peerID: peerId.toBytes(),
signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(peerId)
signedPeerRecord: await this._libp2p.peerStore.addressBook.getRawEnvelope(peerId)
})
})
}
}
return {
topicID: topic,
Expand Down

0 comments on commit 5c3491c

Please sign in to comment.