Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor: async it
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 21, 2019
1 parent eee2f61 commit 0fc408a
Show file tree
Hide file tree
Showing 24 changed files with 650 additions and 531 deletions.
13 changes: 6 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.8.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.17.1",
"libp2p-interfaces": "^0.1.5",
"libp2p-record": "~0.7.0",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
Expand All @@ -57,26 +60,22 @@
"p-queue": "^6.2.1",
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"paramap-it": "^0.1.1",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"promisify-es6": "^1.0.3",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.3",
"pull-stream": "^3.6.14",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^20.4.1",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"datastore-level": "~0.12.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.3",
"libp2p-mplex": "~0.8.5",
"libp2p-switch": "~0.42.7",
"libp2p-tcp": "~0.13.0",
"it-pair": "^1.0.0",
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
Expand Down
10 changes: 5 additions & 5 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ module.exports = (dht) => {

provs.forEach((id) => {
let info
if (dht.peerBook.has(id)) {
info = dht.peerBook.get(id)
if (dht.peerStore.has(id)) { // TODO: add support for has
info = dht.peerStore.get(id)
} else {
info = dht.peerBook.put(new PeerInfo(id))
info = dht.peerStore.put(new PeerInfo(id))
}
out.push(info)
})
Expand All @@ -110,7 +110,7 @@ module.exports = (dht) => {
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)

provs.forEach((prov) => {
pathProviders.push(dht.peerBook.put(prov))
pathProviders.push(dht.peerStore.put(prov))
})

// hooray we have all that we want
Expand All @@ -131,7 +131,7 @@ module.exports = (dht) => {
providerTimeout
)
} catch (err) {
if (err !== pTimeout.TimeoutError) {
if (err.name !== pTimeout.TimeoutError.name) {
throw err
}
} finally {
Expand Down
67 changes: 37 additions & 30 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class KadDHT extends EventEmitter {
/**
* Create a new KadDHT.
* @param {Object} props
* @param {Switch} props.sw libp2p-switch instance
* @param {Dialer} props.dialer libp2p dialer instance
* @param {PeerInfo} props.peerInfo peer's peerInfo
* @param {Object} props.registrar registrar for libp2p protocols
* @param {PeerStore} props.peerStore libp2p peerStore
* @param {Object} props.registrar libp2p registrar instance
* @param {function} props.registrar.handle
* @param {function} props.registrar.register
* @param {function} props.registrar.unregister
Expand All @@ -54,7 +55,10 @@ class KadDHT extends EventEmitter {
* @param {randomWalkOptions} options.randomWalk randomWalk options
*/
constructor ({
sw,
dialer,
peerInfo,
peerStore,
registrar,
datastore = new MemoryDatastore(),
kBucketSize = c.K,
concurrency = c.ALPHA,
Expand All @@ -63,14 +67,31 @@ class KadDHT extends EventEmitter {
randomWalk = {}
}) {
super()
assert(sw, 'libp2p-kad-dht requires a instance of Switch')
assert(dialer, 'libp2p-kad-dht requires a instance of Dialer')

/**
* Local reference to the libp2p-switch instance
*
* @type {Switch}
* Local reference to the libp2p dialer instance
* @type {Dialer}
*/
this.dialer = dialer

/**
* Local peer info
* @type {PeerInfo}
*/
this.peerInfo = peerInfo

/**
* Local peer info
* @type {PeerStore}
*/
this.peerStore = peerStore

/**
* Local peer info
* @type {Registrar}
*/
this.switch = sw
this.registrar = registrar

/**
* k-bucket size
Expand Down Expand Up @@ -141,6 +162,8 @@ class KadDHT extends EventEmitter {
*/
this._queryManager = new QueryManager()

this._running = false

// DHT components
this.contentFetching = contentFetching(this)
this.contentRouting = contentRouting(this)
Expand All @@ -155,22 +178,6 @@ class KadDHT extends EventEmitter {
return this._running
}

/**
* Local peer (yourself)
* @type {PeerInfo}
*/
get peerInfo () {
return this.switch._peerInfo
}

/**
* Peerbook
* @type {PeerBook}
*/
get peerBook () {
return this.switch._peerBook
}

/**
* Start listening to incoming connections.
* @returns {Promise<void>}
Expand Down Expand Up @@ -312,10 +319,10 @@ class KadDHT extends EventEmitter {
const ids = this.routingTable.closestPeers(key, this.kBucketSize)

return ids.map((p) => {
if (this.peerBook.has(p)) {
return this.peerBook.get(p)
if (this.peerStore.has(p)) {
return this.peerStore.get(p)
}
return this.peerBook.put(new PeerInfo(p))
return this.peerStore.put(new PeerInfo(p))
})
}

Expand Down Expand Up @@ -390,15 +397,14 @@ class KadDHT extends EventEmitter {
}

/**
* Add the peer to the routing table and update it in the peerbook.
* Add the peer to the routing table and update it in the peerStore.
*
* @param {PeerInfo} peer
* @returns {Promise<void>}
* @private
*/

async _add (peer) {
peer = this.peerBook.put(peer)
await this.routingTable.add(peer.id)
}

Expand Down Expand Up @@ -455,7 +461,7 @@ class KadDHT extends EventEmitter {
* Query a particular peer for the value for the given key.
* It will either return the value or a list of closer peers.
*
* Note: The peerbook is updated with new addresses found for the given peer.
* Note: The peerStore is updated with new addresses found for the given peer.
*
* @param {PeerId} peer
* @param {Buffer} key
Expand Down Expand Up @@ -518,3 +524,4 @@ class KadDHT extends EventEmitter {
}

module.exports = KadDHT
module.exports.multicodec = c.PROTOCOL_DHT
Loading

0 comments on commit 0fc408a

Please sign in to comment.