Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: Do not load all of a DAG into memory when pinning
Browse files Browse the repository at this point in the history
Port of #2372 into gc branch to ease merging
  • Loading branch information
achingbrain committed Aug 22, 2019
1 parent 51febd3 commit 2761724
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 60 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@
"is-stream": "^2.0.0",
"iso-url": "~0.4.6",
"joi": "^14.3.1",
"just-flatten-it": "^2.1.0",
"just-safe-set": "^2.1.0",
"kind-of": "^6.0.2",
"libp2p": "~0.25.4",
Expand Down
34 changes: 0 additions & 34 deletions src/core/components/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
const CID = require('cids')
const pull = require('pull-stream')
const iterToPull = require('async-iterator-to-pull-stream')
const mapAsync = require('async/map')
const setImmediate = require('async/setImmediate')
const flattenDeep = require('just-flatten-it')
const errCode = require('err-code')
const multicodec = require('multicodec')

Expand Down Expand Up @@ -180,38 +178,6 @@ module.exports = function dag (self) {
iterToPull(self._ipld.tree(cid, path, options)),
pull.collect(callback)
)
}),

// TODO - use IPLD selectors once they are implemented
_getRecursive: promisify((multihash, options, callback) => {
// gets flat array of all DAGNodes in tree given by multihash

if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

let cid

try {
cid = new CID(multihash)
} catch (err) {
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
}

self.dag.get(cid, '', options, (err, res) => {
if (err) { return callback(err) }

mapAsync(res.value.Links, (link, cb) => {
self.dag._getRecursive(link.Hash, options, cb)
}, (err, nodes) => {
// console.log('nodes:', nodes)
if (err) return callback(err)
callback(null, flattenDeep([res.value, nodes]))
})
})
})
}
}
4 changes: 2 additions & 2 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module.exports = (self) => {

// entire graph of nested links should be pinned,
// so make sure we have all the objects
dag._getRecursive(key, { preload: options.preload }, (err) => {
pinManager.fetchCompleteDag(key, { preload: options.preload }, (err) => {
if (err) { return cb(err) }
// found all objects, we can add the pin
return cb(null, key)
Expand Down Expand Up @@ -242,7 +242,7 @@ module.exports = (self) => {
)
}
if (type === PinTypes.indirect || type === PinTypes.all) {
pinManager.getIndirectKeys((err, indirects) => {
pinManager.getIndirectKeys(options, (err, indirects) => {
if (err) { return callback(err) }
pins = pins
// if something is pinned both directly and indirectly,
Expand Down
72 changes: 49 additions & 23 deletions src/core/components/pin/pin-manager.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
const { DAGNode, DAGLink } = require('ipld-dag-pb')
const CID = require('cids')
const map = require('async/map')
const series = require('async/series')
const parallel = require('async/parallel')
const eachLimit = require('async/eachLimit')
const waterfall = require('async/waterfall')
const detectLimit = require('async/detectLimit')
const queue = require('async/queue')
const { Key } = require('interface-datastore')
const errCode = require('err-code')
const multicodec = require('multicodec')
Expand Down Expand Up @@ -43,6 +43,34 @@ class PinManager {
this.recursivePins = new Set()
}

_walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
const q = queue(function ({ cid }, done) {
this.dag.get(cid, { preload }, function (err, result) {
if (err) {
return done(err)
}

onCid(cid)

if (result.value.Links) {
q.push(result.value.Links.map(link => ({
cid: link.Hash
})))
}

done()
})
}, concurrencyLimit)
q.drain = () => {
cb()
}
q.error = (err) => {
q.kill()
cb(err)
}
q.push({ cid })
}

directKeys () {
return Array.from(this.directPins, key => new CID(key).buffer)
}
Expand All @@ -51,30 +79,21 @@ class PinManager {
return Array.from(this.recursivePins, key => new CID(key).buffer)
}

getIndirectKeys (callback) {
getIndirectKeys ({ preload }, callback) {
const indirectKeys = new Set()
eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
this.dag._getRecursive(multihash, (err, nodes) => {
if (err) {
return cb(err)
}

map(nodes, (node, cb) => util.cid(util.serialize(node), {
cidVersion: 0
}).then(cid => cb(null, cid), cb), (err, cids) => {
if (err) {
return cb(err)
this._walkDag({
cid: new CID(multihash),
preload: preload || false,
onCid: (cid) => {
cid = cid.toString()

// recursive pins pre-empt indirect pins
if (!this.recursivePins.has(cid)) {
indirectKeys.add(cid)
}

cids
.map(cid => cid.toString())
// recursive pins pre-empt indirect pins
.filter(key => !this.recursivePins.has(key))
.forEach(key => indirectKeys.add(key))

cb()
})
})
}
}, cb)
}, (err) => {
if (err) { return callback(err) }
callback(null, Array.from(indirectKeys))
Expand Down Expand Up @@ -283,6 +302,13 @@ class PinManager {
})
}

fetchCompleteDag (cid, options, callback) {
this._walkDag({
cid,
preload: options.preload
}, callback)
}

// Returns an error if the pin type is invalid
static checkPinType (type) {
if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) {
Expand Down

0 comments on commit 2761724

Please sign in to comment.