diff --git a/package.json b/package.json index c7246459..554f650f 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "lint": "aegir lint", "test": "aegir test -t node", "test:node": "aegir test -t node", - "build": "aegir build", + "prepare": "aegir build --no-bundle", "docs": "aegir docs", "release": "aegir release --docs -t node", "release-minor": "aegir release --type minor --docs -t node", @@ -39,25 +39,27 @@ "node": ">=12.0.0", "npm": ">=6.0.0" }, + "eslintConfig": { + "extends": "ipfs" + }, "homepage": "https://github.com/libp2p/js-libp2p-kad-dht", "dependencies": { "abort-controller": "^3.0.0", - "async": "^2.6.2", - "base32.js": "~0.1.0", "cids": "^1.1.5", "debug": "^4.3.1", - "err-code": "^2.0.3", + "err-code": "^3.0.0", "hashlru": "^2.3.0", "heap": "~0.2.6", - "interface-datastore": "^3.0.3", + "interface-datastore": "ipfs/interface-datastore#fix/remove-ts-file", + "it-first": "^1.0.4", "it-length-prefixed": "^3.1.0", "it-pipe": "^1.1.0", "k-bucket": "^5.0.0", "libp2p-crypto": "^0.19.0", "libp2p-interfaces": "^0.8.2", - "libp2p-record": "^0.9.0", + "libp2p-record": "^0.10.0", "multiaddr": "^8.1.2", - "multihashing-async": "^2.0.1", + "multihashing-async": "^2.1.0", "p-filter": "^2.1.0", "p-map": "^4.0.0", "p-queue": "^6.6.2", @@ -68,19 +70,20 @@ "protons": "^2.0.0", "streaming-iterables": "^5.0.4", "uint8arrays": "^2.0.5", - "varint": "^5.0.0", + "varint": "^6.0.0", "xor-distance": "^2.0.0" }, "devDependencies": { - "aegir": "^25.0.0", + "@types/debug": "^4.1.5", + "aegir": "^30.3.0", "async-iterator-all": "^1.0.0", "chai": "^4.2.0", "chai-checkmark": "^1.0.1", - "datastore-level": "^2.0.0", - "delay": "^4.3.0", + "datastore-level": "^4.0.0", + "delay": "^5.0.0", "dirty-chai": "^2.0.1", "it-pair": "^1.0.0", - "libp2p": "^0.28.5", + "libp2p": "^0.30.7", "lodash": "^4.17.11", "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", diff --git a/src/content-fetching/index.js b/src/content-fetching/index.js index ea1c43af..46e0847c 100644 --- a/src/content-fetching/index.js +++ b/src/content-fetching/index.js @@ -3,16 +3,26 @@ const errcode = require('err-code') const pTimeout = require('p-timeout') const uint8ArrayEquals = require('uint8arrays/equals') +const uint8ArrayToString = require('uint8arrays/to-string') const libp2pRecord = require('libp2p-record') - const c = require('../constants') const Query = require('../query') - const utils = require('../utils') - const Record = libp2pRecord.Record +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../query').DHTQueryResult} DHTQueryResult + */ + +/** + * @param {import('../')} dht + */ module.exports = (dht) => { + /** + * @param {Uint8Array} key + * @param {Uint8Array} rec + */ const putLocal = async (key, rec) => { // eslint-disable-line require-await return dht.datastore.put(utils.bufferToKey(key), rec) } @@ -22,18 +32,17 @@ module.exports = (dht) => { * the local datastore. * * @param {Uint8Array} key - * @returns {Promise} - * - * @private */ const getLocal = async (key) => { - dht._log('getLocal %b', key) + dht._log(`getLocal ${uint8ArrayToString(key, 'base32')}`) const raw = await dht.datastore.get(utils.bufferToKey(key)) - dht._log('found %b in local datastore', key) + dht._log(`found ${uint8ArrayToString(key, 'base32')} in local datastore`) + const rec = Record.deserialize(raw) await dht._verifyRecordLocally(rec) + return rec } @@ -41,11 +50,8 @@ module.exports = (dht) => { * Send the best record found to any peers that have an out of date record. * * @param {Uint8Array} key - * @param {Array} vals - values retrieved from the DHT - * @param {Object} best - the best record that was found - * @returns {Promise} - * - * @private + * @param {import('../query').DHTQueryValue[]} vals - values retrieved from the DHT + * @param {Uint8Array} best - the best record that was found */ const sendCorrectionRecord = async (key, vals, best) => { const fixupRec = await utils.createPutRecord(key, best) @@ -78,10 +84,9 @@ module.exports = (dht) => { return { /** * Store the given key/value pair locally, in the datastore. + * * @param {Uint8Array} key * @param {Uint8Array} rec - encoded record - * @returns {Promise} - * @private */ async _putLocal (key, rec) { // eslint-disable-line require-await return putLocal(key, rec) @@ -92,9 +97,8 @@ module.exports = (dht) => { * * @param {Uint8Array} key * @param {Uint8Array} value - * @param {Object} [options] - put options + * @param {object} [options] - put options * @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length) - * @returns {Promise} */ async put (key, value, options = {}) { dht._log('PutValue %b', key) @@ -134,9 +138,8 @@ module.exports = (dht) => { * Times out after 1 minute by default. * * @param {Uint8Array} key - * @param {Object} [options] - get options + * @param {object} [options] - get options * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise} */ async get (key, options = {}) { options.timeout = options.timeout || c.minute @@ -173,16 +176,15 @@ module.exports = (dht) => { * * @param {Uint8Array} key * @param {number} nvals - * @param {Object} [options] - get options + * @param {object} [options] - get options * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise>} */ async getMany (key, nvals, options = {}) { options.timeout = options.timeout || c.minute dht._log('getMany %b (%s)', key, nvals) - let vals = [] + const vals = [] let localRec try { @@ -204,9 +206,8 @@ module.exports = (dht) => { return vals } - const paths = [] const id = await utils.convertBuffer(key) - const rtp = dht.routingTable.closestPeers(id, this.kBucketSize) + const rtp = dht.routingTable.closestPeers(id, dht.kBucketSize) dht._log('peers in rt: %d', rtp.length) @@ -220,15 +221,23 @@ module.exports = (dht) => { return vals } - // we have peers, lets do the actual query to them - const query = new Query(dht, key, (pathIndex, numPaths) => { - // This function body runs once per disjoint path - const pathSize = utils.pathSize(nvals - vals.length, numPaths) - const pathVals = [] - paths.push(pathVals) + const valsLength = vals.length - // Here we return the query function to use on this particular disjoint path - return async (peer) => { + /** + * @param {number} pathIndex + * @param {number} numPaths + */ + function createQuery (pathIndex, numPaths) { + // This function body runs once per disjoint path + const pathSize = utils.pathSize(nvals - valsLength, numPaths) + let queryResults = 0 + + /** + * Here we return the query function to use on this particular disjoint path + * + * @param {PeerId} peer + */ + async function disjointPathQuery (peer) { let rec, peers, lookupErr try { const results = await dht._getValueOrPeers(peer, key) @@ -242,37 +251,49 @@ module.exports = (dht) => { lookupErr = err } - const res = { closerPeers: peers } + /** @type {import('../query').QueryResult} */ + const res = { + closerPeers: peers + } + + if (rec && rec.value) { + vals.push({ + val: rec.value, + from: peer + }) - if ((rec && rec.value) || lookupErr) { - pathVals.push({ - val: rec && rec.value, + queryResults++ + } else if (lookupErr) { + vals.push({ + err: lookupErr, from: peer }) + + queryResults++ } // enough is enough - if (pathVals.length >= pathSize) { + if (queryResults >= pathSize) { res.pathComplete = true } return res } - }) - let error - try { - await pTimeout(query.run(rtp), options.timeout) - } catch (err) { - error = err + return disjointPathQuery } - query.stop() - // combine vals from each path - vals = [].concat.apply(vals, paths).slice(0, nvals) + // we have peers, lets send the actual query to them + const query = new Query(dht, key, createQuery) - if (error && vals.length === 0) { - throw error + try { + await pTimeout(query.run(rtp), options.timeout) + } catch (err) { + if (vals.length === 0) { + throw err + } + } finally { + query.stop() } return vals diff --git a/src/content-routing/index.js b/src/content-routing/index.js index ba68b1a1..0d93ea55 100644 --- a/src/content-routing/index.js +++ b/src/content-routing/index.js @@ -9,13 +9,21 @@ const Message = require('../message') const Query = require('../query') const utils = require('../utils') +/** + * @typedef {import('cids')} CID + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr + */ + +/** + * @param {import('../')} dht + */ module.exports = (dht) => { /** * Check for providers from a single node. * * @param {PeerId} peer * @param {CID} key - * @returns {Promise} * * @private */ @@ -26,13 +34,14 @@ module.exports = (dht) => { return { /** - * Announce to the network that we can provide given key's value. + * Announce to the network that we can provide the value for a given key + * * @param {CID} key - * @returns {Promise} */ async provide (key) { - dht._log('provide: %s', key.toBaseEncodedString()) + dht._log(`provide: ${key}`) + /** @type {Error[]} */ const errors = [] // Add peer as provider @@ -45,49 +54,65 @@ module.exports = (dht) => { multiaddrs }] - // Notify closest peers - await utils.mapParallel(dht.getClosestPeers(key.bytes), async (peer) => { - dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String()) + /** + * @param {PeerId} peer + */ + async function mapPeer (peer) { + dht._log(`putProvider ${key} to ${peer}`) try { await dht.network.sendMessage(peer, msg) } catch (err) { errors.push(err) } - }) + } + + // Notify closest peers + await utils.mapParallel(dht.getClosestPeers(key.bytes), mapPeer) if (errors.length) { // TODO: // This should be infrequent. This means a peer we previously connected // to failed to exchange the provide message. If getClosestPeers was an // iterator, we could continue to pull until we announce to kBucketSize peers. - throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED'), { errors }) + throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`), 'ERR_SOME_PROVIDES_FAILED', { errors }) } }, /** * Search the dht for up to `K` providers of the given CID. + * * @param {CID} key - * @param {Object} options - findProviders options - * @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000) - * @param {number} options.maxNumProviders - maximum number of providers to find + * @param {Object} [options] - findProviders options + * @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) + * @param {number} [options.maxNumProviders=5] - maximum number of providers to find * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} */ - async * findProviders (key, options = {}) { + async * findProviders (key, options = { timeout: 60000, maxNumProviders: 5 }) { const providerTimeout = options.timeout || c.minute const n = options.maxNumProviders || c.K - dht._log('findProviders %s', key.toBaseEncodedString()) + dht._log(`findProviders ${key}`) const out = new LimitedPeerList(n) const provs = await dht.providers.getProviders(key) - provs.forEach((id) => { - const peerData = dht.peerStore.get(id) || {} - out.push({ - id: peerData.id || id, - multiaddrs: (peerData.addresses || []).map((address) => address.multiaddr) + provs + .forEach(id => { + const peerData = dht.peerStore.get(id) + + if (peerData) { + out.push({ + id: peerData.id, + multiaddrs: peerData.addresses + .map((address) => address.multiaddr) + }) + } else { + out.push({ + id, + multiaddrs: [] + }) + } }) - }) // All done if (out.length >= n) { @@ -99,21 +124,34 @@ module.exports = (dht) => { } // need more, query the network + /** @type {LimitedPeerList[]} */ const paths = [] - const query = new Query(dht, key.bytes, (pathIndex, numPaths) => { + + /** + * + * @param {number} pathIndex + * @param {number} numPaths + */ + function makePath (pathIndex, numPaths) { // This function body runs once per disjoint path const pathSize = utils.pathSize(n - out.length, numPaths) const pathProviders = new LimitedPeerList(pathSize) paths.push(pathProviders) - // Here we return the query function to use on this particular disjoint path - return async (peer) => { + /** + * The query function to use on this particular disjoint path + * + * @param {PeerId} peer + */ + async function queryDisjointPath (peer) { const msg = await findProvidersSingle(peer, key) const provs = msg.providerPeers - dht._log('(%s) found %s provider entries', dht.peerId.toB58String(), provs.length) + dht._log(`(${peer}) found ${provs.length} provider entries`) provs.forEach((prov) => { - pathProviders.push({ id: prov.id }) + pathProviders.push({ + ...prov + }) }) // hooray we have all that we want @@ -124,8 +162,11 @@ module.exports = (dht) => { // it looks like we want some more return { closerPeers: msg.closerPeers } } - }) + return queryDisjointPath + } + + const query = new Query(dht, key.bytes, makePath) const peers = dht.routingTable.closestPeers(key.bytes, dht.kBucketSize) try { diff --git a/src/index.js b/src/index.js index 8e281d1d..b231d7c3 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const errcode = require('err-code') const libp2pRecord = require('libp2p-record') const { MemoryDatastore } = require('interface-datastore') const uint8ArrayEquals = require('uint8arrays/equals') +const uint8ArrayToString = require('uint8arrays/to-string') const RoutingTable = require('./routing') const utils = require('./utils') @@ -21,43 +22,52 @@ const QueryManager = require('./query-manager') const Record = libp2pRecord.Record +/** + * @typedef {import('libp2p')} Libp2p + * @typedef {import('libp2p/src/peer-store')} PeerStore + * @typedef {import('peer-id')} PeerId + * @typedef {import('interface-datastore').Datastore} Datastore + * @typedef {import('libp2p/src/dialer')} Dialer + * @typedef {import('libp2p/src/registrar')} Registrar + * @typedef {import('cids')} CID + * @typedef {import('multiaddr')} Multiaddr + * @typedef {object} PeerData + * @property {PeerId} id + * @property {Multiaddr[]} multiaddrs + * + * @typedef {object} RandomWalkOptions + * @property {boolean} enabled discovery enabled (default: true) + * @property {number} queriesPerPeriod how many queries to run per period (default: 1) + * @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000) + * @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 30000) + * @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000) + */ + /** * A DHT implementation modeled after Kademlia with S/Kademlia modifications. * Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht. */ class KadDHT extends EventEmitter { - /** - * Random walk options - * @typedef {Object} randomWalkOptions - * @property {boolean} enabled discovery enabled (default: true) - * @property {number} queriesPerPeriod how many queries to run per period (default: 1) - * @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000) - * @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 30000) - * @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000) - */ - /** * Create a new KadDHT. + * * @param {Object} props - * @param {Libp2p} [props.libp2p] the libp2p instance - * @param {Dialer} props.dialer libp2p dialer instance - * @param {PeerId} props.peerId peer's peerId - * @param {PeerStore} props.peerStore libp2p peerStore - * @param {Object} props.registrar libp2p registrar instance - * @param {function} props.registrar.handle - * @param {function} props.registrar.register - * @param {function} props.registrar.unregister - * @param {string} [props.protocolPrefix = '/ipfs'] libp2p registrar handle protocol - * @param {boolean} [props.forceProtocolLegacy = false] WARNING: this is not recommended and should only be used for legacy purposes - * @param {number} props.kBucketSize k-bucket size (default 20) - * @param {boolean} props.clientMode If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false) - * @param {number} props.concurrency alpha concurrency of queries (default 3) - * @param {Datastore} props.datastore datastore (default MemoryDatastore) - * @param {object} props.validators validators object with namespace as keys and function(key, record, callback) - * @param {object} props.selectors selectors object with namespace as keys and function(key, records) - * @param {randomWalkOptions} options.randomWalk randomWalk options - * @param {function(record: Record, peerId: PeerId)} [props.onPut = () => {}] Called when an entry is added to or changed in the datastore - * @param {function(record: Record)} [props.onRemove = () => {}] Called when an entry is removed from the datastore + * @param {Libp2p} props.libp2p - the libp2p instance + * @param {Dialer} props.dialer - libp2p dialer instance + * @param {PeerId} props.peerId - peer's peerId + * @param {PeerStore} props.peerStore - libp2p peerStore + * @param {Registrar} props.registrar - libp2p registrar instance + * @param {string} [props.protocolPrefix = '/ipfs'] - libp2p registrar handle protocol + * @param {boolean} [props.forceProtocolLegacy = false] - WARNING: this is not recommended and should only be used for legacy purposes + * @param {number} props.kBucketSize - k-bucket size (default 20) + * @param {boolean} props.clientMode - If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false) + * @param {number} props.concurrency - alpha concurrency of queries (default 3) + * @param {Datastore} props.datastore - datastore (default MemoryDatastore) + * @param {object} props.validators - validators object with namespace as keys and function(key, record, callback) + * @param {object} props.selectors - selectors object with namespace as keys and function(key, records) + * @param {RandomWalkOptions} props.randomWalk - randomWalk options + * @param {function(import('libp2p-record').Record, PeerId): void} [props.onPut] - Called when an entry is added to or changed in the datastore + * @param {function(import('libp2p-record').Record): void} [props.onRemove] - Called when an entry is removed from the datastore */ constructor ({ libp2p, @@ -73,7 +83,13 @@ class KadDHT extends EventEmitter { concurrency = c.ALPHA, validators = {}, selectors = {}, - randomWalk = {}, + randomWalk = { + enabled: false, + queriesPerPeriod: 1, + interval: 300000, + timeout: 30000, + delay: 10000 + }, onPut = () => {}, onRemove = () => {} }) { @@ -85,36 +101,42 @@ class KadDHT extends EventEmitter { /** * Local reference to the libp2p instance. May be undefined. + * * @type {Libp2p} */ this.libp2p = libp2p /** * Local reference to the libp2p dialer instance + * * @type {Dialer} */ this.dialer = dialer /** * Local peer-id + * * @type {PeerId} */ this.peerId = peerId /** * Local PeerStore + * * @type {PeerStore} */ this.peerStore = peerStore /** * Local peer info + * * @type {Registrar} */ this.registrar = registrar /** * Registrar protocol + * * @type {string} */ this.protocol = protocolPrefix + (forceProtocolLegacy ? '' : c.PROTOCOL_DHT) @@ -130,6 +152,7 @@ class KadDHT extends EventEmitter { /** * ALPHA concurrency at which each query path with run, defaults to 3 + * * @type {number} */ this.concurrency = concurrency @@ -137,6 +160,7 @@ class KadDHT extends EventEmitter { /** * Number of disjoint query paths to use * This is set to `kBucketSize`/2 per the S/Kademlia paper + * * @type {number} */ this.disjointPaths = Math.ceil(this.kBucketSize / 2) @@ -204,7 +228,6 @@ class KadDHT extends EventEmitter { /** * Is this DHT running. - * @type {bool} */ get isStarted () { return this._running @@ -212,6 +235,7 @@ class KadDHT extends EventEmitter { /** * Start listening to incoming connections. + * * @returns {Promise} */ async start () { @@ -227,6 +251,7 @@ class KadDHT extends EventEmitter { /** * Stop accepting incoming connections and sending outgoing * messages. + * * @returns {Promise} */ stop () { @@ -239,6 +264,7 @@ class KadDHT extends EventEmitter { /** * Store the given key/value pair in the DHT. + * * @param {Uint8Array} key * @param {Uint8Array} value * @param {Object} [options] - put options @@ -252,6 +278,7 @@ class KadDHT extends EventEmitter { /** * Get the value to the given key. * Times out after 1 minute by default. + * * @param {Uint8Array} key * @param {Object} [options] - get options * @param {number} [options.timeout] - optional timeout (default: 60000) @@ -263,11 +290,11 @@ class KadDHT extends EventEmitter { /** * Get the `n` values to the given key without sorting. + * * @param {Uint8Array} key * @param {number} nvals * @param {Object} [options] - get options * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise>} */ async getMany (key, nvals, options = {}) { // eslint-disable-line require-await return this.contentFetching.getMany(key, nvals, options) @@ -275,11 +302,11 @@ class KadDHT extends EventEmitter { /** * Remove the given key from the local datastore. + * * @param {Uint8Array} key - * @returns {Promise} */ async removeLocal (key) { - this._log('removeLocal: %b', key) + this._log(`removeLocal: ${uint8ArrayToString(key, 'base32')}`) const dsKey = utils.bufferToKey(key) try { @@ -292,10 +319,22 @@ class KadDHT extends EventEmitter { } } + /** + * @param {Uint8Array} key + * @param {Uint8Array} value + */ + async _putLocal (key, value) { + this._log(`_putLocal: ${uint8ArrayToString(key, 'base32')}`) + const dsKey = utils.bufferToKey(key) + + await this.datastore.put(dsKey, value) + } + // ----------- Content Routing /** * Announce to the network that we can provide given key's value. + * * @param {CID} key * @returns {Promise} */ @@ -305,13 +344,14 @@ class KadDHT extends EventEmitter { /** * Search the dht for up to `K` providers of the given CID. + * * @param {CID} key - * @param {Object} options - findProviders options - * @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000) - * @param {number} options.maxNumProviders - maximum number of providers to find + * @param {Object} [options] - findProviders options + * @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) + * @param {number} [options.maxNumProviders=5] - maximum number of providers to find * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} */ - async * findProviders (key, options = {}) { + async * findProviders (key, options = { timeout: 6000, maxNumProviders: 5 }) { for await (const peerData of this.contentRouting.findProviders(key, options)) { yield peerData } @@ -323,38 +363,40 @@ class KadDHT extends EventEmitter { * Search for a peer with the given ID. * * @param {PeerId} id - * @param {Object} options - findPeer options - * @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000) + * @param {Object} [options] - findPeer options + * @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) * @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} */ - async findPeer (id, options = {}) { // eslint-disable-line require-await + async findPeer (id, options = { timeout: 60000 }) { // eslint-disable-line require-await return this.peerRouting.findPeer(id, options) } /** * Kademlia 'node lookup' operation. + * * @param {Uint8Array} key * @param {Object} [options] - * @param {boolean} [options.shallow] shallow query (default: false) - * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} + * @param {boolean} [options.shallow] - shallow query (default: false) */ async * getClosestPeers (key, options = { shallow: false }) { - for await (const pId of this.peerRouting.getClosestPeers(key, options)) { - yield pId - } + yield * this.peerRouting.getClosestPeers(key, options) } /** * Get the public key for the given peer id. + * * @param {PeerId} peer - * @returns {Promise} */ - async getPublicKey (peer) { // eslint-disable-line require-await + getPublicKey (peer) { return this.peerRouting.getPublicKey(peer) } // ----------- Discovery ----------- + /** + * @param {PeerId} peerId + * @param {Multiaddr[]} multiaddrs + */ _peerDiscovered (peerId, multiaddrs) { this.emit('peer', { id: peerId, @@ -369,8 +411,6 @@ class KadDHT extends EventEmitter { * the message. * * @param {Message} msg - * @returns {Promise>} - * @private */ async _nearestPeersToQuery (msg) { const key = await utils.convertBuffer(msg.key) @@ -392,8 +432,6 @@ class KadDHT extends EventEmitter { * * @param {Message} msg * @param {PeerId} peerId - * @returns {Promise>} - * @private */ async _betterPeersToQuery (msg, peerId) { this._log('betterPeersToQuery') @@ -417,12 +455,10 @@ class KadDHT extends EventEmitter { * - it was received less than `MAX_RECORD_AGE` ago. * * @param {Uint8Array} key - * @returns {Promise} - * @private */ async _checkLocalDatastore (key) { - this._log('checkLocalDatastore: %b', key) + this._log(`checkLocalDatastore: ${uint8ArrayToString(key)} %b`, key) const dsKey = utils.bufferToKey(key) // Fetch value from ds @@ -440,12 +476,12 @@ class KadDHT extends EventEmitter { const record = Record.deserialize(rawRecord) if (!record) { - throw errcode('Invalid record', 'ERR_INVALID_RECORD') + throw errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD') } // Check validity: compare time received with max record age if (record.timeReceived == null || - utils.now() - record.timeReceived > c.MAX_RECORD_AGE) { + utils.now() - record.timeReceived.getTime() > c.MAX_RECORD_AGE) { // If record is bad delete it and return await this.datastore.delete(dsKey) this.onRemove(record) @@ -458,9 +494,8 @@ class KadDHT extends EventEmitter { /** * Add the peer to the routing table and update it in the peerStore. + * * @param {PeerId} peerId - * @returns {Promise} - * @private */ async _add (peerId) { await this.routingTable.add(peerId) @@ -469,11 +504,8 @@ class KadDHT extends EventEmitter { /** * Verify a record without searching the DHT. * - * @param {Record} record - * @returns {Promise} - * @private + * @param {import('libp2p-record').Record} record */ - async _verifyRecordLocally (record) { this._log('verifyRecordLocally') @@ -484,11 +516,7 @@ class KadDHT extends EventEmitter { * Is the given peer id our PeerId? * * @param {PeerId} other - * @returns {bool} - * - * @private */ - _isSelf (other) { return other && uint8ArrayEquals(this.peerId.id, other.id) } @@ -499,18 +527,14 @@ class KadDHT extends EventEmitter { * @param {Uint8Array} key * @param {Uint8Array} rec - encoded record * @param {PeerId} target - * @returns {Promise} - * - * @private */ - async _putValueToPeer (key, rec, target) { const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) - msg.record = rec + msg.record = Record.deserialize(rec) const resp = await this.network.sendRequest(target, msg) - if (!resp.record.value.equals(Record.deserialize(rec).value)) { + if (resp.record && !uint8ArrayEquals(resp.record.value, Record.deserialize(rec).value)) { throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') } } @@ -523,10 +547,7 @@ class KadDHT extends EventEmitter { * * @param {PeerId} peer * @param {Uint8Array} key - * @returns {Promise<{Record, Array<{ id: PeerId, multiaddrs: Multiaddr[] }}>} - * @private */ - async _getValueOrPeers (peer, key) { const msg = await this._getValueSingle(peer, key) @@ -558,10 +579,7 @@ class KadDHT extends EventEmitter { * * @param {PeerId} peer * @param {Uint8Array} key - * @returns {Promise} - * @private */ - async _getValueSingle (peer, key) { // eslint-disable-line require-await const msg = new Message(Message.TYPES.GET_VALUE, key, 0) return this.network.sendRequest(peer, msg) @@ -571,11 +589,9 @@ class KadDHT extends EventEmitter { * Verify a record, fetching missing public keys from the network. * Calls back with an error if the record is invalid. * - * @param {Record} record + * @param {import('libp2p-record').Record} record * @returns {Promise} - * @private */ - async _verifyRecordOnline (record) { await libp2pRecord.validator.verifyRecord(this.validators, record) } diff --git a/src/message/index.js b/src/message/index.js index 785e5393..4d43216b 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -2,6 +2,7 @@ const PeerId = require('peer-id') const multiaddr = require('multiaddr') +// @ts-ignore const protons = require('protons') const { Record } = require('libp2p-record') const pbm = protons(require('./dht.proto')) @@ -9,12 +10,23 @@ const pbm = protons(require('./dht.proto')) const MESSAGE_TYPE = pbm.Message.MessageType const CONNECTION_TYPE = pbm.Message.ConnectionType +/** + * @typedef {0|1|2|3|4} ConnectionType + * + * @typedef {object} PBPeer + * @property {Uint8Array} id + * @property {Uint8Array[]} addrs + * @property {ConnectionType} connection + * + * @typedef {import('../index').PeerData} PeerData + */ + /** * Represents a single DHT control message. */ class Message { /** - * @param {MessageType} type + * @param {MESSAGE_TYPE} type * @param {Uint8Array} key * @param {number} level */ @@ -26,8 +38,12 @@ class Message { this.type = type this.key = key this._clusterLevelRaw = level + + /** @type {PeerData[]} */ this.closerPeers = [] + /** @type {PeerData[]} */ this.providerPeers = [] + /** @type {import('libp2p-record').Record | null} */ this.record = null } @@ -49,7 +65,6 @@ class Message { /** * Encode into protobuf - * @returns {Uint8Array} */ serialize () { const obj = { @@ -57,7 +72,10 @@ class Message { type: this.type, clusterLevelRaw: this._clusterLevelRaw, closerPeers: this.closerPeers.map(toPbPeer), - providerPeers: this.providerPeers.map(toPbPeer) + providerPeers: this.providerPeers.map(toPbPeer), + + /** @type {Uint8Array | undefined} */ + record: undefined } if (this.record) { @@ -75,7 +93,6 @@ class Message { * Decode from protobuf * * @param {Uint8Array} raw - * @returns {Message} */ static deserialize (raw) { const dec = pbm.Message.decode(raw) @@ -84,6 +101,7 @@ class Message { msg.closerPeers = dec.closerPeers.map(fromPbPeer) msg.providerPeers = dec.providerPeers.map(fromPbPeer) + if (dec.record) { msg.record = Record.deserialize(dec.record) } @@ -95,14 +113,23 @@ class Message { Message.TYPES = MESSAGE_TYPE Message.CONNECTION_TYPES = CONNECTION_TYPE +/** + * @param {PeerData} peer + */ function toPbPeer (peer) { - return { + /** @type {PBPeer} */ + const output = { id: peer.id.id, addrs: (peer.multiaddrs || []).map((m) => m.bytes), connection: CONNECTION_TYPE.CONNECTED } + + return output } +/** + * @param {PBPeer} peer + */ function fromPbPeer (peer) { return { id: new PeerId(peer.id), diff --git a/src/network.js b/src/network.js index 976f5d3b..73485a47 100644 --- a/src/network.js +++ b/src/network.js @@ -2,10 +2,11 @@ const errcode = require('err-code') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const lp = require('it-length-prefixed') const pTimeout = require('p-timeout') const { consume } = require('streaming-iterables') +const first = require('it-first') const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') @@ -14,17 +15,22 @@ const c = require('./constants') const Message = require('./message') const utils = require('./utils') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + */ + /** * Handle network operations for the dht */ class Network { /** - * Create a new network. + * Create a new network * - * @param {KadDHT} self + * @param {import('./index')} dht */ - constructor (self) { - this.dht = self + constructor (dht) { + this.dht = dht this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT this._log = utils.logger(this.dht.peerId, 'net') this._rpc = rpc(this.dht) @@ -33,8 +39,7 @@ class Network { } /** - * Start the network. - * @returns {Promise} + * Start the network */ async start () { if (this._running) { @@ -65,8 +70,7 @@ class Network { } /** - * Stop all network activity. - * @returns {Promise} + * Stop all network activity */ async stop () { if (!this.dht.isStarted && !this.isStarted) { @@ -75,13 +79,15 @@ class Network { this._running = false // unregister protocol and handlers - await this.dht.registrar.unregister(this._registrarId) + if (this._registrarId) { + await this.dht.registrar.unregister(this._registrarId) + } } /** * Is the network online? * - * @type {bool} + * @type {boolean} */ get isStarted () { return this._running @@ -90,7 +96,7 @@ class Network { /** * Are all network components there? * - * @type {bool} + * @type {boolean} */ get isConnected () { // TODO add a way to check if switch has started or not @@ -99,9 +105,8 @@ class Network { /** * Registrar notifies a connection successfully with dht protocol. - * @private - * @param {PeerId} peerId remote peer id - * @returns {Promise} + * + * @param {PeerId} peerId - remote peer id */ async _onPeerConnected (peerId) { await this.dht._add(peerId) @@ -110,10 +115,10 @@ class Network { /** * Send a request and record RTT for latency measurements. + * * @async * @param {PeerId} to - The peer that should receive a message * @param {Message} msg - The message to send. - * @returns {Promise} */ async sendRequest (to, msg) { // TODO: record latency @@ -139,7 +144,6 @@ class Network { * * @param {PeerId} to * @param {Message} msg - * @returns {Promise} */ async sendMessage (to, msg) { if (!this.isConnected) { @@ -163,10 +167,8 @@ class Network { * If no response is received after the specified timeout * this will error out. * - * @param {DuplexIterable} stream - the stream to use + * @param {MuxedStream} stream - the stream to use * @param {Uint8Array} msg - the message to send - * @returns {Promise} - * @private */ async _writeReadMessage (stream, msg) { // eslint-disable-line require-await return pTimeout( @@ -178,10 +180,8 @@ class Network { /** * Write a message to the given stream. * - * @param {DuplexIterable} stream - the stream to use + * @param {MuxedStream} stream - the stream to use * @param {Uint8Array} msg - the message to send - * @returns {Promise} - * @private */ _writeMessage (stream, msg) { return pipe( @@ -193,15 +193,24 @@ class Network { } } +/** + * @param {MuxedStream} stream + * @param {Uint8Array} msg + */ async function writeReadMessage (stream, msg) { const res = await pipe( [msg], lp.encode(), stream, lp.decode(), + /** + * @param {AsyncIterable} source + */ async source => { - for await (const chunk of source) { - return chunk.slice() + const buf = await first(source) + + if (buf) { + return buf.slice() } } ) diff --git a/src/peer-list/index.js b/src/peer-list/index.js index d0d1379e..d5b102f9 100644 --- a/src/peer-list/index.js +++ b/src/peer-list/index.js @@ -1,10 +1,16 @@ 'use strict' +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../').PeerData} PeerData + */ + /** * A list of unique peers. */ class PeerList { constructor () { + /** @type {PeerData[]} */ this.list = [] } @@ -12,13 +18,14 @@ class PeerList { * Add a new peer. Returns `true` if it was a new one * * @param {PeerData} peerData - * @returns {bool} */ push (peerData) { if (!this.has(peerData.id)) { this.list.push(peerData) + return true } + return false } @@ -26,17 +33,14 @@ class PeerList { * Check if this PeerData is already in here. * * @param {PeerId} peerId - * @returns {bool} */ has (peerId) { - const match = this.list.find((i) => i.id.isEqual(peerId)) + const match = this.list.find((i) => i.id.equals(peerId)) return Boolean(match) } /** * Get the list as an array. - * - * @returns {Array} */ toArray () { return this.list.slice() @@ -44,8 +48,6 @@ class PeerList { /** * Remove the last element - * - * @returns {PeerData} */ pop () { return this.list.pop() @@ -53,8 +55,6 @@ class PeerList { /** * The length of the list - * - * @type {number} */ get length () { return this.list.length diff --git a/src/peer-list/limited-peer-list.js b/src/peer-list/limited-peer-list.js index 3e390f7f..a88255b4 100644 --- a/src/peer-list/limited-peer-list.js +++ b/src/peer-list/limited-peer-list.js @@ -2,6 +2,10 @@ const PeerList = require('.') +/** + * @typedef {import('../').PeerData} PeerData + */ + /** * Like PeerList but with a length restriction. */ @@ -20,12 +24,12 @@ class LimitedPeerList extends PeerList { * Add a PeerData if it fits in the list * * @param {PeerData} peerData - * @returns {bool} */ push (peerData) { if (this.length < this.limit) { return super.push(peerData) } + return false } } diff --git a/src/peer-list/peer-distance-list.js b/src/peer-list/peer-distance-list.js index df8dea76..c4e622aa 100644 --- a/src/peer-list/peer-distance-list.js +++ b/src/peer-list/peer-distance-list.js @@ -1,10 +1,16 @@ 'use strict' +// @ts-ignore const distance = require('xor-distance') const utils = require('../utils') const pMap = require('p-map') const uint8ArrayEquals = require('uint8arrays/equals') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../').PeerData} PeerData + */ + /** * Maintains a list of peerIds sorted by distance from a DHT key. */ @@ -18,6 +24,8 @@ class PeerDistanceList { constructor (originDhtKey, capacity) { this.originDhtKey = originDhtKey this.capacity = capacity + + /** @type {{ peerId: PeerId, distance: Uint8Array }[]} */ this.peerDistances = [] } @@ -39,7 +47,6 @@ class PeerDistanceList { * Add a peerId to the list. * * @param {PeerId} peerId - * @returns {Promise} */ async add (peerId) { if (this.peerDistances.find(pd => uint8ArrayEquals(pd.peerId.id, peerId.id))) { @@ -61,8 +68,7 @@ class PeerDistanceList { * Indicates whether any of the peerIds passed as a parameter are closer * to the origin key than the furthest peerId in the PeerDistanceList. * - * @param {Array} peerIds - * @returns {Boolean} + * @param {PeerId[]} peerIds */ async anyCloser (peerIds) { if (!peerIds.length) { @@ -74,14 +80,16 @@ class PeerDistanceList { } const dhtKeys = await pMap(peerIds, (peerId) => utils.convertPeerId(peerId)) - const furthestDistance = this.peerDistances[this.peerDistances.length - 1].distance + for (const dhtKey of dhtKeys) { const keyDistance = distance(this.originDhtKey, dhtKey) + if (distance.compare(keyDistance, furthestDistance) < 0) { return true } } + return false } } diff --git a/src/peer-list/peer-queue.js b/src/peer-list/peer-queue.js index 48086c5a..e471a695 100644 --- a/src/peer-list/peer-queue.js +++ b/src/peer-list/peer-queue.js @@ -1,6 +1,8 @@ 'use strict' +// @ts-ignore const Heap = require('heap') +// @ts-ignore const distance = require('xor-distance') const debug = require('debug') @@ -8,6 +10,10 @@ const utils = require('../utils') const log = debug('libp2p:dht:peer-queue') +/** + * @typedef {import('peer-id')} PeerId + */ + /** * PeerQueue is a heap that sorts its entries (PeerIds) by their * xor distance to the inital provided key. @@ -52,7 +58,6 @@ class PeerQueue { * Add a new PeerId to the queue. * * @param {PeerId} id - * @returns {Promise} */ async enqueue (id) { log('enqueue %s', id.toB58String()) diff --git a/src/peer-routing/index.js b/src/peer-routing/index.js index 7abf7766..fc358537 100644 --- a/src/peer-routing/index.js +++ b/src/peer-routing/index.js @@ -5,6 +5,7 @@ const pTimeout = require('p-timeout') const PeerId = require('peer-id') const crypto = require('libp2p-crypto') +const uint8ArrayToString = require('uint8arrays/to-string') const c = require('../constants') const Message = require('../message') @@ -12,15 +13,22 @@ const Query = require('../query') const utils = require('../utils') +/** + * @typedef {import('multiaddr')} Multiaddr + */ + +/** + * @param {import('../index')} dht + */ module.exports = (dht) => { /** * Look if we are connected to a peer with the given id. * Returns its id and addresses, if found, otherwise `undefined`. + * * @param {PeerId} peer - * @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} */ const findPeerLocal = async (peer) => { - dht._log('findPeerLocal %s', peer.toB58String()) + dht._log(`findPeerLocal ${peer}`) const p = await dht.routingTable.find(peer) const peerData = p && dht.peerStore.get(p) @@ -35,6 +43,7 @@ module.exports = (dht) => { /** * Get a value via rpc call for the given parameters. + * * @param {PeerId} peer * @param {Uint8Array} key * @returns {Promise} @@ -47,6 +56,7 @@ module.exports = (dht) => { /** * Find close peers for a given peer + * * @param {Uint8Array} key * @param {PeerId} peer * @returns {Promise>} @@ -54,7 +64,7 @@ module.exports = (dht) => { */ const closerPeersSingle = async (key, peer) => { - dht._log('closerPeersSingle %b from %s', key, peer.toB58String()) + dht._log(`closerPeersSingle ${uint8ArrayToString(key, 'base32')} from ${peer}`) const msg = await dht.peerRouting._findPeerSingle(peer, new PeerId(key)) return msg.closerPeers @@ -68,23 +78,22 @@ module.exports = (dht) => { /** * Get the public key directly from a node. + * * @param {PeerId} peer - * @returns {Promise} - * @private */ const getPublicKeyFromNode = async (peer) => { const pkKey = utils.keyForPublicKey(peer) const msg = await getValueSingle(peer, pkKey) if (!msg.record || !msg.record.value) { - throw errcode(`Node not responding with its public key: ${peer.toB58String()}`, 'ERR_INVALID_RECORD') + throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD') } - const recPeer = PeerId.createFromPubKey(msg.record.value) + const recPeer = await PeerId.createFromPubKey(msg.record.value) // compare hashes of the pub key - if (!recPeer.isEqual(peer)) { - throw errcode('public key does not match id', 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID') + if (!recPeer.equals(peer)) { + throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID') } return recPeer.pubKey @@ -93,6 +102,7 @@ module.exports = (dht) => { return { /** * Ask peer `peer` if they know where the peer with id `target` is. + * * @param {PeerId} peer * @param {PeerId} target * @returns {Promise} @@ -107,12 +117,13 @@ module.exports = (dht) => { /** * Search for a peer with the given ID. + * * @param {PeerId} id - * @param {Object} options - findPeer options - * @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000) + * @param {Object} [options] - findPeer options + * @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) * @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} */ - async findPeer (id, options = {}) { + async findPeer (id, options = { timeout: 60000 }) { options.timeout = options.timeout || c.minute dht._log('findPeer %s', id.toB58String()) @@ -148,10 +159,13 @@ module.exports = (dht) => { // query the network const query = new Query(dht, id.id, () => { - // There is no distinction between the disjoint paths, - // so there are no per-path variables in dht scope. - // Just return the actual query function. - return async (peer) => { + /** + * There is no distinction between the disjoint paths, so there are no per-path + * variables in dht scope. Just return the actual query function. + * + * @param {PeerId} peer + */ + const queryFn = async (peer) => { const msg = await this._findPeerSingle(peer, id) const match = msg.closerPeers.find((p) => p.id.isEqual(id)) @@ -167,20 +181,20 @@ module.exports = (dht) => { closerPeers: msg.closerPeers } } + + return queryFn }) - let error, result + let result try { result = await pTimeout(query.run(peers), options.timeout) - } catch (err) { - error = err + } finally { + query.stop() } - query.stop() - if (error) throw error let success = false result.paths.forEach((result) => { - if (result.success) { + if (result.success && result.peer) { success = true dht.peerStore.addressBook.add(result.peer.id, result.peer.multiaddrs) } @@ -193,6 +207,10 @@ module.exports = (dht) => { const peerData = dht.peerStore.get(id) + if (!peerData) { + throw errcode(new Error('No peer data found in peer store'), 'ERR_NOT_FOUND') + } + return { id: peerData.id, multiaddrs: peerData.addresses.map((address) => address.multiaddr) @@ -201,9 +219,10 @@ module.exports = (dht) => { /** * Kademlia 'node lookup' operation. + * * @param {Uint8Array} key * @param {Object} [options] - * @param {boolean} [options.shallow] shallow query (default: false) + * @param {boolean} [options.shallow] - shallow query (default: false) * @returns {AsyncIterable} */ async * getClosestPeers (key, options = { shallow: false }) { @@ -240,14 +259,15 @@ module.exports = (dht) => { /** * Get the public key for the given peer id. + * * @param {PeerId} peer - * @returns {Promise} */ async getPublicKey (peer) { dht._log('getPublicKey %s', peer.toB58String()) // local check const peerData = dht.peerStore.get(peer) + if (peerData && peerData.id.pubKey) { dht._log('getPublicKey: found local copy') return peerData.id.pubKey @@ -255,6 +275,7 @@ module.exports = (dht) => { // try the node directly let pk + try { pk = await getPublicKeyFromNode(peer) } catch (err) { @@ -264,10 +285,10 @@ module.exports = (dht) => { pk = crypto.keys.unmarshalPublicKey(value) } - peerData.id = new PeerId(peer.id, null, pk) - const addrs = peerData.addresses.map((address) => address.multiaddr) - dht.peerStore.addressBook.add(peerData.id, addrs) - dht.peerStore.keyBook.set(peerData.id, pk) + const peerId = new PeerId(peer.id, undefined, pk) + const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr) + dht.peerStore.addressBook.add(peerId, addrs) + dht.peerStore.keyBook.set(peerId, pk) return pk } diff --git a/src/providers.js b/src/providers.js index 499d6c36..ea70848a 100644 --- a/src/providers.js +++ b/src/providers.js @@ -1,6 +1,7 @@ 'use strict' const cache = require('hashlru') +// @ts-ignore const varint = require('varint') const PeerId = require('peer-id') const { Key } = require('interface-datastore') @@ -8,6 +9,10 @@ const { default: Queue } = require('p-queue') const c = require('./constants') const utils = require('./utils') +/** + * @typedef {import('cids')} CID + */ + /** * This class manages known providers. * A provider is a peer that we know to have the content for a given CID. @@ -22,7 +27,7 @@ const utils = require('./utils') */ class Providers { /** - * @param {Object} datastore + * @param {import('interface-datastore').Datastore} datastore * @param {PeerId} [self] * @param {number} [cacheSize=256] */ @@ -52,6 +57,7 @@ class Providers { */ this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE + // @ts-ignore hashlru types are wrong this.providers = cache(this.lruCacheSize) this.syncQueue = new Queue({ concurrency: 1 }) @@ -59,7 +65,6 @@ class Providers { /** * Start the provider cleanup service - * @returns {void} */ start () { this._cleaner = setInterval( @@ -70,11 +75,12 @@ class Providers { /** * Release any resources. - * @returns {void} */ stop () { - clearInterval(this._cleaner) - this._cleaner = null + if (this._cleaner) { + clearInterval(this._cleaner) + this._cleaner = null + } } /** @@ -148,7 +154,7 @@ class Providers { * Get the currently known provider peer ids for a given CID. * * @param {CID} cid - * @returns {Promise>} + * @returns {Promise>} * * @private */ @@ -175,7 +181,7 @@ class Providers { const provs = await this._getProvidersMap(cid) this._log('loaded %s provs', provs.size) - const now = Date.now() + const now = new Date() provs.set(utils.encodeBase32(provider.id), now) const dsKey = makeProviderKey(cid) @@ -217,13 +223,10 @@ function makeProviderKey (cid) { /** * Write a provider into the given store. * - * @param {Datastore} store + * @param {import('interface-datastore').Datastore} store * @param {CID} cid * @param {PeerId} peer - * @param {number} time - * @returns {Promise} - * - * @private + * @param {Date} time */ async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-line require-await const dsKey = [ @@ -233,17 +236,14 @@ async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-l ].join('') const key = new Key(dsKey) - const buffer = Uint8Array.from(varint.encode(time)) + const buffer = Uint8Array.from(varint.encode(time.getTime())) return store.put(key, buffer) } /** * Parse the CID and provider peer id from the key * - * @param {DKey} key - * @returns {Object} object with peer id and cid - * - * @private + * @param {import('interface-datastore').Key} key */ function parseProviderKey (key) { const parts = key.toString().split('/') @@ -260,7 +260,7 @@ function parseProviderKey (key) { /** * Load providers for the given CID from the store. * - * @param {Datastore} store + * @param {import('interface-datastore').Datastore} store * @param {CID} cid * @returns {Promise>} * @@ -276,6 +276,9 @@ async function loadProviders (store, cid) { return providers } +/** + * @param {Uint8Array} buf + */ function readTime (buf) { return varint.decode(buf) } diff --git a/src/query-manager.js b/src/query-manager.js index 3ebbb907..6f1304e6 100644 --- a/src/query-manager.js +++ b/src/query-manager.js @@ -15,7 +15,7 @@ class QueryManager { /** * Called when a query is started. * - * @param {Query} query + * @param {import('./query')} query */ queryStarted (query) { this.queries.add(query) @@ -24,7 +24,7 @@ class QueryManager { /** * Called when a query completes. * - * @param {Query} query + * @param {import('./query')} query */ queryCompleted (query) { this.queries.delete(query) diff --git a/src/query/index.js b/src/query/index.js index 1c408cff..879d5256 100644 --- a/src/query/index.js +++ b/src/query/index.js @@ -5,34 +5,45 @@ const mh = require('multihashing-async').multihash const utils = require('../utils') const Run = require('./run') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {{from: PeerId, val: Uint8Array}} DHTQueryValue + * @typedef {{from: PeerId, err: Error}} DHTQueryError + * @typedef {DHTQueryValue | DHTQueryError} DHTQueryResult + * @typedef {import('../').PeerData} PeerData + * + * @typedef {{ pathComplete?: boolean, queryComplete?: boolean, closerPeers?: PeerData[], peer?: PeerData, success?: boolean }} QueryResult + */ + +/** + * User-supplied function to set up an individual disjoint path. Per-path + * query state should be held in this function's closure. + * + * Accepts the numeric index from zero to numPaths - 1 and returns a function + * to call on each peer in the query. + * + * @typedef {(pathIndex: number, numPaths: number) => QueryFunc } MakeQueryFunc + */ + +/** + * Query function + * + * @typedef {(peer: PeerId) => Promise } QueryFunc + */ + /** * Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths. * Within each path, query peers from closest to farthest away. */ class Query { - /** - * User-supplied function to set up an individual disjoint path. Per-path - * query state should be held in this function's closure. - * @typedef {makePath} function - * @param {number} pathNum - Numeric index from zero to numPaths - 1 - * @returns {queryFunc} - Function to call on each peer in the query - */ - - /** - * Query function. - * @typedef {queryFunc} function - * @param {PeerId} next - Peer to query - * @param {function(Error, Object)} callback - Query result callback - */ - /** * Create a new query. The makePath function is called once per disjoint path, so that per-path * variables can be created in that scope. makePath then returns the actual query function (queryFunc) to * use when on that path. * - * @param {DHT} dht - DHT instance + * @param {import('../index')} dht - DHT instance * @param {Uint8Array} key - * @param {makePath} makePath - Called to set up each disjoint path. Must return the query function. + * @param {MakeQueryFunc} makePath - Called to set up each disjoint path. Must return the query function. */ constructor (dht, key, makePath) { this.dht = dht @@ -49,8 +60,7 @@ class Query { /** * Run this query, start with the given list of peers first. * - * @param {Array} peers - * @returns {Promise} + * @param {PeerId[]} peers */ async run (peers) { // eslint-disable-line require-await if (!this.dht._queryManager.running) { @@ -96,7 +106,7 @@ class Query { * Stop the query. */ stop () { - this._log(`query:done in ${Date.now() - this._startTime}ms`) + this._log(`query:done in ${Date.now() - (this._startTime || 0)}ms`) if (this._run) { this._log(`${this._run.errors.length} of ${this._run.peersSeen.size} peers errored (${this._run.errors.length / this._run.peersSeen.size * 100}% fail rate)`) @@ -106,11 +116,14 @@ class Query { return } - this._run.removeListener('start', this._onStart) - this._run.removeListener('complete', this._onComplete) - this.running = false - this._run && this._run.stop() + + if (this._run) { + this._run.removeListener('start', this._onStart) + this._run.removeListener('complete', this._onComplete) + this._run.stop() + } + this.dht._queryManager.queryCompleted(this) } } diff --git a/src/query/path.js b/src/query/path.js index a8b897f1..98dd20ec 100644 --- a/src/query/path.js +++ b/src/query/path.js @@ -9,6 +9,10 @@ const utils = require('../utils') // This should help reduce the high end call times of queries const QUERY_FUNC_TIMEOUT = 30e3 +/** + * @typedef {import('peer-id')} PeerId + */ + /** * Manages a single Path through the DHT. */ @@ -16,8 +20,8 @@ class Path { /** * Creates a Path. * - * @param {Run} run - * @param {queryFunc} queryFunc + * @param {import('./run')} run + * @param {import('./index').QueryFunc} queryFunc */ constructor (run, queryFunc) { this.run = run @@ -25,19 +29,19 @@ class Path { if (!this.queryFunc) throw new Error('Path requires a `queryFn` to be specified') if (typeof this.queryFunc !== 'function') throw new Error('Path expected `queryFn` to be a function. Got ' + typeof this.queryFunc) - /** - * @type {Array} - */ + /** @type {PeerId[]} */ this.initialPeers = [] - /** - * @type {PeerQueue} - */ + /** @type {PeerQueue | null} */ this.peersToQuery = null + + /** @type {import('./index').QueryResult | null} */ + this.res = null } /** * Add a peer to the set of peers that are used to intialize the path. + * * @param {PeerId} peer */ addInitialPeer (peer) { @@ -45,10 +49,7 @@ class Path { } /** - * Execute the path. - * - * @returns {Promise} - * + * Execute the path */ async execute () { // Create a queue of peers ordered by distance from the key @@ -63,7 +64,6 @@ class Path { * Add a peer to the peers to be queried. * * @param {PeerId} peer - * @returns {Promise} */ async addPeerToQuery (peer) { // Don't add self @@ -77,7 +77,9 @@ class Path { return } - await this.peersToQuery.enqueue(peer) + if (this.peersToQuery) { + await this.peersToQuery.enqueue(peer) + } } } diff --git a/src/query/run.js b/src/query/run.js index ec5ce607..c9492eaf 100644 --- a/src/query/run.js +++ b/src/query/run.js @@ -4,9 +4,13 @@ const PeerDistanceList = require('../peer-list/peer-distance-list') const EventEmitter = require('events') const Path = require('./path') -const WorkerQueue = require('./workerQueue') +const WorkerQueue = require('./worker-queue') const utils = require('../utils') +/** + * @typedef {import('peer-id')} PeerId + */ + /** * Manages a single run of the query. */ @@ -14,7 +18,7 @@ class Run extends EventEmitter { /** * Creates a Run. * - * @param {Query} query + * @param {import('./index')} query */ constructor (query) { super() @@ -22,14 +26,20 @@ class Run extends EventEmitter { this.query = query this.running = false + + /** @type {WorkerQueue[]} */ this.workers = [] // The peers that have been queried (including error responses) this.peersSeen = new Set() + // The errors received when querying peers + /** @type {Error[]} */ this.errors = [] + // The closest K peers that have been queried successfully // (this member is initialized when the worker queues start) + /** @type {PeerDistanceList | null} */ this.peersQueried = null } @@ -50,11 +60,10 @@ class Run extends EventEmitter { /** * Execute the run with the given initial set of peers. * - * @param {Array} peers - * @returns {Promise} + * @param {PeerId[]} peers */ - async execute (peers) { + /** @type {import('./path')[]} */ const paths = [] // array of states per disjoint path // Create disjoint paths @@ -73,7 +82,9 @@ class Run extends EventEmitter { const res = { // The closest K peers we were able to query successfully - finalSet: new Set(this.peersQueried.peers), + finalSet: new Set(this.peersQueried && this.peersQueried.peers), + + /** @type {import('./index').QueryResult[]} */ paths: [] } @@ -172,22 +183,22 @@ class Run extends EventEmitter { * stop querying on that `worker`. * * @param {WorkerQueue} worker - * @returns {Promise} + * @returns {Promise} */ async continueQuerying (worker) { // If we haven't queried K peers yet, keep going - if (this.peersQueried.length < this.peersQueried.capacity) { + if (this.peersQueried && this.peersQueried.length < this.peersQueried.capacity) { return true } // Get all the peers that are currently being queried. // Note that this function gets called right after a peer has been popped // off the head of the closest peers queue so it will include that peer. - const running = worker.queue.workersList().map(i => i.data) + const running = Array.from(worker.queuedPeerIds) // Check if any of the peers that are currently being queried are closer // to the key than the peers we've already queried - const someCloser = await this.peersQueried.anyCloser(running) + const someCloser = this.peersQueried && await this.peersQueried.anyCloser(running) // Some are closer, the worker should keep going if (someCloser) { diff --git a/src/query/workerQueue.js b/src/query/worker-queue.js similarity index 69% rename from src/query/workerQueue.js rename to src/query/worker-queue.js index 3f294c87..fdc61021 100644 --- a/src/query/workerQueue.js +++ b/src/query/worker-queue.js @@ -1,16 +1,19 @@ 'use strict' -const queue = require('async/queue') -const promiseToCallback = require('promise-to-callback') +const { default: Queue } = require('p-queue') + +/** + * @typedef {import('peer-id')} PeerId + */ class WorkerQueue { /** * Creates a new WorkerQueue. * - * @param {DHT} dht - * @param {Run} run - * @param {Object} path - * @param {function} log + * @param {import('../index')} dht + * @param {import('./run')} run + * @param {import('./path')} path + * @param {Function & {error: Function}} log */ constructor (dht, run, path, log) { this.dht = dht @@ -22,39 +25,42 @@ class WorkerQueue { this.queue = this.setupQueue() // a container for resolve/reject functions that will be populated // when execute() is called + + /** @type {{ resolve: (result?: any) => void, reject: (err: Error) => void} | null} */ this.execution = null + + /** @type {Set} */ + this.queuedPeerIds = new Set() } /** * Create the underlying async queue. * - * @returns {Object} + * @returns {Queue} */ setupQueue () { - const q = queue((peer, cb) => { - promiseToCallback(this.processNext(peer))(cb) - }, this.concurrency) - - // If there's an error, stop the worker - q.error = (err) => { - this.log.error('queue', err) - this.stop(err) - } + const q = new Queue({ + concurrency: this.concurrency + }) // When all peers in the queue have been processed, stop the worker - q.drain = () => { - this.log('queue:drain') - this.stop() - } + q.on('idle', () => { + if (this.path.peersToQuery && !this.path.peersToQuery.length) { + this.log('queue:drain') + this.stop() + } + }) // When a space opens up in the queue, add some more peers - q.unsaturated = () => { - if (this.running) { - this.fill() + q.on('next', () => { + if (!this.running) { + return } - } - q.buffer = 0 + if (q.pending < this.concurrency) { + this.fill() + } + }) return q } @@ -63,7 +69,7 @@ class WorkerQueue { * Stop the worker, optionally providing an error to pass to the worker's * callback. * - * @param {Error} err + * @param {Error} [err] */ stop (err) { if (!this.running) { @@ -71,12 +77,15 @@ class WorkerQueue { } this.running = false - this.queue.kill() + this.queue.clear() this.log('worker:stop, %d workers still running', this.run.workers.filter(w => w.running).length) - if (err) { - this.execution.reject(err) - } else { - this.execution.resolve() + + if (this.execution) { + if (err) { + this.execution.reject(err) + } else { + this.execution.resolve() + } } } @@ -84,13 +93,17 @@ class WorkerQueue { * Use the queue from async to keep `concurrency` amount items running * per path. * - * @return {Promise} + * @returns {Promise} */ async execute () { this.running = true // store the promise resolution functions to be resolved at end of queue - this.execution = {} - const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject })) + this.execution = null + const execPromise = new Promise((resolve, reject) => { + this.execution = { + resolve, reject + } + }) // start queue this.fill() // await completion @@ -102,16 +115,35 @@ class WorkerQueue { * worker queue concurrency. * Note that we don't want to take any more than those required to satisfy * concurrency from the peers-to-query queue, because we always want to - * query the closest peers to the key first, and new peers are continously + * query the closest peers to the key first, and new peers are continuously * being added to the peers-to-query queue. */ fill () { + if (!this.path.peersToQuery) { + return + } + // Note: - // - queue.running(): number of items that are currently running - // - queue.length(): the number of items that are waiting to be run - while (this.queue.running() + this.queue.length() < this.concurrency && - this.path.peersToQuery.length > 0) { - this.queue.push(this.path.peersToQuery.dequeue()) + // - queue.pending: number of items that are currently running + // - queue.size: the number of items that are waiting to be run + while (this.queue.pending + this.queue.size < this.concurrency && this.path.peersToQuery.length > 0) { + const peer = this.path.peersToQuery.dequeue() + + // store the peer id so we can potentially abort early + this.queuedPeerIds.add(peer) + + this.queue.add( + () => { + return this.processNext(peer) + .catch(err => { + this.log.error('queue', err) + this.stop(err) + }) + .finally(() => { + this.queuedPeerIds.delete(peer) + }) + } + ) } } @@ -119,7 +151,6 @@ class WorkerQueue { * Process the next peer in the queue * * @param {PeerId} peer - * @returns {Promise} */ async processNext (peer) { if (!this.running) { @@ -203,8 +234,6 @@ class WorkerQueue { * Execute a query on the next peer. * * @param {PeerId} peer - * @returns {Promise} - * @private */ async execQuery (peer) { let res, queryError @@ -225,12 +254,17 @@ class WorkerQueue { } // Add the peer to the closest peers we have successfully queried - await this.run.peersQueried.add(peer) + this.run.peersQueried && await this.run.peersQueried.add(peer) + + if (!res) { + return + } // If the query indicates that this path or the whole query is complete // set the path result and bail out if (res.pathComplete || res.queryComplete) { this.path.res = res + return { pathComplete: res.pathComplete, queryComplete: res.queryComplete @@ -239,14 +273,20 @@ class WorkerQueue { // If there are closer peers to query, add them to the queue if (res.closerPeers && res.closerPeers.length > 0) { - await Promise.all(res.closerPeers.map(async (closer) => { + /** + * @param {import('../').PeerData} closer + */ + const queryCloser = async (closer) => { // don't add ourselves if (this.dht._isSelf(closer.id)) { return } + this.dht._peerDiscovered(closer.id, closer.multiaddrs) await this.path.addPeerToQuery(closer.id) - })) + } + + await Promise.all(res.closerPeers.map(queryCloser)) } } } diff --git a/src/random-walk.js b/src/random-walk.js index a9207010..22a8164b 100644 --- a/src/random-walk.js +++ b/src/random-walk.js @@ -3,23 +3,22 @@ const crypto = require('libp2p-crypto') const multihashing = require('multihashing-async') const PeerId = require('peer-id') -const AbortController = require('abort-controller') +const { AbortController } = require('abort-controller') const errcode = require('err-code') const times = require('p-times') const c = require('./constants') const { logger } = require('./utils') +/** + * @typedef {import('./')} DHT + * @typedef {import('./').RandomWalkOptions} RandomWalkOptions + */ + class RandomWalk { /** - * @constructor + * @class * @param {DHT} dht - * @param {object} options - * @param {randomWalkOptions.enabled} options.enabled - * @param {randomWalkOptions.queriesPerPeriod} options.queriesPerPeriod - * @param {randomWalkOptions.interval} options.interval - * @param {randomWalkOptions.timeout} options.timeout - * @param {randomWalkOptions.delay} options.delay - * @param {DHT} options.dht + * @param {RandomWalkOptions} options */ constructor (dht, options) { if (!dht) { @@ -93,7 +92,6 @@ class RandomWalk { * * @param {number} queries * @param {number} walkTimeout - * @returns {Promise} * * @private */ @@ -142,8 +140,7 @@ class RandomWalk { * @param {PeerId} id * @param {object} options * @param {number} options.timeout - * @param {AbortControllerSignal} options.signal - * @returns {Promise} + * @param {AbortSignal} options.signal * * @private */ @@ -165,7 +162,7 @@ class RandomWalk { this.log('query:found', peer) // wait what, there was something found? Lucky day! - throw errcode(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`, 'ERR_FOUND_RANDOM_PEER') + throw errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER') } /** diff --git a/src/routing.js b/src/routing.js index bcacde41..25bfd66b 100644 --- a/src/routing.js +++ b/src/routing.js @@ -1,12 +1,21 @@ 'use strict' +// @ts-ignore const KBucket = require('k-bucket') const utils = require('./utils') +/** + * @typedef {import('peer-id')} PeerId + * + * @typedef {object} KBucketPeer + * @property {Uint8Array} id + * @property {PeerId} peer + */ + /** * A wrapper around `k-bucket`, to provide easy store and - * retrival for peers. + * retrieval for peers. */ class RoutingTable { /** @@ -20,7 +29,9 @@ class RoutingTable { this._onInit(kBucketSize) } - // -- Private Methods + /** + * @param {number} kBucketSize + */ async _onInit (kBucketSize) { const selfKey = await utils.convertPeerId(this.self) @@ -36,21 +47,21 @@ class RoutingTable { /** * Called on the `ping` event from `k-bucket`. * Currently this just removes the oldest contact from - * the list, without acutally pinging the individual peers. + * the list, without actually pinging the individual peers. * This is the same as go does, but should probably * be upgraded to actually ping the individual peers. * - * @param {Array} oldContacts - * @param {Object} newContact - * @returns {undefined} - * @private + * @param {KBucketPeer[]} oldContacts + * @param {KBucketPeer} newContact */ _onPing (oldContacts, newContact) { // just use the first one (k-bucket sorts from oldest to newest) const oldest = oldContacts[0] - // remove the oldest one - this.kb.remove(oldest.id) + if (oldest) { + // remove the oldest one + this.kb.remove(oldest.id) + } // add the new one this.kb.add(newContact) @@ -60,8 +71,6 @@ class RoutingTable { /** * Amount of currently stored peers. - * - * @type {number} */ get size () { return this.kb.count() @@ -71,13 +80,13 @@ class RoutingTable { * Find a specific peer by id. * * @param {PeerId} peer - * @returns {Promise} + * @returns {Promise} */ async find (peer) { const key = await utils.convertPeerId(peer) const closest = this.closestPeer(key) - if (closest && closest.isEqual(peer)) { + if (closest && peer.equals(closest)) { return closest } } @@ -86,7 +95,6 @@ class RoutingTable { * Retrieve the closest peers to the given key. * * @param {Uint8Array} key - * @returns {PeerId|undefined} */ closestPeer (key) { const res = this.closestPeers(key, 1) @@ -100,17 +108,18 @@ class RoutingTable { * * @param {Uint8Array} key * @param {number} count - * @returns {Array} */ closestPeers (key, count) { - return this.kb.closest(key, count).map((p) => p.peer) + /** @type {KBucketPeer[]} */ + const closest = this.kb.closest(key, count) + + return closest.map(p => p.peer) } /** * Add or update the routing table with the given peer. * * @param {PeerId} peer - * @returns {Promise} */ async add (peer) { const id = await utils.convertPeerId(peer) @@ -122,7 +131,6 @@ class RoutingTable { * Remove a given peer from the table. * * @param {PeerId} peer - * @returns {Promise} */ async remove (peer) { const id = await utils.convertPeerId(peer) diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js index 2ba7e71e..d90537ba 100644 --- a/src/rpc/handlers/add-provider.js +++ b/src/rpc/handlers/add-provider.js @@ -5,6 +5,14 @@ const errcode = require('err-code') const utils = require('../../utils') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../../message')} Message + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:add-provider') /** @@ -12,15 +20,15 @@ module.exports = (dht) => { * * @param {PeerId} peerId * @param {Message} msg - * @returns {Promise} */ - return async function addProvider (peerId, msg) { // eslint-disable-line require-await + async function addProvider (peerId, msg) { // eslint-disable-line require-await log('start') if (!msg.key || msg.key.length === 0) { throw errcode(new Error('Missing key'), 'ERR_MISSING_KEY') } + /** @type {CID} */ let cid try { cid = new CID(msg.key) @@ -58,4 +66,6 @@ module.exports = (dht) => { // https://github.com/libp2p/js-libp2p-kad-dht/issues/128 return dht.providers.addProvider(cid, peerId) } + + return addProvider } diff --git a/src/rpc/handlers/find-node.js b/src/rpc/handlers/find-node.js index 68fa81e6..658383c2 100644 --- a/src/rpc/handlers/find-node.js +++ b/src/rpc/handlers/find-node.js @@ -4,6 +4,13 @@ const uint8ArrayEquals = require('uint8arrays/equals') const Message = require('../../message') const utils = require('../../utils') +/** + * @typedef {import('peer-id')} PeerId + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:find-node') @@ -12,15 +19,15 @@ module.exports = (dht) => { * * @param {PeerId} peerId * @param {Message} msg - * @returns {Promise} */ - return async function findNode (peerId, msg) { + async function findNode (peerId, msg) { log('start') let closer if (uint8ArrayEquals(msg.key, dht.peerId.id)) { closer = [{ - id: dht.peerId + id: dht.peerId, + multiaddrs: [] }] } else { closer = await dht._betterPeersToQuery(msg, peerId) @@ -36,4 +43,6 @@ module.exports = (dht) => { return response } + + return findNode } diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js index fd93a2df..4d8ba667 100644 --- a/src/rpc/handlers/get-providers.js +++ b/src/rpc/handlers/get-providers.js @@ -6,6 +6,13 @@ const errcode = require('err-code') const Message = require('../../message') const utils = require('../../utils') +/** + * @typedef {import('peer-id')} PeerId + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:get-providers') @@ -14,9 +21,8 @@ module.exports = (dht) => { * * @param {PeerId} peerId * @param {Message} msg - * @returns {Promise} */ - return async function getProviders (peerId, msg) { + async function getProviders (peerId, msg) { let cid try { cid = new CID(msg.key) @@ -33,12 +39,19 @@ module.exports = (dht) => { dht._betterPeersToQuery(msg, peerId) ]) - const providerPeers = peers.map((peerId) => ({ id: peerId })) - const closerPeers = closer.map((c) => ({ id: c.id })) + const providerPeers = peers.map((peerId) => ({ + id: peerId, + multiaddrs: [] + })) + const closerPeers = closer.map((c) => ({ + id: c.id, + multiaddrs: [] + })) if (has) { providerPeers.push({ - id: dht.peerId + id: dht.peerId, + multiaddrs: [] }) } @@ -55,4 +68,6 @@ module.exports = (dht) => { log('got %s providers %s closerPeers', providerPeers.length, closerPeers.length) return response } + + return getProviders } diff --git a/src/rpc/handlers/get-value.js b/src/rpc/handlers/get-value.js index d4d84db0..815641e7 100644 --- a/src/rpc/handlers/get-value.js +++ b/src/rpc/handlers/get-value.js @@ -7,6 +7,13 @@ const errcode = require('err-code') const Message = require('../../message') const utils = require('../../utils') +/** + * @typedef {import('peer-id')} PeerId + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:get-value') @@ -17,7 +24,7 @@ module.exports = (dht) => { * @param {Message} msg * @returns {Promise} */ - return async function getValue (peerId, msg) { + async function getValue (peerId, msg) { const key = msg.key log('key: %b', key) @@ -64,4 +71,6 @@ module.exports = (dht) => { return response } + + return getValue } diff --git a/src/rpc/handlers/index.js b/src/rpc/handlers/index.js index 2e6aa91e..a676887a 100644 --- a/src/rpc/handlers/index.js +++ b/src/rpc/handlers/index.js @@ -2,6 +2,10 @@ const T = require('../../message').TYPES +/** + * + * @param {import('../../index')} dht + */ module.exports = (dht) => { const handlers = { [T.GET_VALUE]: require('./get-value')(dht), @@ -16,12 +20,10 @@ module.exports = (dht) => { * Get the message handler matching the passed in type. * * @param {number} type - * - * @returns {function(PeerId, Message, function(Error, Message))} - * - * @private */ - return function getMessageHandler (type) { + function getMessageHandler (type) { return handlers[type] } + + return getMessageHandler } diff --git a/src/rpc/handlers/ping.js b/src/rpc/handlers/ping.js index 976d2095..83a98a6c 100644 --- a/src/rpc/handlers/ping.js +++ b/src/rpc/handlers/ping.js @@ -2,6 +2,14 @@ const utils = require('../../utils') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../../message')} Message + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:ping') @@ -10,10 +18,11 @@ module.exports = (dht) => { * * @param {PeerId} peerId * @param {Message} msg - * @returns {Message} */ - return function ping (peerId, msg) { + function ping (peerId, msg) { log('from %s', peerId.toB58String()) return msg } + + return ping } diff --git a/src/rpc/handlers/put-value.js b/src/rpc/handlers/put-value.js index 0439964f..a9e1bef7 100644 --- a/src/rpc/handlers/put-value.js +++ b/src/rpc/handlers/put-value.js @@ -3,6 +3,14 @@ const utils = require('../../utils') const errcode = require('err-code') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('../../message')} Message + */ + +/** + * @param {import('../../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc:put-value') @@ -11,9 +19,8 @@ module.exports = (dht) => { * * @param {PeerId} peerId * @param {Message} msg - * @returns {Promise} */ - return async function putValue (peerId, msg) { + async function putValue (peerId, msg) { const key = msg.key log('key: %b', key) @@ -36,4 +43,6 @@ module.exports = (dht) => { return msg } + + return putValue } diff --git a/src/rpc/index.js b/src/rpc/index.js index 568c0f12..59a4cdb7 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -1,23 +1,29 @@ 'use strict' -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const lp = require('it-length-prefixed') const Message = require('../message') const handlers = require('./handlers') const utils = require('../utils') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + */ + +/** + * @param {import('../index')} dht + */ module.exports = (dht) => { const log = utils.logger(dht.peerId, 'rpc') const getMessageHandler = handlers(dht) /** * Process incoming DHT messages. + * * @param {PeerId} peerId * @param {Message} msg - * @returns {Promise} - * - * @private */ async function handleMessage (peerId, msg) { // get handler & execute it @@ -38,13 +44,13 @@ module.exports = (dht) => { } /** - * Handle incoming streams on the dht protocol. - * @param {Object} props - * @param {DuplexStream} props.stream - * @param {Connection} props.connection connection - * @returns {Promise} + * Handle incoming streams on the dht protocol + * + * @param {object} props + * @param {MuxedStream} props.stream + * @param {import('libp2p-interfaces/src/connection').Connection} props.connection */ - return async function onIncomingStream ({ stream, connection }) { + async function onIncomingStream ({ stream, connection }) { const peerId = connection.remotePeer try { @@ -59,6 +65,9 @@ module.exports = (dht) => { await pipe( stream.source, lp.decode(), + /** + * @param {AsyncIterable} source + */ source => (async function * () { for await (const msg of source) { // handle the message @@ -75,4 +84,6 @@ module.exports = (dht) => { stream.sink ) } + + return onIncomingStream } diff --git a/src/utils.js b/src/utils.js index 08339c7b..20fc248a 100644 --- a/src/utils.js +++ b/src/utils.js @@ -4,7 +4,7 @@ const debug = require('debug') const multihashing = require('multihashing-async') const mh = multihashing.multihash const { Key } = require('interface-datastore') -const base32 = require('base32.js') +// @ts-ignore const distance = require('xor-distance') const pMap = require('p-map') const { Record } = require('libp2p-record') @@ -57,10 +57,16 @@ exports.keyForPublicKey = (peer) => { ]) } +/** + * @param {Uint8Array} key + */ exports.isPublicKeyKey = (key) => { return uint8ArrayToString(key.slice(0, 4)) === '/pk/' } +/** + * @param {Uint8Array} key + */ exports.fromPublicKeyKey = (key) => { return new PeerId(key.slice(4)) } @@ -76,22 +82,22 @@ exports.now = () => { /** * Encode a given Uint8Array into a base32 string. + * * @param {Uint8Array} buf * @returns {string} */ exports.encodeBase32 = (buf) => { - const enc = new base32.Encoder() - return enc.write(buf).finalize() + return uint8ArrayToString(buf, 'base32') } /** * Decode a given base32 string into a Uint8Array. + * * @param {string} raw * @returns {Uint8Array} */ exports.decodeBase32 = (raw) => { - const dec = new base32.Decoder() - return Uint8Array.from(dec.write(raw).finalize()) + return uint8ArrayFromString(raw, 'base32') } /** @@ -99,7 +105,6 @@ exports.decodeBase32 = (raw) => { * * @param {Array} peers * @param {Uint8Array} target - * @returns {Array} */ exports.sortClosestPeers = async (peers, target) => { const distances = await pMap(peers, async (peer) => { @@ -117,9 +122,8 @@ exports.sortClosestPeers = async (peers, target) => { /** * Compare function to sort an array of elements which have a distance property which is the xor distance to a given element. * - * @param {Object} a - * @param {Object} b - * @returns {number} + * @param {{ distance: Uint8Array }} a + * @param {{ distance: Uint8Array }} b */ exports.xorCompare = (a, b) => { return distance.compare(a.distance, b.distance) @@ -131,7 +135,6 @@ exports.xorCompare = (a, b) => { * * @param {number} resultsWanted * @param {number} numPaths - total number of paths - * @returns {number} */ exports.pathSize = (resultsWanted, numPaths) => { return Math.ceil(resultsWanted / numPaths) @@ -156,9 +159,6 @@ exports.createPutRecord = (key, value) => { * * @param {PeerId} [id] * @param {string} [subsystem] - * @returns {debug} - * - * @private */ exports.logger = (id, subsystem) => { const name = ['libp2p', 'dht'] @@ -174,8 +174,9 @@ exports.logger = (id, subsystem) => { return mh.toB58String(v) } - const logger = debug(name.join(':')) - logger.error = debug(name.concat(['error']).join(':')) + const logger = Object.assign(debug(name.join(':')), { + error: debug(name.concat(['error']).join(':')) + }) return logger } @@ -190,14 +191,16 @@ exports.TimeoutError = class TimeoutError extends Error { * Creates an async function that calls the given `asyncFn` and Errors * if it does not resolve within `time` ms * - * @param {Function} [asyncFn] - * @param {Number} [time] - * @returns {Function} - * - * @private + * @template T + * @param {(...args: any[]) => Promise} asyncFn + * @param {number} [time] */ exports.withTimeout = (asyncFn, time) => { - return async (...args) => { // eslint-disable-line require-await + /** + * @param {...any} args + * @returns {Promise} + */ + function timeoutFn (...args) { return Promise.race([ asyncFn(...args), new Promise((resolve, reject) => { @@ -207,6 +210,8 @@ exports.withTimeout = (asyncFn, time) => { }) ]) } + + return timeoutFn } /** @@ -214,11 +219,11 @@ exports.withTimeout = (asyncFn, time) => { * Returns a promise that resolves when all items of the `asyncIterator` have been passed * through `asyncFn`. * - * @param {AsyncIterable} [asyncIterator] - * @param {Function} [asyncFn] - * @returns {Array} + * @template T + * @template O * - * @private + * @param {AsyncIterable} asyncIterator + * @param {(arg0: T) => Promise} asyncFn */ exports.mapParallel = async function (asyncIterator, asyncFn) { const tasks = [] diff --git a/test/kad-utils.spec.js b/test/kad-utils.spec.js index fa929da9..b36e1cbf 100644 --- a/test/kad-utils.spec.js +++ b/test/kad-utils.spec.js @@ -4,11 +4,11 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const base32 = require('base32.js') const PeerId = require('peer-id') const distance = require('xor-distance') const uint8ArrayConcat = require('uint8arrays/concat') const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') const utils = require('../src/utils') const createPeerId = require('./utils/create-peer-id') @@ -20,9 +20,8 @@ describe('kad utils', () => { const key = utils.bufferToKey(buf) - const enc = new base32.Encoder() expect(key.toString()) - .to.equal('/' + enc.write(buf).finalize()) + .to.equal('/' + uint8ArrayToString(buf, 'base32')) }) }) diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 396ba196..80b8a473 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -46,7 +46,7 @@ describe('Query', () => { }) before('create a dht', () => { - const peerStore = new PeerStore() + const peerStore = new PeerStore({ peerId: ourPeerId }) dht = new DHT({ dialer: {}, peerStore, diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js index 3acb9739..4bfbca13 100644 --- a/test/rpc/handlers/add-provider.spec.js +++ b/test/rpc/handlers/add-provider.spec.js @@ -46,7 +46,7 @@ describe('rpc - handlers - AddProvider', () => { error: 'ERR_INVALID_CID' }] - await Promise.all(tests.map((t) => { + tests.forEach((t) => { it(t.error.toString(), async () => { try { await handler(dht)(peerIds[0], t.message) @@ -57,7 +57,7 @@ describe('rpc - handlers - AddProvider', () => { } throw new Error() }) - })) + }) }) it('ignore providers that do not match the sender', async () => { diff --git a/test/simulation/index.js b/test/simulation/index.js index 3201fee1..2f37e5ec 100644 --- a/test/simulation/index.js +++ b/test/simulation/index.js @@ -45,7 +45,7 @@ let topIds // Closest 20 peerIds in the network console.log('Total Nodes=%d, Dead Nodes=%d, Max Siblings per Peer=%d', NUM_PEERS, NUM_DEAD_NODES, MAX_PEERS_KNOWN) console.log('Starting %d runs with concurrency %d...', RUNS, ALPHA) const topRunIds = [] - for (var i = 0; i < RUNS; i++) { + for (let i = 0; i < RUNS; i++) { const { closestPeers, runTime } = await GetClosestPeersSimulation() const foundIds = closestPeers.map(peerId => peerId.toB58String()) const intersection = foundIds.filter(topIdFilter) @@ -132,7 +132,8 @@ async function GetClosestPeersSimulation () { /** * Create `num` PeerIds - * @param {integer} num How many peers to create + * + * @param {integer} num - How many peers to create * @returns {Array} */ function createPeers (num) { @@ -148,6 +149,7 @@ function createPeers (num) { /** * Creates a mock network + * * @param {Array} peers * @returns {Network} */ @@ -182,6 +184,7 @@ async function MockNetwork (peers) { /** * Returns a random integer between `min` and `max` + * * @param {number} min * @param {number} max * @returns {int} @@ -192,8 +195,9 @@ function randomInteger (min, max) { /** * Return a unique array of random `num` members from `list` - * @param {Array} list array to pull random members from - * @param {number} num number of random members to get + * + * @param {Array} list - array to pull random members from + * @param {number} num - number of random members to get * @returns {Array} */ function randomMembers (list, num) { @@ -213,7 +217,8 @@ function randomMembers (list, num) { /** * Finds the common members of all arrays - * @param {Array} arrays An array of arrays to find common members + * + * @param {Array} arrays - An array of arrays to find common members * @returns {Array} */ function getCommonMembers (arrays) { diff --git a/test/utils/create-peer-id.js b/test/utils/create-peer-id.js index 6057d4d6..2e095525 100644 --- a/test/utils/create-peer-id.js +++ b/test/utils/create-peer-id.js @@ -4,7 +4,8 @@ const PeerId = require('peer-id') /** * Creates multiple PeerIds - * @param {number} length The number of `PeerId` to create + * + * @param {number} length - The number of `PeerId` to create * @returns {Promise>} */ function createPeerId (length) { diff --git a/test/utils/test-dht.js b/test/utils/test-dht.js index 6b308a9d..04df038d 100644 --- a/test/utils/test-dht.js +++ b/test/utils/test-dht.js @@ -27,8 +27,10 @@ class TestDHT { } async _spawnOne (index, options = {}, autoStart = true) { + const [peerId] = await createPeerId(1) + const regRecord = {} - const peerStore = new PeerStore() + const peerStore = new PeerStore({ peerId }) // Disable random walk by default for more controlled testing options = { @@ -39,8 +41,6 @@ class TestDHT { ...options } - const [peerId] = await createPeerId(1) - const connectToPeer = (localDHT, peer) => { const remotePeerB58 = peer.toB58String() const remoteDht = this.nodes.find( @@ -156,7 +156,7 @@ class TestDHT { // Check routing tables return Promise.all(routingTableChecks.map(check => { - pRetry(check, { retries: 50 }) + return pRetry(check, { retries: 50 }) })) } diff --git a/test/utils/to-buffer.js b/test/utils/to-buffer.js index 7b97e1eb..f89bd88b 100644 --- a/test/utils/to-buffer.js +++ b/test/utils/to-buffer.js @@ -2,6 +2,7 @@ /** * Converts BufferList messages to Uint8Arrays + * * @param {*} source * @returns {AsyncGenerator} */ diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 00000000..3de2a3b0 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src" + ] +}