This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

fix: handle sub-sub shards properly
License: MIT
Signed-off-by: achingbrain <[email protected]>
achingbrain committed Dec 1, 2018
1 parent b2fbd5d commit 9302f01
Showing 9 changed files with 287 additions and 186 deletions.
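Background for the diff below: in a HAMT-sharded MFS directory, every link name in a shard node begins with a two-character uppercase hex prefix derived from the entry's bucket position, and a link whose name is exactly two characters points at a nested sub-shard. The old traversal appears to have mishandled shards nested more than one level deep; this commit fixes that by first computing the full bucket path (generatePath) and only then descending it. A minimal sketch of the naming convention, mirroring the toPrefix() helper added in this commit (the file name is a made-up illustration, not part of the commit):

// Illustrative sketch only – mirrors the toPrefix() helper added below
const toPrefix = (position) => {
  return position
    .toString(16)
    .toUpperCase()
    .padStart(2, '0')
    .substring(0, 2)
}

toPrefix(10)     // => '0A'
toPrefix(500)    // => '1F' (truncated to the first two hex chars)
// '0Areadme.md' -> a file entry stored at bucket position 0x0A
// '0A'          -> a link to a sub-shard (name length === 2)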
233 changes: 143 additions & 90 deletions src/core/utils/add-link.js
@@ -11,7 +11,7 @@ const series = require('async/series')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
 const Bucket = require('hamt-sharding/src/bucket')
-const loadNode = require('./load-node')
+const whilst = require('async/whilst')
 
 const defaultOptions = {
   parent: undefined,
@@ -133,52 +133,26 @@ const addToDirectory = (context, options, callback) => {
 
 const addToShardedDirectory = (context, options, callback) => {
   return waterfall([
-    (cb) => recreateHamtLevel(options.parent.links, cb),
-    (rootBucket, cb) => findPosition(options.name, rootBucket, (err, position) => cb(err, { rootBucket, position })),
-    ({ rootBucket, position }, cb) => {
-      // the path to the root bucket
-      let path = [{
-        position: position.pos,
-        bucket: position.bucket
-      }]
-      let currentBucket = position.bucket
-
-      while (currentBucket !== rootBucket) {
-        path.push({
-          bucket: currentBucket,
-          position: currentBucket._posAtParent
-        })
-
-        currentBucket = currentBucket._parent
-      }
-
-      cb(null, {
-        rootBucket,
-        path
-      })
+    (cb) => generatePath(context, options.name, options.parent, cb),
+    ({ rootBucket, path }, cb) => {
+      updateShard(context, path, {
+        name: options.name,
+        cid: options.cid,
+        size: options.size
+      }, options, (err, result = {}) => cb(err, { rootBucket, ...result }))
     },
-    ({ rootBucket, path }, cb) => updateShard(context, options.parent, rootBucket, path, {
-      name: options.name,
-      cid: options.cid,
-      size: options.size
-    }, options, (err, results = {}) => cb(err, { rootBucket, node: results.node })),
     ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
   ], callback)
 }

-const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
+const updateShard = (context, positions, child, options, callback) => {
   const {
     bucket,
-    position
+    prefix,
+    node
   } = positions.pop()
 
-  const prefix = position
-    .toString('16')
-    .toUpperCase()
-    .padStart(2, '0')
-    .substring(0, 2)
-
-  const link = parent.links
+  const link = node.links
     .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
 
   return waterfall([
@@ -201,44 +175,21 @@ const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
              done(err, { cid: shard.cid, node: result && result.value })
            })
          },
-         ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+         (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
        ], cb)
      }

      if (link && link.name.length === 2) {
-       log(`Descending into sub-shard`, child.name)
+       log(`Descending into sub-shard ${link.name} for ${child.name}`)
 
        return waterfall([
-         (cb) => loadNode(context, link, cb),
-         ({ node }, cb) => {
-           Promise.all(
-             node.links.map(link => {
-               if (link.name.length === 2) {
-                 // add a bucket for the subshard of this subshard
-                 const pos = parseInt(link.name, 16)
-
-                 bucket._putObjectAt(pos, new Bucket({
-                   hashFn: DirSharded.hashFn
-                 }, bucket, pos))
-
-                 return Promise.resolve()
-               }
-
-               // add to the root and let changes cascade down
-               return rootBucket.put(link.name.substring(2), true)
-             })
-           )
-             .then(() => cb(null, { node }))
-             .catch(error => cb(error))
-         },
-         ({ node }, cb) => updateShard(context, node, bucket, positions, child, options, cb),
-         ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+         (cb) => updateShard(context, positions, child, options, cb),
+         (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
        ], cb)
      }
 
      log(`Adding or replacing file`, prefix + child.name)
 
-     updateShardParent(context, bucket, parent, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
+     updateShardParent(context, bucket, node, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
    }
  ], callback)
 }
@@ -279,20 +230,7 @@ const createShard = (context, contents, options, callback) => {
 
 const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
   waterfall([
-    (done) => {
-      if (name) {
-        if (name === prefix) {
-          log(`Updating link ${name} in shard parent`)
-        } else {
-          log(`Removing link ${name} from shard parent, adding link ${prefix}`)
-        }
-
-        return DAGNode.rmLink(parent, name, done)
-      }
-
-      log(`Adding link ${prefix} to shard parent`)
-      done(null, parent)
-    },
+    (done) => DAGNode.rmLink(parent, name, done),
     (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
     (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
   ], callback)
@@ -324,12 +262,21 @@ const updateHamtDirectory = (context, links, bucket, options, callback) => {
   ], callback)
 }
 
-const recreateHamtLevel = (links, callback) => {
+const recreateHamtLevel = (links, rootBucket, parentBucket, positionAtParent, callback) => {
   // recreate this level of the HAMT
   const bucket = new Bucket({
-    hashFn: DirSharded.hashFn
-  })
+    hashFn: DirSharded.hashFn,
+    hash: parentBucket ? parentBucket._options.hash : undefined
+  }, parentBucket, positionAtParent)
+
+  if (parentBucket) {
+    parentBucket._putObjectAt(positionAtParent, bucket)
+  }
+
+  addLinksToHamtBucket(links, bucket, rootBucket, callback)
+}
+
+const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => {
   Promise.all(
     links.map(link => {
       if (link.name.length === 2) {
@@ -342,19 +289,125 @@ const recreateHamtLevel = (links, callback) => {
        return Promise.resolve()
      }
 
-     return bucket.put(link.name.substring(2), true)
+     return (rootBucket || bucket).put(link.name.substring(2), true)
    })
  )
-   .then(() => callback(null, bucket))
-   .catch(error => callback(error))
+   .catch(err => {
+     callback(err)
+     callback = null
+   })
+   .then(() => callback && callback(null, bucket))
 }

+const toPrefix = (position) => {
+  return position
+    .toString('16')
+    .toUpperCase()
+    .padStart(2, '0')
+    .substring(0, 2)
+}

-const findPosition = async (name, bucket, callback) => {
-  const position = await bucket._findNewBucketAndPos(name)
-
-  await bucket.put(name, true)
-
-  callback(null, position)
+const generatePath = (context, fileName, rootNode, callback) => {
+  // start at the root bucket and descend, loading nodes as we go
+  recreateHamtLevel(rootNode.links, null, null, null, async (err, rootBucket) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const position = await rootBucket._findNewBucketAndPos(fileName)
+
+    // the path to the root bucket
+    let path = [{
+      bucket: position.bucket,
+      prefix: toPrefix(position.pos)
+    }]
+    let currentBucket = position.bucket
+
+    while (currentBucket !== rootBucket) {
+      path.push({
+        bucket: currentBucket,
+        prefix: toPrefix(currentBucket._posAtParent)
+      })
+
+      currentBucket = currentBucket._parent
+    }
+
+    path[path.length - 1].node = rootNode
+
+    let index = path.length
+
+    // load DAGNode for each path segment
+    whilst(
+      () => index > 0,
+      (next) => {
+        index--
+
+        const segment = path[index]
+
+        // find prefix in links
+        const link = segment.node.links
+          .filter(link => link.name.substring(0, 2) === segment.prefix)
+          .pop()
+
+        if (!link) {
+          // reached bottom of tree, file will be added to the current bucket
+          log(`Link ${segment.prefix}${fileName} will be added`)
+          return next(null, path)
+        }
+
+        if (link.name === `${segment.prefix}${fileName}`) {
+          log(`Link ${segment.prefix}${fileName} will be replaced`)
+          // file already existed, file will be added to the current bucket
+          return next(null, path)
+        }
+
+        // found subshard
+        log(`Found subshard ${segment.prefix}`)
+        context.ipld.get(link.cid, (err, result) => {
+          if (err) {
+            return next(err)
+          }
+
+          // subshard hasn't been loaded, descend to the next level of the HAMT
+          if (!path[index - 1]) {
+            log(`Loaded new subshard ${segment.prefix}`)
+            const node = result.value
+
+            return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err, bucket) => {
+              if (err) {
+                return next(err)
+              }
+
+              const position = await rootBucket._findNewBucketAndPos(fileName)
+
+              index++
+              path.unshift({
+                bucket: position.bucket,
+                prefix: toPrefix(position.pos),
+                node: node
+              })
+
+              next()
+            })
+          }
+
+          const nextSegment = path[index - 1]
+
+          // add intermediate links to bucket
+          addLinksToHamtBucket(result.value.links, nextSegment.bucket, rootBucket, (error) => {
+            nextSegment.node = result.value
+
+            next(error)
+          })
+        })
+      },
+      async (err, path) => {
+        await rootBucket.put(fileName, true)
+
+        callback(err, { rootBucket, path })
+      }
+    )
+  })
 }

module.exports = addLink
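For orientation: generatePath is the heart of this change. It rebuilds the in-memory HAMT from the root node's links, finds the bucket the new entry hashes into, and records one segment per shard level, deepest first, loading each level's DAGNode as it descends (hence the new async/whilst import). updateShard then consumes the array from the other end via positions.pop(), so it visits the root shard first and the deepest shard last. A hedged sketch of the { rootBucket, path } shape for an entry that lands two shard levels down — all identifiers and prefix values are hypothetical, not taken from the commit:

// Illustrative shape only – not code from this commit
const generatePathResult = {
  rootBucket,                                                  // Bucket for the root shard
  path: [
    { prefix: '1F', bucket: deepBucket, node: deepShardNode }, // deepest level, handled last
    { prefix: '0A', bucket: midBucket, node: midShardNode },   // intermediate sub-shard
    { prefix: 'B3', bucket: rootBucket, node: rootShardNode }  // root level, popped first
  ]
}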
31 changes: 31 additions & 0 deletions test/helpers/create-shard.js
@@ -0,0 +1,31 @@
+'use strict'
+
+const pull = require('pull-stream/pull')
+const values = require('pull-stream/sources/values')
+const collect = require('pull-stream/sinks/collect')
+const importer = require('ipfs-unixfs-importer')
+const CID = require('cids')
+
+const createShard = (mfs, files, shardSplitThreshold = 10) => {
+  return new Promise((resolve, reject) => {
+    pull(
+      values(files),
+      importer(mfs.ipld, {
+        shardSplitThreshold,
+        reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
+        leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
+      }),
+      collect((err, files) => {
+        if (err) {
+          return reject(files)
+        }
+
+        const dir = files[files.length - 1]
+
+        resolve(new CID(dir.multihash))
+      })
+    )
+  })
+}
+
+module.exports = createShard
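A usage sketch for this helper, assuming mfs is an initialised MFS instance (the paths and contents are made up for illustration):

// Hypothetical usage of createShard
const cid = await createShard(mfs, [{
  path: '/imported-dir/file-0',
  content: Buffer.from([0, 1, 2, 3])
}, {
  path: '/imported-dir/file-1',
  content: Buffer.from([4, 5, 6, 7])
}], 1) // a threshold of 1 shards even a two-file directory

console.log(cid.toBaseEncodedString()) // CID of the hamt-sharded-directory root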
40 changes: 8 additions & 32 deletions test/helpers/create-sharded-directory.js
@@ -3,42 +3,18 @@
 const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
-const crypto = require('crypto')
-const pull = require('pull-stream/pull')
-const values = require('pull-stream/sources/values')
-const collect = require('pull-stream/sinks/collect')
-const importer = require('ipfs-unixfs-importer')
-const CID = require('cids')
+const createShard = require('./create-shard')
 
 module.exports = async (mfs, shardSplitThreshold = 10, files = shardSplitThreshold) => {
   const dirPath = `/sharded-dir-${Math.random()}`
+  const cid = await createShard(mfs, new Array(files).fill(0).map((_, index) => ({
+    path: `${dirPath}/file-${index}`,
+    content: Buffer.from([0, 1, 2, 3, 4, 5, index])
+  })), shardSplitThreshold)
 
-  return new Promise((resolve, reject) => {
-    pull(
-      values(
-        new Array(files).fill(0).map((_, index) => ({
-          path: `${dirPath}/file-${index}`,
-          content: crypto.randomBytes(5)
-        }))
-      ),
-      importer(mfs.ipld, {
-        shardSplitThreshold,
-        reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
-        leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
-      }),
-      collect(async (err, files) => {
-        if (err) {
-          return reject(files)
-        }
-
-        const dir = files[files.length - 1]
-
-        await mfs.cp(`/ipfs/${new CID(dir.multihash).toBaseEncodedString()}`, dirPath)
-
-        expect((await mfs.stat(dirPath)).type).to.equal('hamt-sharded-directory')
-
-        resolve(dirPath)
-      })
-    )
-  })
+  await mfs.cp(`/ipfs/${cid.toBaseEncodedString()}`, dirPath)
+
+  expect((await mfs.stat(dirPath)).type).to.equal('hamt-sharded-directory')
+
+  return dirPath
 }
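Putting the helpers together, a test might provoke the sub-sub-shard path roughly as below. The helper path, file name and mfs instance are assumptions; mfs.write and mfs.stat follow the MFS API used elsewhere in these tests:

// Hypothetical test sketch – assumes an initialised `mfs` instance
const createShardedDirectory = require('./helpers/create-sharded-directory')

const dirPath = await createShardedDirectory(mfs, 10, 75)

// adding one more entry may have to descend into a shard nested inside
// another shard – the case this commit fixes
await mfs.write(`${dirPath}/one-more-file`, Buffer.from([0, 1, 2]), {
  create: true
})

expect((await mfs.stat(dirPath)).type).to.equal('hamt-sharded-directory')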