From b6e213f7f6694613cc76f32bad509318cbf8f370 Mon Sep 17 00:00:00 2001 From: Joao Andrade Date: Thu, 2 May 2024 16:27:29 +0100 Subject: [PATCH] feat: revamp implementation --- packages/upload-client/src/blob.js | 233 ++++++++++++++++++++-------- packages/upload-client/src/types.ts | 8 + pnpm-lock.yaml | 3 + 3 files changed, 182 insertions(+), 62 deletions(-) diff --git a/packages/upload-client/src/blob.js b/packages/upload-client/src/blob.js index 0acbf0364..050fee869 100644 --- a/packages/upload-client/src/blob.js +++ b/packages/upload-client/src/blob.js @@ -1,24 +1,127 @@ +import { ed25519 } from '@ucanto/principal' +import { conclude } from '@web3-storage/capabilities/ucan' +import * as UCAN from '@web3-storage/capabilities/ucan' import { CAR } from '@ucanto/transport' +import { Receipt } from '@ucanto/core' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' import { SpaceDID } from '@web3-storage/capabilities/utils' -import retry, { AbortError } from 'p-retry' +import retry from 'p-retry' import { servicePrincipal, connection } from './service.js' import { REQUEST_RETRIES } from './constants.js' +// FIXME this code has been copied over from upload-api /** - * - * @param {string} url - * @param {import('./types.js').ProgressFn} handler + * @param {import('@ucanto/interface').Invocation} concludeFx + */ +export function getConcludeReceipt(concludeFx) { + const receiptBlocks = new Map() + for (const block of concludeFx.iterateIPLDBlocks()) { + receiptBlocks.set(`${block.cid}`, block) + } + return Receipt.view({ + // @ts-expect-error object of type unknown + root: concludeFx.capabilities[0].nb.receipt, + blocks: receiptBlocks, + }) +} + +// FIXME this code has been copied over from upload-api +/** + * @param {import('@ucanto/interface').Receipt} receipt */ -function createUploadProgressHandler(url, handler) { +export function parseBlobAddReceiptNext(receipt) { + // Get invocations next /** - * - * @param {import('./types.js').ProgressStatus} status + * @type {import('@ucanto/interface').Invocation[]} */ - function onUploadProgress({ total, loaded, lengthComputable }) { - return handler({ total, loaded, lengthComputable, url }) + // @ts-expect-error read only effect + const forkInvocations = receipt.fx.fork + const allocateTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === W3sBlobCapabilities.allocate.can + ) + const concludefxs = forkInvocations.filter( + (fork) => fork.capabilities[0].can === UCAN.conclude.can + ) + const putTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can + ) + const acceptTask = receipt.fx.join + if (!allocateTask || !concludefxs.length || !putTask || !acceptTask) { + throw new Error('mandatory effects not received') + } + + // Decode receipts available + const nextReceipts = concludefxs.map((fx) => getConcludeReceipt(fx)) + /** @type {import('@ucanto/interface').Receipt | undefined} */ + // @ts-expect-error types unknown for next + const allocateReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(allocateTask.cid) + ) + /** @type {import('@ucanto/interface').Receipt<{}, import('@ucanto/interface').Failure> | undefined} */ + // @ts-expect-error types unknown for next + const putReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(putTask.cid) + ) + /** @type {import('@ucanto/interface').Receipt | undefined} */ + // @ts-expect-error types unknown for next + const acceptReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(acceptTask.link()) + ) + + if (!allocateReceipt) { + throw new Error('mandatory effects not received') + } + + return { + allocate: { + task: allocateTask, + receipt: allocateReceipt, + }, + put: { + task: putTask, + receipt: putReceipt, + }, + accept: { + task: acceptTask, + receipt: acceptReceipt, + }, + } +} + +// FIXME this code has been copied over from upload-api +/** + * @param {import('@ucanto/interface').Signer} id + * @param {import('@ucanto/interface').Verifier} serviceDid + * @param {import('@ucanto/interface').Receipt} receipt + */ +export function createConcludeInvocation(id, serviceDid, receipt) { + const receiptBlocks = [] + const receiptCids = [] + for (const block of receipt.iterateIPLDBlocks()) { + receiptBlocks.push(block) + receiptCids.push(block.cid) + } + const concludeAllocatefx = conclude.invoke({ + issuer: id, + audience: serviceDid, + with: id.toDIDKey(), + nb: { + receipt: receipt.link(), + }, + expiration: Infinity, + facts: [ + { + ...receiptCids, + }, + ], + }) + for (const block of receiptBlocks) { + concludeAllocatefx.attach(block) } - return onUploadProgress + + return concludeAllocatefx } /** @@ -81,63 +184,69 @@ export async function add( }) } - // TODO I'm definitely missing something here - // I suppose it's something alike https://github.com/w3s-project/w3up/pull/1421/files#diff-f1d31e4f2617054f785fab0c186ab965b2fdd3a9ed7873a955d3e3c74bb6e186R100 - const responseAddUpload = result.out.ok - - const fetchWithUploadProgress = - options.fetchWithUploadProgress || - options.fetch || - globalThis.fetch.bind(globalThis) + const nextTasks = parseBlobAddReceiptNext(result) - let fetchDidCallUploadProgressCb = false - const res = await retry( - async () => { - try { - const res = await fetchWithUploadProgress(responseAddUpload.url, { - method: 'PUT', - body: car, - headers: responseAddUpload.headers, - signal: options.signal, - onUploadProgress: (status) => { - fetchDidCallUploadProgressCb = true - if (options.onUploadProgress) - createUploadProgressHandler( - responseAddUpload.url, - options.onUploadProgress - )(status) - }, - // @ts-expect-error - this is needed by recent versions of node - see https://github.com/bluesky-social/atproto/pull/470 for more info - duplex: 'half', - }) - if (res.status >= 400 && res.status < 500) { - throw new AbortError(`upload failed: ${res.status}`) - } - return res - } catch (err) { - if (options.signal?.aborted === true) { - throw new AbortError('upload aborted') - } - throw err - } - }, - { - retries: options.retries ?? REQUEST_RETRIES, - } - ) + const { receipt } = nextTasks.allocate + if (!receipt.out.ok) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: receipt.out.error, + }) + } - if (!fetchDidCallUploadProgressCb && options.onUploadProgress) { - // the fetch implementation didn't support onUploadProgress - const carBlob = new Blob([car]) - options.onUploadProgress({ - total: carBlob.size, - loaded: carBlob.size, - lengthComputable: false, + const { address } = receipt.out.ok + if (address) { + const { status } = await fetch(address.url, { + method: 'PUT', + mode: 'cors', + body: bytes, + headers: address.headers, }) + if (status !== 200) throw new Error(`unexpected status: ${status}`) } - if (!res.ok) { - throw new Error(`upload failed: ${res.status}`) + // Invoke `conclude` with `http/put` receipt + const derivedSigner = ed25519.from( + /** @type {import('@ucanto/interface').SignerArchive} */ + (nextTasks.put.task.facts[0]['keys']) + ) + const httpPut = HTTPCapabilities.put.invoke({ + issuer: derivedSigner, + audience: derivedSigner, + with: derivedSigner.toDIDKey(), + nb: { + body: { + digest: bytes, + size: bytes.length, + }, + url: { + 'ucan/await': ['.out.ok.address.url', nextTasks.allocate.task.cid], + }, + headers: { + 'ucan/await': [ + '.out.ok.address.headers', + nextTasks.allocate.task.cid, + ], + }, + }, + facts: nextTasks.put.task.facts, + expiration: Infinity, + }) + + const httpPutDelegation = await httpPut.delegate() + const httpPutReceipt = await Receipt.issue({ + issuer: derivedSigner, + ran: httpPutDelegation.cid, + result: { ok: {} }, + }) + // @ts-expect-error object of type unknown + const httpPutConcludeInvocation = createConcludeInvocation(issuer, audience, httpPutReceipt) + // @ts-expect-error object of type unknown + const ucanConclude = await httpPutConcludeInvocation.execute(connection) + + if (!ucanConclude.out.ok) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: result.out.error, + }) } return link diff --git a/packages/upload-client/src/types.ts b/packages/upload-client/src/types.ts index be854a8c8..b684890d1 100644 --- a/packages/upload-client/src/types.ts +++ b/packages/upload-client/src/types.ts @@ -17,6 +17,10 @@ import { BlobAdd, BlobAddSuccess, BlobAddFailure, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAcceptSuccess, + BlobAcceptFailure, StoreAdd, StoreAddSuccess, StoreAddSuccessUpload, @@ -62,6 +66,10 @@ type FetchOptions = Override< export type { FetchOptions, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAcceptSuccess, + BlobAcceptFailure, StoreAdd, StoreAddSuccess, StoreAddSuccessUpload, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 902d8d3a4..892a8f40f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -479,6 +479,9 @@ importers: '@ucanto/client': specifier: ^9.0.1 version: 9.0.1 + '@ucanto/core': + specifier: ^10.0.1 + version: 10.0.1 '@ucanto/interface': specifier: ^10.0.1 version: 10.0.1