Skip to content

Commit

Permalink
feat: refactor to use async/await (#7)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: API refactored to use async/await
  • Loading branch information
achingbrain authored and jacobheun committed Jul 12, 2019
1 parent eccffe5 commit 0e2d91f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 177 deletions.
6 changes: 4 additions & 2 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ const server = createServer()

module.exports = {
hooks: {
pre: server.start.bind(server),
post: server.stop.bind(server)
browser: {
pre: () => server.start(),
post: () => server.stop()
}
}
}
13 changes: 6 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
},
"devDependencies": {
"aegir": "^19.0.5",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"cids": "^0.7.1",
"go-ipfs-dep": "^0.4.21",
"ipfsd-ctl": "^0.43.0"
"go-ipfs-dep": "~0.4.17",
"ipfsd-ctl": "~0.44.1",
"peer-id": "~0.13.1"
},
"dependencies": {
"async": "^2.6.2",
"ipfs-http-client": "^33.0.2",
"multiaddr": "^6.1.0",
"peer-id": "^0.12.2",
"peer-info": "^0.15.1"
"ipfs-http-client": "^33.1.0",
"multiaddr": "^6.1.0"
},
"contributors": [
"Alan Shaw <[email protected]>",
Expand Down
64 changes: 25 additions & 39 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ const dht = require('ipfs-http-client/src/dht')
const swarm = require('ipfs-http-client/src/swarm')
const refs = require('ipfs-http-client/src/files-regular/refs')
const defaultConfig = require('ipfs-http-client/src/utils/default-config')
const series = require('async/series')
const parallel = require('async/parallel')
const reflect = require('async/reflect')
const multiaddr = require('multiaddr')

const DEFAULT_MAX_TIMEOUT = 30e3 // 30 second default
Expand Down Expand Up @@ -60,26 +57,18 @@ class DelegatedContentRouting {
* @param {CID} key
* @param {object} options
* @param {number} options.maxTimeout How long the query can take. Defaults to 30 seconds
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders (key, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof options === 'number') { // This will be deprecated in a next release
options = {
maxTimeout: options
}
} else {
options = options || {}
}

async * findProviders (key, options = {}) {
options.maxTimeout = options.maxTimeout || DEFAULT_MAX_TIMEOUT

this.dht.findProvs(key.toString(), {
const results = await this.dht.findProvs(key, {
timeout: `${options.maxTimeout}ms` // The api requires specification of the time unit (s/ms)
}, callback)
})

for (let i = 0; i < results.length; i++) {
yield results[i]
}
}

/**
Expand All @@ -91,32 +80,29 @@ class DelegatedContentRouting {
*
* @param {CID} key
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
provide (key, callback) {
async provide (key) {
const addrs = this.bootstrappers.map((addr) => {
return addr.encapsulate(`/p2p-circuit/ipfs/${this.peerId.toB58String()}`)
})

series([
(cb) => parallel(addrs.map((addr) => {
return reflect((cb) => this.swarm.connect(addr.toString(), cb))
}), (err, results) => {
if (err) {
return cb(err)
}
const results = await Promise.all(
addrs.map((addr) => {
return this.swarm.connect(addr.toString()).catch(() => {})
})
)

// only some need to succeed
const success = results.filter((res) => res.error == null)
if (success.length === 0) {
return cb(new Error('unable to swarm.connect using p2p-circuit'))
}
cb()
}),
(cb) => {
this.refs(key.toString(), { recursive: true }, cb)
}
], (err) => callback(err))
// only some need to succeed
const success = results.filter((res) => res && res.error == null)

if (success.length === 0) {
throw new Error('unable to swarm.connect using p2p-circuit')
}

this.refs(key.toBaseEncodedString(), {
recursive: true
})
}
}

Expand Down
207 changes: 78 additions & 129 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,17 @@

const expect = require('chai').expect
const IPFSFactory = require('ipfsd-ctl')
const parallel = require('async/parallel')
const waterfall = require('async/waterfall')
const CID = require('cids')
const PeerId = require('peer-id')

const factory = IPFSFactory.create({ type: 'go' })
const all = require('async-iterator-all')
const factory = IPFSFactory.create({
type: 'go'
})

const DelegatedContentRouting = require('../src')

function spawnNode (bootstrap, callback) {
if (typeof bootstrap === 'function') {
callback = bootstrap
bootstrap = []
}

factory.spawn({
async function spawnNode (bootstrap = []) {
const node = await factory.spawn({
// Lock down the nodes so testing can be deterministic
config: {
Bootstrap: bootstrap,
Expand All @@ -28,59 +23,47 @@ function spawnNode (bootstrap, callback) {
}
}
}
}, (err, node) => {
if (err) return callback(err)
})

node.api.id((err, id) => {
if (err) return callback(err)
const id = await node.api.id()

callback(null, node, id)
})
})
return {
node,
id
}
}

describe('DelegatedContentRouting', function () {
this.timeout(20 * 1000) // we're spawning daemons, give ci some time

let selfNode
let selfId
let delegatedNode
let delegatedId
let delegateNode
let bootstrapNode
let bootstrapId

before((done) => {
waterfall([
// Spawn a "bootstrap" node that doesnt connect to anything
(cb) => spawnNode(cb),
(ipfsd, id, cb) => {
bootstrapNode = ipfsd
bootstrapId = id
cb()
},
// Spawn our local node and bootstrap the bootstrapper node
(cb) => spawnNode(bootstrapId.addresses, cb),
(ipfsd, id, cb) => {
selfNode = ipfsd
selfId = PeerId.createFromB58String(id.id)
cb()
},
// Spawn the delegate node and bootstrap the bootstrapper node
(cb) => spawnNode(bootstrapId.addresses, cb),
(ipfsd, id, cb) => {
delegatedNode = ipfsd
delegatedId = PeerId.createFromB58String(id.id)
cb()
}
], done)
before(async () => {
// Spawn a "Boostrap" node that doesnt connect to anything
const bootstrap = await spawnNode()
bootstrapNode = bootstrap.node
bootstrapId = bootstrap.id

// Spawn our local node and bootstrap the bootstrapper node
const self = await spawnNode(bootstrapId.addresses)
selfNode = self.node
selfId = PeerId.createFromB58String(self.id.id)

// Spawn the delegate node and bootstrap the bootstrapper node
const delegate = await spawnNode(bootstrapId.addresses)
delegateNode = delegate.node
})

after((done) => {
parallel([
(cb) => selfNode.stop(cb),
(cb) => delegatedNode.stop(cb),
(cb) => bootstrapNode.stop(cb)
], done)
after(() => {
return Promise.all([
selfNode.stop(),
delegateNode.stop(),
bootstrapNode.stop()
])
})

describe('create', () => {
Expand Down Expand Up @@ -127,96 +110,62 @@ describe('DelegatedContentRouting', function () {

describe('findProviders', () => {
const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
before('register providers', (done) => {
parallel([
(cb) => bootstrapNode.api.dht.provide(cid, cb),
(cb) => selfNode.api.dht.provide(cid, cb)
], done)

before('register providers', async () => {
await bootstrapNode.api.dht.provide(cid)
await selfNode.api.dht.provide(cid)
})

it('should be able to find providers through the delegate node', function (done) {
waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})
routing.findProviders(cid, cb)
},
(providers, cb) => {
// We should get our local node and the bootstrap node as providers.
// The delegate node is not included, because it is handling the requests
expect(providers).to.have.length(2)
expect(providers.map((p) => p.id.toB58String())).to.have.members([
bootstrapId.id,
selfId.toB58String()
])
cb()
}
], done)
it('should be able to find providers through the delegate node', async () => {
const opts = delegateNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const providers = await all(routing.findProviders(cid))

// We should get the bootstrap node as provider
// The delegate node is not included, because it is handling the requests
expect(providers.map((p) => p.id.toB58String())).to.include(bootstrapId.id, 'Did not include bootstrap node')
expect(providers.map((p) => p.id.toB58String())).to.include(selfId.toB58String(), 'Did not include self node')
})

it('should be able to specify a maxTimeout', function (done) {
waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})
const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
routing.findProviders(cid, { maxTimeout: 5e3 }, cb)
},
(providers, cb) => {
// We should get our local node and the bootstrap node as providers.
// The delegate node is not included, because it is handling the requests
expect(providers).to.have.length(2)
expect(providers.map((p) => p.id.toB58String())).to.have.members([
bootstrapId.id,
selfId.toB58String()
])
cb()
}
], done)
it('should be able to specify a maxTimeout', async () => {
const opts = delegateNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
const providers = await all(routing.findProviders(cid, { maxTimeout: 5e3 }))

expect(providers.map((p) => p.id.toB58String())).to.include(bootstrapId.id, 'Did not include bootstrap node')
})
})

describe('provide', () => {
it('should be able to register as a content provider to the delegate node', function (done) {
it('should be able to register as a content provider to the delegate node', async () => {
let contentRouter
let cid

waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
contentRouter = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

selfNode.api.add(Buffer.from(`hello-${Math.random()}`), cb)
},
(res, cb) => {
cid = new CID(res[0].hash)
contentRouter.provide(cid, cb)
},
(cb) => {
delegatedNode.api.dht.findProvs(cid, cb)
},
(providers, cb) => {
const providerIds = providers.map(p => p.id.toB58String())
// The delegate should be a provider
expect(providerIds).to.have.members([
selfId.toB58String(),
delegatedId.toB58String()
])
cb()
}
], done)
const opts = delegateNode.apiAddr.toOptions()
contentRouter = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const res = await selfNode.api.add(Buffer.from(`hello-${Math.random()}`))
cid = new CID(res[0].hash)
await contentRouter.provide(cid)
const providers = await delegateNode.api.dht.findProvs(cid.toBaseEncodedString())

// We are hosting the file, validate we're the provider
expect(providers.map((p) => p.id.toB58String())).to.include(selfId.toB58String(), 'Did not include self node')
})
})
})

0 comments on commit 0e2d91f

Please sign in to comment.