From 358c9a0c87b52bfd89c24bbd0110d427a96db46f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 7 Feb 2024 10:48:46 +0000 Subject: [PATCH 1/2] feat: non-parallel piece hashing and car upload --- packages/upload-client/package.json | 1 - packages/upload-client/src/index.js | 27 ++++++++++----------------- pnpm-lock.yaml | 9 +-------- 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/packages/upload-client/package.json b/packages/upload-client/package.json index 16c620b11..46d8089b4 100644 --- a/packages/upload-client/package.json +++ b/packages/upload-client/package.json @@ -79,7 +79,6 @@ "ipfs-utils": "^9.0.14", "multiformats": "^12.1.2", "p-retry": "^5.1.2", - "parallel-transform-web": "^1.0.1", "varint": "^6.0.0" }, "devDependencies": { diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index 6c6233d64..ca5929489 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -1,4 +1,3 @@ -import { Parallel } from 'parallel-transform-web' import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash/async' import * as Link from 'multiformats/link' import * as raw from 'multiformats/codecs/raw' @@ -11,8 +10,6 @@ import { ShardingStream, defaultFileComparator } from './sharding.js' export { Store, Upload, UnixFS, CAR } export * from './sharding.js' -const CONCURRENT_REQUESTS = 3 - /** * Uploads a file to the service and returns the root data CID for the * generated DAG. @@ -122,23 +119,19 @@ async function uploadBlockStream(conf, blocks, options = {}) { const shards = [] /** @type {import('./types.js').AnyLink?} */ let root = null - const concurrency = options.concurrentRequests ?? CONCURRENT_REQUESTS await blocks .pipeThrough(new ShardingStream(options)) .pipeThrough( - new Parallel(concurrency, async (car) => { - const bytes = new Uint8Array(await car.arrayBuffer()) - const [cid, piece] = await Promise.all([ - Store.add(conf, bytes, options), - (async () => { - const multihashDigest = await PieceHasher.digest(bytes) - return /** @type {import('@web3-storage/capabilities/types').PieceLink} */ ( - Link.create(raw.code, multihashDigest) - ) - })(), - ]) - const { version, roots, size } = car - return { version, roots, size, cid, piece } + new TransformStream({ + async transform (car, controller) { + const bytes = new Uint8Array(await car.arrayBuffer()) + const multihashDigest = await PieceHasher.digest(bytes) + /** @type {import('@web3-storage/capabilities/types').PieceLink} */ + const piece = Link.create(raw.code, multihashDigest) + const cid = await Store.add(conf, bytes, options) + const { version, roots, size } = car + controller.enqueue({ version, roots, size, cid, piece }) + }, }) ) .pipeTo( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ddf197f1c..9a8eb7fd9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -479,9 +479,6 @@ importers: p-retry: specifier: ^5.1.2 version: 5.1.2 - parallel-transform-web: - specifier: ^1.0.1 - version: 1.0.1 varint: specifier: ^6.0.0 version: 6.0.0 @@ -10074,10 +10071,6 @@ packages: semver: 7.5.4 dev: true - /parallel-transform-web@1.0.1: - resolution: {integrity: sha512-RtPU/7IuwPZ4ePcqoPxNCpjtaXYOkCVtnhh5tW3O78wy9jqVoV2hQHms17kUeu8DTYoOP+mykFLg2agwVKlwBw==} - dev: false - /param-case@3.0.4: resolution: {integrity: sha512-RXlj7zCYokReqWpOPH9oYivUzLYZ5vAPIfEmCTNViosC78F8F0H9y7T7gG2M39ymgutxF5gcFEsyZQSph9Bp3A==} dependencies: From 4b465f7eb1a9ba591ffac6a343f01d1cb3428b9e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 7 Feb 2024 11:03:22 +0000 Subject: [PATCH 2/2] chore: appease linter --- packages/upload-client/src/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index ca5929489..2cdf58fd2 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -123,7 +123,7 @@ async function uploadBlockStream(conf, blocks, options = {}) { .pipeThrough(new ShardingStream(options)) .pipeThrough( new TransformStream({ - async transform (car, controller) { + async transform(car, controller) { const bytes = new Uint8Array(await car.arrayBuffer()) const multihashDigest = await PieceHasher.digest(bytes) /** @type {import('@web3-storage/capabilities/types').PieceLink} */