fix: handle shard updates that create subshards of subshards (#47)
I'd assumed that adding a file to a HAMT shard involved either adding a
link, updating a link, or replacing a link with a subshard.

It turns out you can also have files whose addition creates a subshard
that itself contains a subshard with the files in it.

This PR handles that case.
achingbrain authored Mar 26, 2019
1 parent 34cfd0d commit 1158951
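
To make the scenario concrete, here is a hypothetical reproduction sketch
(the `ipfs` instance, file names and counts are all invented; any two names
whose hashes share a bucket for the first two levels of the HAMT would
trigger the same code path):

// a hypothetical reproduction sketch - assumes an `ipfs` instance with MFS
// available and directory sharding enabled, so that large directories are
// stored as HAMT shards
const crypto = require('crypto')

const reproduce = async (ipfs) => {
  const dir = '/sharded-dir'
  await ipfs.files.mkdir(dir)

  // write enough entries that the directory is converted into a HAMT shard
  for (let i = 0; i < 1500; i++) {
    await ipfs.files.write(`${dir}/file-${i}`, crypto.randomBytes(10), { create: true })
  }

  // before this fix, a write could fail when the new name collided with an
  // existing leaf two levels deep: the leaf had to be pushed down into a
  // subshard of a newly created subshard, a case updateShard never handled
  await ipfs.files.write(`${dir}/colliding-name`, crypto.randomBytes(10), { create: true })
}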
Showing 3 changed files with 261 additions and 128 deletions.
299 changes: 173 additions & 126 deletions src/core/utils/add-link.js
@@ -6,14 +6,16 @@ const {
 } = require('ipld-dag-pb')
 const CID = require('cids')
 const waterfall = require('async/waterfall')
-const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
-const series = require('async/series')
 const whilst = require('async/whilst')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
+const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const {
   generatePath,
-  updateHamtDirectory
+  updateHamtDirectory,
+  recreateHamtLevel,
+  createShard,
+  toPrefix,
+  addLinksToHamtBucket
 } = require('./hamt-utils')
 
 const defaultOptions = {
@@ -125,140 +127,185 @@ const addToDirectory = (context, options, callback) => {
 }
 
 const addToShardedDirectory = (context, options, callback) => {
-  return waterfall([
-    (cb) => generatePath(context, options.name, options.parent, cb),
-    ({ rootBucket, path }, cb) => {
-      updateShard(context, path.reverse(), {
-        name: options.name,
-        cid: options.cid,
-        size: options.size
-      }, 0, options, (err, result = {}) => cb(err, { rootBucket, ...result }))
-    },
-    ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
-  ], callback)
-}
-
-const updateShard = (context, positions, child, index, options, callback) => {
-  const {
-    bucket,
-    prefix,
-    node
-  } = positions[index]
-
-  const link = node.links
-    .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
-
-  return waterfall([
-    (cb) => {
-      if (link && link.name.length > 2) {
-        log(`Converting existing file ${link.name} into sub-shard for ${child.name}`)
-
-        return waterfall([
-          (done) => createShard(context, [{
-            name: link.name.substring(2),
-            size: link.size,
-            multihash: link.cid.buffer
-          }, {
-            name: child.name,
-            size: child.size,
-            multihash: child.cid.buffer
-          }], {}, done),
-          ({ node: { links: [ shard ] } }, done) => {
-            let position = 0
-
-            // step through the shard until we find the newly created sub-shard
-            return whilst(
-              () => position < positions.length - 1,
-              (next) => {
-                const shardPrefix = positions[position].prefix
-
-                log(`Prefix at position ${position} is ${shardPrefix} - shard.name ${shard.name}`)
-
-                if (shard.name.substring(0, 2) !== shardPrefix) {
-                  return next(new Error(`Unexpected prefix ${shard.name} !== ${shardPrefix}, position ${position}`))
-                }
-
-                position++
-
-                context.ipld.get(shard.cid, (err, result) => {
-                  if (err) {
-                    return next(err)
-                  }
-
-                  if (position < positions.length) {
-                    const nextPrefix = positions[position].prefix
-                    const nextShard = result.value.links.find(link => link.name.substring(0, 2) === nextPrefix)
-
-                    if (nextShard) {
-                      shard = nextShard
-                    }
-                  }
-
-                  next(err, { cid: result && result.cid, node: result && result.value })
-                })
-              },
-              done
-            )
-          },
-          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
-        ], cb)
-      }
-
-      if (link && link.name.length === 2) {
-        log(`Descending into sub-shard ${link.name} for ${child.name}`)
-
-        return waterfall([
-          (cb) => updateShard(context, positions, child, index + 1, options, cb),
-          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
-        ], cb)
-      }
-
-      log(`Adding or replacing file`, prefix + child.name)
-      updateShardParent(context, bucket, node, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
-    }
-  ], callback)
+  return addFileToShardedDirectory(context, options, (err, result) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const {
+      shard, path
+    } = result
+
+    shard.flush('', context.ipld, null, async (err, result) => {
+      if (err) {
+        return callback(err)
+      }
+
+      // we have written out the shard, but only one sub-shard will have been
+      // written, so replace it in the original shard
+      const oldLink = options.parent.links
+        .find(link => link.name.substring(0, 2) === path[0].prefix)
+
+      const newLink = result.node.links
+        .find(link => link.name.substring(0, 2) === path[0].prefix)
+
+      waterfall([
+        (done) => {
+          if (!oldLink) {
+            return done(null, options.parent)
+          }
+
+          DAGNode.rmLink(options.parent, oldLink.name, done)
+        },
+        (parent, done) => DAGNode.addLink(parent, newLink, done),
+        (parent, done) => updateHamtDirectory(context, parent.links, path[0].bucket, options, done)
+      ], callback)
+    })
+  })
 }

-const createShard = (context, contents, options, callback) => {
-  const shard = new DirSharded({
-    root: true,
-    dir: true,
-    parent: null,
-    parentKey: null,
-    path: '',
-    dirty: true,
-    flat: false,
-
-    ...options
-  })
-
-  const operations = contents.map(contents => {
-    return (cb) => {
-      shard.put(contents.name, {
-        size: contents.size,
-        multihash: contents.multihash
-      }, cb)
-    }
-  })
-
-  return series(
-    operations,
-    (err) => {
-      if (err) {
-        return callback(err)
-      }
-
-      shard.flush('', context.ipld, null, callback)
-    }
-  )
-}
+const addFileToShardedDirectory = (context, options, callback) => {
+  const file = {
+    name: options.name,
+    cid: options.cid,
+    size: options.size
+  }
+
+  // start at the root bucket and descend, loading nodes as we go
+  recreateHamtLevel(options.parent.links, null, null, null, async (err, rootBucket) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const shard = new DirSharded({
+      root: true,
+      dir: true,
+      parent: null,
+      parentKey: null,
+      path: '',
+      dirty: true,
+      flat: false
+    })
+    shard._bucket = rootBucket
+
+    // load subshards until the bucket & position no longer change
+    const position = await rootBucket._findNewBucketAndPos(file.name)
+    const path = toBucketPath(position)
+    path[0].node = options.parent
+    let index = 0
+
+    whilst(
+      () => index < path.length,
+      (next) => {
+        const segment = path[index]
+        index++
+        const node = segment.node
+
+        const link = node.links
+          .find(link => link.name.substring(0, 2) === segment.prefix)
+
+        if (!link) {
+          // prefix is new, the file will be added to the current bucket
+          log(`Link ${segment.prefix}${file.name} will be added`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        if (link.name === `${segment.prefix}${file.name}`) {
+          // file already existed, it will be replaced in the current bucket
+          log(`Link ${segment.prefix}${file.name} will be replaced`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        if (link.name.length > 2) {
+          // another file had the same prefix, it will be replaced with a subshard
+          log(`Link ${link.name} will be replaced with a subshard`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        // load the sub-shard
+        log(`Found subshard ${segment.prefix}`)
+        context.ipld.get(link.cid, (err, result) => {
+          if (err) {
+            return next(err)
+          }
+
+          // subshard hasn't been loaded, descend to the next level of the HAMT
+          if (!path[index]) {
+            log(`Loaded new subshard ${segment.prefix}`)
+            const node = result.value
+
+            return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err) => {
+              if (err) {
+                return next(err)
+              }
+
+              const position = await rootBucket._findNewBucketAndPos(file.name)
+
+              path.push({
+                bucket: position.bucket,
+                prefix: toPrefix(position.pos),
+                node: node
+              })
+
+              return next(null, shard)
+            })
+          }
+
+          const nextSegment = path[index]
+
+          // add the next level's worth of links to the bucket
+          addLinksToHamtBucket(result.value.links, nextSegment.bucket, rootBucket, (error) => {
+            nextSegment.node = result.value
+
+            next(error, shard)
+          })
+        })
+      },
+      (err, shard) => {
+        if (err) {
+          return callback(err)
+        }
+
+        // finally add the new file into the shard
+        shard.put(file.name, {
+          size: file.size,
+          multihash: file.cid.buffer
+        }, (err) => {
+          callback(err, {
+            shard, path
+          })
+        })
+      }
+    )
+  })
+}

-const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
-  waterfall([
-    (done) => DAGNode.rmLink(parent, name, done),
-    (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
-    (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
-  ], callback)
-}
+const toBucketPath = (position) => {
+  let bucket = position.bucket
+  let positionInBucket = position.pos
+  const path = [{
+    bucket,
+    prefix: toPrefix(positionInBucket)
+  }]
+
+  bucket = position.bucket._parent
+  positionInBucket = position.bucket._posAtParent
+
+  while (bucket) {
+    path.push({
+      bucket,
+      prefix: toPrefix(positionInBucket)
+    })
+
+    positionInBucket = bucket._posAtParent
+    bucket = bucket._parent
+  }
+
+  path.reverse()
+
+  return path
+}
 
 module.exports = addLink
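
The descent in addFileToShardedDirectory leans on the shard naming
convention: every link in a shard node is named `<prefix><filename>`, where
the prefix is the link's position in its bucket rendered as two uppercase
hex characters, and toBucketPath orders those buckets root-first so that
path[0] always describes options.parent. A minimal sketch of the prefix
convention, re-implemented here purely for illustration (the real toPrefix
lives in hamt-utils.js and may differ in detail):

// illustrative re-implementation, assuming the prefix is the bucket
// position rendered as two uppercase hexadecimal characters
const toPrefixSketch = (position) => {
  return position
    .toString(16)
    .toUpperCase()
    .padStart(2, '0')
    .substring(0, 2)
}

console.log(toPrefixSketch(10)) // '0A'
console.log(toPrefixSketch(47)) // '2F'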
43 changes: 41 additions & 2 deletions src/core/utils/hamt-utils.js
@@ -5,6 +5,7 @@ const {
 } = require('ipld-dag-pb')
 const waterfall = require('async/waterfall')
 const whilst = require('async/whilst')
+const series = require('async/series')
 const Bucket = require('hamt-sharding/src/bucket')
 const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const log = require('debug')('ipfs:mfs:core:utils:hamt-utils')
@@ -63,7 +64,10 @@ const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => {
       return Promise.resolve()
     }
 
-    return (rootBucket || bucket).put(link.name.substring(2), true)
+    return (rootBucket || bucket).put(link.name.substring(2), {
+      size: link.size,
+      multihash: link.cid
+    })
   })
 )
   .then(() => callback(null, bucket), callback)
@@ -180,10 +184,45 @@ const generatePath = (context, fileName, rootNode, callback) => {
   })
 }
 
+const createShard = (context, contents, options, callback) => {
+  const shard = new DirSharded({
+    root: true,
+    dir: true,
+    parent: null,
+    parentKey: null,
+    path: '',
+    dirty: true,
+    flat: false,
+
+    ...options
+  })
+
+  const operations = contents.map(contents => {
+    return (cb) => {
+      shard.put(contents.name, {
+        size: contents.size,
+        multihash: contents.multihash
+      }, cb)
+    }
+  })
+
+  return series(
+    operations,
+    (err) => {
+      if (err) {
+        return callback(err)
+      }
+
+      shard.flush('', context.ipld, null, callback)
+    }
+  )
+}
+
 module.exports = {
   generatePath,
   updateHamtDirectory,
   recreateHamtLevel,
   addLinksToHamtBucket,
-  toPrefix
+  toPrefix,
+  createShard
 }
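
For reference, a hedged usage sketch of the newly exported createShard (the
`context` object, names, sizes and CIDs are placeholders; the entry shape
mirrors the call sites in add-link.js, where `name` carries the filename
without its 2-character shard prefix):

// usage sketch with invented values - `context` must provide an `ipld`
// instance, and `fileACid`/`fileBCid` stand in for real CIDs
createShard(context, [{
  name: 'file-a',
  size: 101,
  multihash: fileACid.buffer
}, {
  name: 'file-b',
  size: 102,
  multihash: fileBCid.buffer
}], {}, (err, result) => {
  if (err) {
    return console.error(err)
  }

  // result.node is the root DAGNode of the flushed shard; its links are
  // named `<2-char prefix><filename>`
  console.log(result.node.links.map(link => link.name))
})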
