diff --git a/package.json b/package.json index 91ebcd75f..df023a9aa 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,7 @@ "just-kebab-case": "^1.1.0", "just-map-keys": "^1.1.0", "kind-of": "^6.0.2", - "ky": "^0.13.0", + "ky": "^0.14.0", "ky-universal": "^0.3.0", "lru-cache": "^5.1.1", "multiaddr": "^6.0.6", diff --git a/src/cat.js b/src/cat.js new file mode 100644 index 000000000..496257e84 --- /dev/null +++ b/src/cat.js @@ -0,0 +1,34 @@ +'use strict' + +const CID = require('cids') +const { Buffer } = require('buffer') +const configure = require('./lib/configure') +const toIterable = require('./lib/stream-to-iterable') + +module.exports = configure(({ ky }) => { + return (path, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + + if (typeof path === 'string') { + searchParams.set('arg', path) + } else { + searchParams.set('arg', new CID(path).toString()) + } + + if (options.offset) searchParams.set('offset', options.offset) + if (options.length) searchParams.set('length', options.length) + + const res = await ky.get('cat', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) + + for await (const chunk of toIterable(res.body)) { + yield Buffer.from(chunk) + } + })() +}) diff --git a/src/files-regular/cat-pull-stream.js b/src/files-regular/cat-pull-stream.js deleted file mode 100644 index 364f566d2..000000000 --- a/src/files-regular/cat-pull-stream.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict' - -const cleanCID = require('../utils/clean-cid') -const v = require('is-ipfs') -const toPull = require('stream-to-pull-stream') -const deferred = require('pull-defer') - -module.exports = (send) => { - return (hash, opts) => { - opts = opts || {} - - const p = deferred.source() - - try { - hash = cleanCID(hash) - } catch (err) { - if (!v.ipfsPath(hash)) { - return p.end(err) - } - } - - const query = { - offset: opts.offset, - length: opts.length - } - - send({ path: 'cat', args: hash, buffer: opts.buffer, qs: query }, (err, stream) => { - if (err) { return p.end(err) } - - p.resolve(toPull(stream)) - }) - - return p - } -} diff --git a/src/files-regular/cat-readable-stream.js b/src/files-regular/cat-readable-stream.js deleted file mode 100644 index 58ca69c67..000000000 --- a/src/files-regular/cat-readable-stream.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict' - -const cleanCID = require('../utils/clean-cid') -const v = require('is-ipfs') -const Stream = require('readable-stream') -const pump = require('pump') - -module.exports = (send) => { - return (hash, opts) => { - opts = opts || {} - - const pt = new Stream.PassThrough() - - try { - hash = cleanCID(hash) - } catch (err) { - if (!v.ipfsPath(hash)) { - return pt.destroy(err) - } - } - - const query = { - offset: opts.offset, - length: opts.length - } - - send({ path: 'cat', args: hash, buffer: opts.buffer, qs: query }, (err, stream) => { - if (err) { return pt.destroy(err) } - - pump(stream, pt) - }) - - return pt - } -} diff --git a/src/files-regular/cat.js b/src/files-regular/cat.js deleted file mode 100644 index ff468eb34..000000000 --- a/src/files-regular/cat.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const cleanCID = require('../utils/clean-cid') -const v = require('is-ipfs') -const bl = require('bl') - -module.exports = (send) => { - return promisify((hash, opts, callback) => { - if (typeof opts === 'function') { - callback = opts - opts = {} - } - - try { - hash = cleanCID(hash) - } catch (err) { - if (!v.ipfsPath(hash)) { - return callback(err) - } - } - - const query = { - offset: opts.offset, - length: opts.length - } - - send({ path: 'cat', args: hash, buffer: opts.buffer, qs: query }, (err, stream) => { - if (err) { return callback(err) } - - stream.pipe(bl((err, data) => { - if (err) { return callback(err) } - - callback(null, data) - })) - }) - }) -} diff --git a/src/files-regular/index.js b/src/files-regular/index.js index d098516f1..9bfe10a5d 100644 --- a/src/files-regular/index.js +++ b/src/files-regular/index.js @@ -2,13 +2,14 @@ const nodeify = require('promise-nodeify') const moduleConfig = require('../utils/module-config') -const { collectify, pullify, streamify } = require('../lib/converters') +const { concatify, collectify, pullify, streamify } = require('../lib/converters') module.exports = (arg) => { const send = moduleConfig(arg) const add = require('../add')(arg) const addFromFs = require('../add-from-fs')(arg) const addFromURL = require('../add-from-url')(arg) + const cat = require('../cat')(arg) return { add: (input, options, callback) => { @@ -42,9 +43,15 @@ module.exports = (arg) => { return nodeify(collectify(add)(input, options), callback) }, _addAsyncIterator: add, - cat: require('../files-regular/cat')(send), - catReadableStream: require('../files-regular/cat-readable-stream')(send), - catPullStream: require('../files-regular/cat-pull-stream')(send), + cat: (path, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(concatify(cat)(path, options), callback) + }, + catReadableStream: streamify.readable(cat), + catPullStream: pullify.source(cat), get: require('../files-regular/get')(send), getReadableStream: require('../files-regular/get-readable-stream')(send), getPullStream: require('../files-regular/get-pull-stream')(send), diff --git a/src/lib/converters.js b/src/lib/converters.js index 7c5965feb..a1ed8a010 100644 --- a/src/lib/converters.js +++ b/src/lib/converters.js @@ -3,14 +3,18 @@ const toPull = require('async-iterator-to-pull-stream') const all = require('async-iterator-all') const toStream = require('it-to-stream') +const { Buffer } = require('buffer') exports.collectify = fn => (...args) => all(fn(...args)) +exports.concatify = fn => async (...args) => Buffer.concat(await all(fn(...args))) + exports.pullify = { source: fn => (...args) => toPull(fn(...args)), transform: fn => (...args) => toPull.transform(source => fn(source, ...args)) } exports.streamify = { + readable: fn => (...args) => toStream(fn(...args), { objectMode: true }), transform: fn => (...args) => toStream.transform(source => fn(source, ...args), { objectMode: true }) } diff --git a/src/lib/error-handler.js b/src/lib/error-handler.js index 6df198d78..85e207831 100644 --- a/src/lib/error-handler.js +++ b/src/lib/error-handler.js @@ -2,13 +2,25 @@ const { HTTPError } = require('ky-universal') const log = require('debug')('ipfs-http-client:lib:error-handler') +const { isNode } = require('ipfs-utils/src/env') function isJsonResponse (res) { return (res.headers.get('Content-Type') || '').startsWith('application/json') } module.exports = async function errorHandler (input, options, response) { - if (response.ok) return + if (response.ok) { + // FIXME: remove when fixed https://github.com/sindresorhus/ky-universal/issues/8 + // + // ky clones the response for each handler. In Node.js the response body is + // piped to 2 PassThroughs, one becomes the real body and the other is used + // in the clone. + // + // If the body in the clone is not consumed or destroyed the highwater mark + // will be reached (for large payloads) and stop the real body from flowing. + if (isNode) response.body.destroy() + return + } let msg