Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: add types and update all deps
Browse files Browse the repository at this point in the history
Adds types to source code, tests can be done later.

Updates all deps.

Depends on:

- [ ] ipfs/interface-datastore#69
  • Loading branch information
achingbrain committed Feb 3, 2021
1 parent 6c85b6b commit 480d3e8
Show file tree
Hide file tree
Showing 36 changed files with 780 additions and 457 deletions.
27 changes: 15 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"lint": "aegir lint",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"docs": "aegir docs",
"release": "aegir release --docs -t node",
"release-minor": "aegir release --type minor --docs -t node",
Expand Down Expand Up @@ -39,25 +39,27 @@
"node": ">=12.0.0",
"npm": ">=6.0.0"
},
"eslintConfig": {
"extends": "ipfs"
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"cids": "^1.1.5",
"debug": "^4.3.1",
"err-code": "^2.0.3",
"err-code": "^3.0.0",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "^3.0.3",
"interface-datastore": "ipfs/interface-datastore#fix/remove-ts-file",
"it-first": "^1.0.4",
"it-length-prefixed": "^3.1.0",
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "^0.19.0",
"libp2p-interfaces": "^0.8.2",
"libp2p-record": "^0.9.0",
"libp2p-record": "^0.10.0",
"multiaddr": "^8.1.2",
"multihashing-async": "^2.0.1",
"multihashing-async": "^2.1.0",
"p-filter": "^2.1.0",
"p-map": "^4.0.0",
"p-queue": "^6.6.2",
Expand All @@ -68,19 +70,20 @@
"protons": "^2.0.0",
"streaming-iterables": "^5.0.4",
"uint8arrays": "^2.0.5",
"varint": "^5.0.0",
"varint": "^6.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^25.0.0",
"@types/debug": "^4.1.5",
"aegir": "^30.3.0",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"datastore-level": "^2.0.0",
"delay": "^4.3.0",
"datastore-level": "^4.0.0",
"delay": "^5.0.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"libp2p": "^0.28.5",
"libp2p": "^0.30.7",
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
Expand Down
117 changes: 69 additions & 48 deletions src/content-fetching/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,26 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')
const uint8ArrayEquals = require('uint8arrays/equals')
const uint8ArrayToString = require('uint8arrays/to-string')
const libp2pRecord = require('libp2p-record')

const c = require('../constants')
const Query = require('../query')

const utils = require('../utils')

const Record = libp2pRecord.Record

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../query').DHTQueryResult} DHTQueryResult
*/

/**
* @param {import('../')} dht
*/
module.exports = (dht) => {
/**
* @param {Uint8Array} key
* @param {Uint8Array} rec
*/
const putLocal = async (key, rec) => { // eslint-disable-line require-await
return dht.datastore.put(utils.bufferToKey(key), rec)
}
Expand All @@ -22,30 +32,26 @@ module.exports = (dht) => {
* the local datastore.
*
* @param {Uint8Array} key
* @returns {Promise<Record>}
*
* @private
*/
const getLocal = async (key) => {
dht._log('getLocal %b', key)
dht._log(`getLocal ${uint8ArrayToString(key, 'base32')}`)

const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log('found %b in local datastore', key)
dht._log(`found ${uint8ArrayToString(key, 'base32')} in local datastore`)

const rec = Record.deserialize(raw)

await dht._verifyRecordLocally(rec)

return rec
}

/**
* Send the best record found to any peers that have an out of date record.
*
* @param {Uint8Array} key
* @param {Array<Object>} vals - values retrieved from the DHT
* @param {Object} best - the best record that was found
* @returns {Promise}
*
* @private
* @param {import('../query').DHTQueryValue[]} vals - values retrieved from the DHT
* @param {Uint8Array} best - the best record that was found
*/
const sendCorrectionRecord = async (key, vals, best) => {
const fixupRec = await utils.createPutRecord(key, best)
Expand Down Expand Up @@ -78,10 +84,9 @@ module.exports = (dht) => {
return {
/**
* Store the given key/value pair locally, in the datastore.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
* @returns {Promise<void>}
* @private
*/
async _putLocal (key, rec) { // eslint-disable-line require-await
return putLocal(key, rec)
Expand All @@ -92,9 +97,8 @@ module.exports = (dht) => {
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
async put (key, value, options = {}) {
dht._log('PutValue %b', key)
Expand Down Expand Up @@ -134,9 +138,8 @@ module.exports = (dht) => {
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Uint8Array>}
*/
async get (key, options = {}) {
options.timeout = options.timeout || c.minute
Expand Down Expand Up @@ -173,16 +176,15 @@ module.exports = (dht) => {
*
* @param {Uint8Array} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Uint8Array}>>}
*/
async getMany (key, nvals, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('getMany %b (%s)', key, nvals)

let vals = []
const vals = []
let localRec

try {
Expand All @@ -204,9 +206,8 @@ module.exports = (dht) => {
return vals
}

const paths = []
const id = await utils.convertBuffer(key)
const rtp = dht.routingTable.closestPeers(id, this.kBucketSize)
const rtp = dht.routingTable.closestPeers(id, dht.kBucketSize)

dht._log('peers in rt: %d', rtp.length)

Expand All @@ -220,15 +221,23 @@ module.exports = (dht) => {
return vals
}

// we have peers, lets do the actual query to them
const query = new Query(dht, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)
const valsLength = vals.length

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
/**
* @param {number} pathIndex
* @param {number} numPaths
*/
function createQuery (pathIndex, numPaths) {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - valsLength, numPaths)
let queryResults = 0

/**
* Here we return the query function to use on this particular disjoint path
*
* @param {PeerId} peer
*/
async function disjointPathQuery (peer) {
let rec, peers, lookupErr
try {
const results = await dht._getValueOrPeers(peer, key)
Expand All @@ -242,37 +251,49 @@ module.exports = (dht) => {
lookupErr = err
}

const res = { closerPeers: peers }
/** @type {import('../query').QueryResult} */
const res = {
closerPeers: peers
}

if (rec && rec.value) {
vals.push({
val: rec.value,
from: peer
})

if ((rec && rec.value) || lookupErr) {
pathVals.push({
val: rec && rec.value,
queryResults++
} else if (lookupErr) {
vals.push({
err: lookupErr,
from: peer
})

queryResults++
}

// enough is enough
if (pathVals.length >= pathSize) {
if (queryResults >= pathSize) {
res.pathComplete = true
}

return res
}
})

let error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
error = err
return disjointPathQuery
}
query.stop()

// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)
// we have peers, lets send the actual query to them
const query = new Query(dht, key, createQuery)

if (error && vals.length === 0) {
throw error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
if (vals.length === 0) {
throw err
}
} finally {
query.stop()
}

return vals
Expand Down
Loading

0 comments on commit 480d3e8

Please sign in to comment.