diff --git a/package.json b/package.json index 3ba8e14db1..e57784ef2e 100644 --- a/package.json +++ b/package.json @@ -120,7 +120,6 @@ "is-stream": "^2.0.0", "iso-url": "~0.4.6", "joi": "^14.3.1", - "just-flatten-it": "^2.1.0", "just-safe-set": "^2.1.0", "kind-of": "^6.0.2", "libp2p": "~0.25.4", diff --git a/src/core/components/dag.js b/src/core/components/dag.js index 014d0f3c9b..0f2f90a563 100644 --- a/src/core/components/dag.js +++ b/src/core/components/dag.js @@ -4,9 +4,7 @@ const promisify = require('promisify-es6') const CID = require('cids') const pull = require('pull-stream') const iterToPull = require('async-iterator-to-pull-stream') -const mapAsync = require('async/map') const setImmediate = require('async/setImmediate') -const flattenDeep = require('just-flatten-it') const errCode = require('err-code') const multicodec = require('multicodec') @@ -180,38 +178,6 @@ module.exports = function dag (self) { iterToPull(self._ipld.tree(cid, path, options)), pull.collect(callback) ) - }), - - // TODO - use IPLD selectors once they are implemented - _getRecursive: promisify((multihash, options, callback) => { - // gets flat array of all DAGNodes in tree given by multihash - - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - let cid - - try { - cid = new CID(multihash) - } catch (err) { - return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID'))) - } - - self.dag.get(cid, '', options, (err, res) => { - if (err) { return callback(err) } - - mapAsync(res.value.Links, (link, cb) => { - self.dag._getRecursive(link.Hash, options, cb) - }, (err, nodes) => { - // console.log('nodes:', nodes) - if (err) return callback(err) - callback(null, flattenDeep([res.value, nodes])) - }) - }) }) } } diff --git a/src/core/components/pin.js b/src/core/components/pin.js index 4c4edf0fb4..eaeeef5368 100644 --- a/src/core/components/pin.js +++ b/src/core/components/pin.js @@ -48,7 +48,7 @@ module.exports = (self) => { // entire graph of nested links should be pinned, // so make sure we have all the objects - dag._getRecursive(key, { preload: options.preload }, (err) => { + pinManager.fetchCompleteDag(key, { preload: options.preload }, (err) => { if (err) { return cb(err) } // found all objects, we can add the pin return cb(null, key) @@ -242,7 +242,7 @@ module.exports = (self) => { ) } if (type === PinTypes.indirect || type === PinTypes.all) { - pinManager.getIndirectKeys((err, indirects) => { + pinManager.getIndirectKeys(options, (err, indirects) => { if (err) { return callback(err) } pins = pins // if something is pinned both directly and indirectly, diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js index 9bfd702b08..5b228d9008 100644 --- a/src/core/components/pin/pin-manager.js +++ b/src/core/components/pin/pin-manager.js @@ -1,14 +1,14 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const { DAGNode, DAGLink, util } = require('ipld-dag-pb') +const { DAGNode, DAGLink } = require('ipld-dag-pb') const CID = require('cids') -const map = require('async/map') const series = require('async/series') const parallel = require('async/parallel') const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') const detectLimit = require('async/detectLimit') +const queue = require('async/queue') const { Key } = require('interface-datastore') const errCode = require('err-code') const multicodec = require('multicodec') @@ -43,6 +43,34 @@ class PinManager { this.recursivePins = new Set() } + _walkDag ({ cid, preload = false, onCid = () => {} }, cb) { + const q = queue(function ({ cid }, done) { + this.dag.get(cid, { preload }, function (err, result) { + if (err) { + return done(err) + } + + onCid(cid) + + if (result.value.Links) { + q.push(result.value.Links.map(link => ({ + cid: link.Hash + }))) + } + + done() + }) + }, concurrencyLimit) + q.drain = () => { + cb() + } + q.error = (err) => { + q.kill() + cb(err) + } + q.push({ cid }) + } + directKeys () { return Array.from(this.directPins, key => new CID(key).buffer) } @@ -51,30 +79,21 @@ class PinManager { return Array.from(this.recursivePins, key => new CID(key).buffer) } - getIndirectKeys (callback) { + getIndirectKeys ({ preload }, callback) { const indirectKeys = new Set() eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => { - this.dag._getRecursive(multihash, (err, nodes) => { - if (err) { - return cb(err) - } - - map(nodes, (node, cb) => util.cid(util.serialize(node), { - cidVersion: 0 - }).then(cid => cb(null, cid), cb), (err, cids) => { - if (err) { - return cb(err) + this._walkDag({ + cid: new CID(multihash), + preload: preload || false, + onCid: (cid) => { + cid = cid.toString() + + // recursive pins pre-empt indirect pins + if (!this.recursivePins.has(cid)) { + indirectKeys.add(cid) } - - cids - .map(cid => cid.toString()) - // recursive pins pre-empt indirect pins - .filter(key => !this.recursivePins.has(key)) - .forEach(key => indirectKeys.add(key)) - - cb() - }) - }) + } + }, cb) }, (err) => { if (err) { return callback(err) } callback(null, Array.from(indirectKeys)) @@ -283,6 +302,13 @@ class PinManager { }) } + fetchCompleteDag (cid, options, callback) { + this._walkDag({ + cid, + preload: options.preload + }, callback) + } + // Returns an error if the pin type is invalid static checkPinType (type) { if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) {