From 2fa16c53e825bee95fb568712c65b0fd24270f0f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 20 Apr 2018 12:24:39 +0100 Subject: [PATCH] fix: adding files by pull stream --- package.json | 2 ++ src/files/add.js | 21 ++++++++++++++++--- src/utils/multipart.js | 6 ++++++ test/files.spec.js | 47 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 5b3b1ec296..5a90d19658 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "ipld-dag-cbor": "^0.12.0", "ipld-dag-pb": "^0.14.2", "is-ipfs": "^0.3.2", + "is-pull-stream": "0.0.0", "is-stream": "^1.1.0", "libp2p-crypto": "^0.13.0", "lru-cache": "^4.1.2", @@ -51,6 +52,7 @@ "promisify-es6": "^1.0.3", "pull-defer": "^0.2.2", "pull-pushable": "^2.2.0", + "pull-stream-to-stream": "^1.3.4", "pump": "^3.0.0", "qs": "^6.5.1", "readable-stream": "^2.3.6", diff --git a/src/files/add.js b/src/files/add.js index dd937855c0..f706843a81 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -5,13 +5,14 @@ const ConcatStream = require('concat-stream') const once = require('once') const isStream = require('is-stream') const OtherBuffer = require('buffer').Buffer +const isSource = require('is-pull-stream').isSource const FileResultStreamConverter = require('../utils/file-result-stream-converter') const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { const createAddStream = SendFilesStream(send, 'add') - return promisify((_files, options, _callback) => { + const add = promisify((_files, options, _callback) => { if (typeof options === 'function') { _callback = options options = null @@ -28,10 +29,11 @@ module.exports = (send) => { isStream.readable(_files) || Array.isArray(_files) || OtherBuffer.isBuffer(_files) || - typeof _files === 'object' + typeof _files === 'object' || + isSource(_files) if (!ok) { - return callback(new Error('first arg must be a buffer, readable stream, an object or array of objects')) + return callback(new Error('first arg must be a buffer, readable stream, pull stream, an object or array of objects')) } const files = [].concat(_files) @@ -44,4 +46,17 @@ module.exports = (send) => { files.forEach((file) => stream.write(file)) stream.end() }) + + return function () { + const args = Array.from(arguments) + + // If we files.add(), then promisify thinks the pull stream is + // a callback! Add an empty options object in this case so that a promise + // is returned. + if (args.length === 1 && isSource(args[0])) { + args.push({}) + } + + return add.apply(null, args) + } } diff --git a/src/utils/multipart.js b/src/utils/multipart.js index bae39e1414..a7096eefdb 100644 --- a/src/utils/multipart.js +++ b/src/utils/multipart.js @@ -2,6 +2,8 @@ const Transform = require('stream').Transform const isNode = require('detect-node') +const isSource = require('is-pull-stream').isSource +const toStream = require('pull-stream-to-stream') const PADDING = '--' const NEW_LINE = '\r\n' @@ -75,6 +77,10 @@ class Multipart extends Transform { return callback() // early } + if (isSource(content)) { + content = toStream.source(content) + } + // From now on we assume content is a stream content.once('error', this.emit.bind(this, 'error')) diff --git a/test/files.spec.js b/test/files.spec.js index f2cee30403..e3832b3b64 100644 --- a/test/files.spec.js +++ b/test/files.spec.js @@ -10,6 +10,7 @@ const isNode = require('detect-node') const loadFixture = require('aegir/fixtures') const mh = require('multihashes') const CID = require('cids') +const pull = require('pull-stream') const IPFSApi = require('../src') const f = require('./utils/factory') @@ -272,6 +273,52 @@ describe('.files (the MFS API part)', function () { }) }) + it('files.addPullStream with object chunks and pull stream content', (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + pull( + pull.values([{ content: pull.values([Buffer.from('test')]) }]), + ipfs.files.addPullStream(), + pull.collect((err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + ) + }) + + it('files.add with pull stream (callback)', (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + ipfs.files.add(pull.values([Buffer.from('test')]), (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + }) + + it('files.add with pull stream (promise)', () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add(pull.values([Buffer.from('test')])) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + it('files.add with array of objects with pull stream content', () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add([{ content: pull.values([Buffer.from('test')]) }]) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + it('files.mkdir', (done) => { ipfs.files.mkdir('/test-folder', done) })