From b2fbd5dbc554cb27d1556a21805ba9fd50c0f9c2 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 30 Nov 2018 19:55:57 +0000 Subject: [PATCH] fix: make sure hashes are the same after shard changes Ensures hashes of sharded directories are the same after adding/removing nodes as if they'd been imported by the importer in their final state. License: MIT Signed-off-by: achingbrain --- src/core/utils/add-link.js | 310 +++++++++++++++-------- test/helpers/create-sharded-directory.js | 4 +- test/helpers/index.js | 1 + test/helpers/print-tree.js | 25 ++ test/rm.spec.js | 16 ++ test/write.spec.js | 156 +++++++++--- 6 files changed, 363 insertions(+), 149 deletions(-) create mode 100644 test/helpers/print-tree.js diff --git a/src/core/utils/add-link.js b/src/core/utils/add-link.js index 0c8aceb..ba1b4be 100644 --- a/src/core/utils/add-link.js +++ b/src/core/utils/add-link.js @@ -10,7 +10,8 @@ const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded') const series = require('async/series') const log = require('debug')('ipfs:mfs:core:utils:add-link') const UnixFS = require('ipfs-unixfs') -const Bucket = require('hamt-sharding') +const Bucket = require('hamt-sharding/src/bucket') +const loadNode = require('./load-node') const defaultOptions = { parent: undefined, @@ -78,11 +79,29 @@ const addLink = (context, options, callback) => { return convertToShardedDirectory(context, options, callback) } - log('Adding to regular directory') + log(`Adding ${options.name} to regular directory`) addToDirectory(context, options, callback) } +const convertToShardedDirectory = (context, options, callback) => { + createShard(context, options.parent.links.map(link => ({ + name: link.name, + size: link.size, + multihash: link.cid.buffer + })).concat({ + name: options.name, + size: options.size, + multihash: options.cid.buffer + }), {}, (err, result) => { + if (!err) { + log('Converted directory to sharded directory', result.cid.toBaseEncodedString()) + } + + callback(err, result) + }) +} + const addToDirectory = (context, options, callback) => { waterfall([ (done) => { @@ -112,125 +131,116 @@ const addToDirectory = (context, options, callback) => { ], callback) } -const addToShardedDirectory = async (context, options, callback) => { - const bucket = new Bucket({ - hashFn: DirSharded.hashFn - }) - const position = await bucket._findNewBucketAndPos(options.name) - const prefix = position.pos +const addToShardedDirectory = (context, options, callback) => { + return waterfall([ + (cb) => recreateHamtLevel(options.parent.links, cb), + (rootBucket, cb) => findPosition(options.name, rootBucket, (err, position) => cb(err, { rootBucket, position })), + ({ rootBucket, position }, cb) => { + // the path to the root bucket + let path = [{ + position: position.pos, + bucket: position.bucket + }] + let currentBucket = position.bucket + + while (currentBucket !== rootBucket) { + path.push({ + bucket: currentBucket, + position: currentBucket._posAtParent + }) + + currentBucket = currentBucket._parent + } + + cb(null, { + rootBucket, + path + }) + }, + ({ rootBucket, path }, cb) => updateShard(context, options.parent, rootBucket, path, { + name: options.name, + cid: options.cid, + size: options.size + }, options, (err, results = {}) => cb(err, { rootBucket, node: results.node })), + ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb) + ], callback) +} + +const updateShard = (context, parent, rootBucket, positions, child, options, callback) => { + const { + bucket, + position 
+ } = positions.pop() + + const prefix = position .toString('16') .toUpperCase() .padStart(2, '0') .substring(0, 2) - const existingSubShard = options.parent.links - .filter(link => link.name === prefix) - .pop() - - if (existingSubShard) { - log(`Descending into sub-shard ${prefix} to add link ${options.name}`) - - return addLink(context, { - ...options, - parent: null, - parentCid: existingSubShard.cid - }, (err, { cid, node }) => { - if (err) { - return callback(err) + const link = parent.links + .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`) + + return waterfall([ + (cb) => { + if (link && link.name.length > 2) { + log(`Converting existing file ${link.name} into sub-shard for ${child.name}`) + + return waterfall([ + (done) => createShard(context, [{ + name: link.name.substring(2), + size: link.size, + multihash: link.cid.buffer + }, { + name: child.name, + size: child.size, + multihash: child.cid.buffer + }], {}, done), + ({ node: { links: [ shard ] } }, done) => { + return context.ipld.get(shard.cid, (err, result) => { + done(err, { cid: shard.cid, node: result && result.value }) + }) + }, + ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb) + ], cb) } - // make sure parent is updated with new sub-shard cid - addToDirectory(context, { - ...options, - parent: options.parent, - parentCid: options.parentCid, - name: prefix, - size: node.size, - cid: cid - }, callback) - }) - } - - const existingFile = options.parent.links - .filter(link => link.name.substring(2) === options.name) - .pop() - - if (existingFile) { - log(`Updating file ${existingFile.name}`) - - return addToDirectory(context, { - ...options, - name: existingFile.name - }, callback) - } - - const existingUnshardedFile = options.parent.links - .filter(link => link.name.substring(0, 2) === prefix) - .pop() - - if (existingUnshardedFile) { - log(`Replacing file ${existingUnshardedFile.name} with sub-shard`) - - return createShard(context, [{ - name: existingUnshardedFile.name.substring(2), - size: existingUnshardedFile.size, - multihash: existingUnshardedFile.cid.buffer - }, { - name: options.name, - size: options.size, - multihash: options.cid.buffer - }], { - root: false - }, (err, result) => { - if (err) { - return callback(err) + if (link && link.name.length === 2) { + log(`Descending into sub-shard`, child.name) + + return waterfall([ + (cb) => loadNode(context, link, cb), + ({ node }, cb) => { + Promise.all( + node.links.map(link => { + if (link.name.length === 2) { + // add a bucket for the subshard of this subshard + const pos = parseInt(link.name, 16) + + bucket._putObjectAt(pos, new Bucket({ + hashFn: DirSharded.hashFn + }, bucket, pos)) + + return Promise.resolve() + } + + // add to the root and let changes cascade down + return rootBucket.put(link.name.substring(2), true) + }) + ) + .then(() => cb(null, { node })) + .catch(error => cb(error)) + }, + ({ node }, cb) => updateShard(context, node, bucket, positions, child, options, cb), + ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb) + ], cb) } - const newShard = result.node.links[0] - - waterfall([ - (done) => DAGNode.rmLink(options.parent, existingUnshardedFile.name, done), - (parent, done) => DAGNode.addLink(parent, newShard, done), - (parent, done) => { - // Persist the new parent DAGNode - context.ipld.put(parent, { - version: options.cidVersion, - format: options.codec, - hashAlg: options.hashAlg, - 
hashOnly: !options.flush - }, (error, cid) => done(error, { - node: parent, - cid - })) - } - ], callback) - }) - } - - log(`Appending ${prefix + options.name} to shard`) - - return addToDirectory(context, { - ...options, - name: prefix + options.name - }, callback) -} + log(`Adding or replacing file`, prefix + child.name) -const convertToShardedDirectory = (context, options, callback) => { - createShard(context, options.parent.links.map(link => ({ - name: link.name, - size: link.size, - multihash: link.cid.buffer - })).concat({ - name: options.name, - size: options.size, - multihash: options.cid.buffer - }), {}, (err, result) => { - if (!err) { - log('Converted directory to sharded directory', result.cid.toBaseEncodedString()) + updateShardParent(context, bucket, parent, prefix + child.name, child, child.cid, prefix + child.name, options, cb) } - - callback(err, result) - }) + ], callback) } const createShard = (context, contents, options, callback) => { @@ -267,4 +277,84 @@ const createShard = (context, contents, options, callback) => { ) } +const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => { + waterfall([ + (done) => { + if (name) { + if (name === prefix) { + log(`Updating link ${name} in shard parent`) + } else { + log(`Removing link ${name} from shard parent, adding link ${prefix}`) + } + + return DAGNode.rmLink(parent, name, done) + } + + log(`Adding link ${prefix} to shard parent`) + done(null, parent) + }, + (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done), + (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done) + ], callback) +} + +const updateHamtDirectory = (context, links, bucket, options, callback) => { + // update parent with new bit field + waterfall([ + (cb) => { + const data = Buffer.from(bucket._children.bitField().reverse()) + const dir = new UnixFS('hamt-sharded-directory', data) + dir.fanout = bucket.tableSize() + dir.hashType = DirSharded.hashFn.code + + DAGNode.create(dir.marshal(), links, cb) + }, + (parent, done) => { + // Persist the new parent DAGNode + context.ipld.put(parent, { + version: options.cidVersion, + format: options.codec, + hashAlg: options.hashAlg, + hashOnly: !options.flush + }, (error, cid) => done(error, { + node: parent, + cid + })) + } + ], callback) +} + +const recreateHamtLevel = (links, callback) => { + // recreate this level of the HAMT + const bucket = new Bucket({ + hashFn: DirSharded.hashFn + }) + + Promise.all( + links.map(link => { + if (link.name.length === 2) { + const pos = parseInt(link.name, 16) + + bucket._putObjectAt(pos, new Bucket({ + hashFn: DirSharded.hashFn + }, bucket, pos)) + + return Promise.resolve() + } + + return bucket.put(link.name.substring(2), true) + }) + ) + .then(() => callback(null, bucket)) + .catch(error => callback(error)) +} + +const findPosition = async (name, bucket, callback) => { + const position = await bucket._findNewBucketAndPos(name) + + await bucket.put(name, true) + + callback(null, position) +} + module.exports = addLink diff --git a/test/helpers/create-sharded-directory.js b/test/helpers/create-sharded-directory.js index ac82792..5d28e31 100644 --- a/test/helpers/create-sharded-directory.js +++ b/test/helpers/create-sharded-directory.js @@ -22,7 +22,9 @@ module.exports = async (mfs, shardSplitThreshold = 10, files = shardSplitThresho })) ), importer(mfs.ipld, { - shardSplitThreshold + shardSplitThreshold, + reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, 
differs from `ipfs add`(!) + leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!) }), collect(async (err, files) => { if (err) { diff --git a/test/helpers/index.js b/test/helpers/index.js index 7bf9f3f..d1aad08 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -38,6 +38,7 @@ module.exports = { cidAtPath: require('./cid-at-path'), collectLeafCids: require('./collect-leaf-cids'), createShardedDirectory: require('./create-sharded-directory'), + printTree: require('./print-tree'), EMPTY_DIRECTORY_HASH: 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn', EMPTY_DIRECTORY_HASH_BASE32: 'bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354' } diff --git a/test/helpers/print-tree.js b/test/helpers/print-tree.js new file mode 100644 index 0000000..5daa34f --- /dev/null +++ b/test/helpers/print-tree.js @@ -0,0 +1,25 @@ +'use strict' + +const load = async (cid, mfs) => { + return new Promise((resolve, reject) => { + mfs.ipld.get(cid, (err, res) => { + if (err) { + return reject(err) + } + + resolve(res.value) + }) + }) +} + +const printTree = async (cid, mfs, indentation = '', name = '') => { + console.info(indentation, name, cid.toBaseEncodedString()) // eslint-disable-line no-console + + const node = await load(cid, mfs) + + for (let i = 0; i < node.links.length; i++) { + await printTree(node.links[i].cid, mfs, ` ${indentation}`, node.links[i].name) + } +} + +module.exports = printTree diff --git a/test/rm.spec.js b/test/rm.spec.js index be36643..c95f981 100644 --- a/test/rm.spec.js +++ b/test/rm.spec.js @@ -246,4 +246,20 @@ describe('rm', function () { expect(error.message).to.contain('does not exist') } }) + + it.skip('results in the same hash as a sharded directory created by the importer when removing a subshard', async () => { + + }) + + it.skip('results in the same hash as a sharded directory created by the importer when removing a file', async () => { + + }) + + it.skip('results in the same hash as a sharded directory created by the importer when removing a subshard of a subshard', async () => { + + }) + + it.skip('results in the same hash as a sharded directory created by the importer when removing a file from a subshard of a subshard', async () => { + + }) }) diff --git a/test/write.spec.js b/test/write.spec.js index 34ffc86..84a852e 100644 --- a/test/write.spec.js +++ b/test/write.spec.js @@ -71,6 +71,56 @@ describe('write', function () { }) } + const createShardedDir = (files, shardSplitThreshold) => { + return new Promise((resolve, reject) => { + pull( + values(files), + importer(mfs.ipld, { + shardSplitThreshold, + reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!) + leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!) 
+ }), + collect((err, files) => { + if (err) { + return reject(files) + } + + const dir = files[files.length - 1] + + resolve(new CID(dir.multihash)) + }) + ) + }) + } + + const createShard = async (fileCount, nextFileName) => { + const shardSplitThreshold = 10 + const dirPath = `/sharded-dir-${Math.random()}` + const files = new Array(fileCount).fill(0).map((_, index) => ({ + path: `${dirPath}/file-${index}`, + content: crypto.randomBytes(5) + })) + files[files.length - 1].path = `${dirPath}/${nextFileName}` + + const allFiles = files.map(file => ({ + ...file + })) + const someFiles = files.map(file => ({ + ...file + })) + const nextFile = someFiles.pop() + + const dirWithAllFiles = await createShardedDir(allFiles, shardSplitThreshold) + const dirWithSomeFiles = await createShardedDir(someFiles, shardSplitThreshold) + + return { + nextFile, + dirWithAllFiles, + dirWithSomeFiles, + dirPath + } + } + before(() => { return createMfs() .then(instance => { @@ -788,56 +838,86 @@ describe('write', function () { expect(actualBytes).to.deep.equal(expectedBytes) }) - it('results in the same hash as a sharded directory created by the importer', async () => { - const shardSplitThreshold = 10 + it('results in the same hash as a sharded directory created by the importer when creating a new subshard', async () => { + const { + nextFile, + dirWithAllFiles, + dirWithSomeFiles, + dirPath + } = await createShard(100, 'file99.txt') - const createShardedDir = (files) => { - return new Promise((resolve, reject) => { - pull( - values(files), - importer(mfs.ipld, { - shardSplitThreshold, - reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!) - leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!) - }), - collect((err, files) => { - if (err) { - return reject(files) - } + await mfs.cp(`/ipfs/${dirWithSomeFiles.toBaseEncodedString()}`, dirPath) - const dir = files[files.length - 1] + await mfs.write(nextFile.path, nextFile.content, { + create: true + }) - resolve(new CID(dir.multihash)) - }) - ) - }) - } + const stats = await mfs.stat(dirPath) + const updatedDirCid = new CID(stats.hash) - const dirPath = `/sharded-dir-${Math.random()}` - const fileCount = 100 - const files = new Array(fileCount).fill(0).map((_, index) => ({ - path: `${dirPath}/file-${index}`, - content: crypto.randomBytes(5) - })) - const allFiles = files.map(file => ({ - ...file - })) - const someFiles = files.map(file => ({ - ...file - })) - const newFile = someFiles.pop() + expect(updatedDirCid.toBaseEncodedString()).to.deep.equal(dirWithAllFiles.toBaseEncodedString()) + }) + + it('results in the same hash as a sharded directory created by the importer when adding a new file', async () => { + const { + nextFile, + dirWithAllFiles, + dirWithSomeFiles, + dirPath + } = await createShard(75, 'file74.txt') + + await mfs.cp(`/ipfs/${dirWithSomeFiles.toBaseEncodedString()}`, dirPath) + + await mfs.write(nextFile.path, nextFile.content, { + create: true + }) + + const stats = await mfs.stat(dirPath) + const updatedDirCid = new CID(stats.hash) + + expect(stats.type).to.equal('hamt-sharded-directory') + expect(updatedDirCid.toBaseEncodedString()).to.deep.equal(dirWithAllFiles.toBaseEncodedString()) + }) + + it('results in the same hash as a sharded directory created by the importer when adding a file to a subshard', async () => { + const { + nextFile, + dirWithAllFiles, + dirWithSomeFiles, + dirPath + } = await createShard(82, 'file-81.txt') + + await 
mfs.cp(`/ipfs/${dirWithSomeFiles.toBaseEncodedString()}`, dirPath) + + await mfs.write(nextFile.path, nextFile.content, { + create: true + }) + + const stats = await mfs.stat(dirPath) + const updatedDirCid = new CID(stats.hash) + + expect(stats.type).to.equal('hamt-sharded-directory') + expect(updatedDirCid.toBaseEncodedString()).to.deep.equal(dirWithAllFiles.toBaseEncodedString()) + }) - const dirWithAllFiles = await createShardedDir(allFiles) - const dirWithSomeFiles = await createShardedDir(someFiles) + it('results in the same hash as a sharded directory created by the importer when adding a file to a subshard of a subshard', async () => { + const { + nextFile, + dirWithAllFiles, + dirWithSomeFiles, + dirPath + } = await createShard(200, 'file-50a.txt') await mfs.cp(`/ipfs/${dirWithSomeFiles.toBaseEncodedString()}`, dirPath) - await mfs.write(newFile.path, newFile.content, { + await mfs.write(nextFile.path, nextFile.content, { create: true }) const stats = await mfs.stat(dirPath) + const updatedDirCid = new CID(stats.hash) - expect(new CID(stats.hash)).to.deep.equal(dirWithAllFiles) + expect(stats.type).to.equal('hamt-sharded-directory') + expect(updatedDirCid.toBaseEncodedString()).to.deep.equal(dirWithAllFiles.toBaseEncodedString()) }) })
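
Below is a rough usage sketch of the invariant this patch enforces: writing one more file into an existing sharded directory through MFS should yield the same root CID as importing the complete file set in one pass. It mirrors the pattern used in write.spec.js above; the require paths and the standalone helper names (importDir, checkShardHashesMatch) are assumptions for illustration, not part of the commit, and the mfs instance is expected to come from the same createMfs() helper the tests use.

const pull = require('pull-stream')
const { values, collect } = pull
const importer = require('ipfs-unixfs-importer')
const CID = require('cids')
const crypto = require('crypto')

// Import a list of { path, content } files with the same importer options the
// MFS tests use, resolving to the CID of the resulting (sharded) directory.
const importDir = (mfs, files, shardSplitThreshold = 10) => {
  return new Promise((resolve, reject) => {
    pull(
      values(files),
      importer(mfs.ipld, {
        shardSplitThreshold,
        reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
        leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
      }),
      collect((err, imported) => {
        if (err) {
          return reject(err)
        }

        // the last entry emitted by the importer is the directory itself
        resolve(new CID(imported[imported.length - 1].multihash))
      })
    )
  })
}

// Returns true when the MFS-updated shard hashes to the same CID as a shard
// imported in its final state - the property this commit guarantees.
const checkShardHashesMatch = async (mfs) => {
  const dirPath = `/sharded-dir-${Math.random()}`
  const files = new Array(100).fill(0).map((_, index) => ({
    path: `${dirPath}/file-${index}`,
    content: crypto.randomBytes(5)
  }))
  const lastFile = files[files.length - 1]

  const dirWithAllFiles = await importDir(mfs, files)              // final state in one import
  const dirWithSomeFiles = await importDir(mfs, files.slice(0, -1)) // same directory minus one file

  // copy the smaller shard into MFS, then add the missing file via mfs.write
  await mfs.cp(`/ipfs/${dirWithSomeFiles.toBaseEncodedString()}`, dirPath)
  await mfs.write(lastFile.path, lastFile.content, {
    create: true
  })

  const stats = await mfs.stat(dirPath)

  return new CID(stats.hash).toBaseEncodedString() === dirWithAllFiles.toBaseEncodedString()
}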