Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix: transfer set #3573

Merged
merged 17 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/ipfs-message-port-client/src/client/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @typedef {Object} QueryOptions
* @property {AbortSignal} [signal]
* @property {number} [timeout]
* @property {Transferable[]} [transfer]
* @property {Set<Transferable>} [transfer]
*/

/**
Expand Down Expand Up @@ -49,7 +49,7 @@ export class Query {
/**
* Data that will be transferred over message channel.
*
* @returns {Transferable[]|void}
* @returns {Set<Transferable>|void}
*/
transfer () {
return this.input.transfer
Expand Down
4 changes: 2 additions & 2 deletions packages/ipfs-message-port-client/src/client/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ export class MessageTransport {
id,
input: query.toJSON()
},
// @ts-ignore - TS seems to want second arg to postMessage to not be undefined
[...new Set(query.transfer() || [])]
// @ts-expect-error - Type signature does not expert 2nd undefined arg
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
query.transfer()
)
}

Expand Down
16 changes: 8 additions & 8 deletions packages/ipfs-message-port-client/src/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class CoreClient extends Client {
*/
CoreClient.prototype.addAll = async function * addAll (input, options = {}) {
const { timeout, signal } = options
const transfer = [...(options.transfer || [])]
const transfer = options.transfer || new Set()
const progressCallback = options.progress
? encodeCallback(options.progress, transfer)
: undefined
Expand Down Expand Up @@ -99,7 +99,7 @@ CoreClient.prototype.addAll = async function * addAll (input, options = {}) {
*/
CoreClient.prototype.add = async function add (input, options = {}) {
const { timeout, signal } = options
const transfer = [...(options.transfer || [])]
const transfer = options.transfer || new Set()
const progressCallback = options.progress
? encodeCallback(options.progress, transfer)
: undefined
Expand Down Expand Up @@ -184,7 +184,7 @@ const identity = (v) => v
* given input.
*
* @param {ImportCandidate} input
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {Promise<EncodedAddInput>}
*/
const encodeAddInput = async (input, transfer) => {
Expand Down Expand Up @@ -242,7 +242,7 @@ const encodeAddInput = async (input, transfer) => {
* given input.
*
* @param {ImportCandidateStream} input
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {EncodedAddAllInput}
*/
const encodeAddAllInput = (input, transfer) => {
Expand Down Expand Up @@ -279,7 +279,7 @@ const encodeAddAllInput = (input, transfer) => {
* effective strategy.
*
* @param {ImportCandidate} content
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {EncodedAddInput}
*/
const encodeAsyncIterableContent = (content, transfer) => {
Expand All @@ -303,7 +303,7 @@ const encodeAsyncIterableContent = (content, transfer) => {

/**
* @param {ImportCandidate} content
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {EncodedAddInput}
*/
const encodeIterableContent = (content, transfer) => {
Expand All @@ -329,7 +329,7 @@ const encodeIterableContent = (content, transfer) => {

/**
* @param {ToFile | ToDirectory} file
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {EncodedFileInput | EncodedDirectoryInput}
*/
const encodeFileObject = ({ path, mode, mtime, content }, transfer) => {
Expand All @@ -349,7 +349,7 @@ const encodeFileObject = ({ path, mode, mtime, content }, transfer) => {

/**
* @param {ToContent|undefined} content
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {EncodedFileContent}
*/
const encodeFileContent = (content, transfer) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-message-port-client/src/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ DAGClient.prototype.resolve = async function resolve (cid, options = {}) {

/**
* @param {string|CID} input
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
* @returns {string|EncodedCID}
*/
const encodeCIDOrPath = (input, transfer) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-message-port-client/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
// JSDoc syntax or that result in a different behaviour when typed in JSDoc.

export interface MessagePortClientOptions {
transfer?: Transferable[]
transfer?: Set<Transferable>
}
3 changes: 1 addition & 2 deletions packages/ipfs-message-port-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ port2.onmessage = async ({data}) => {

### Callback

Primitive callbacks that take single parameter supported by [structured cloning algorithm][] like progress callback used across IPFS APIs can be encoded / decoded. Unilke most encoders `transfer` argument is required (because value is encoded to a [MessagePort][] that can only be transferred)
Primitive callbacks that take single parameter supported by [structured cloning algorithm][] like progress callback used across IPFS APIs can be encoded / decoded. Unlike most encoders `transfer` argument is required (because value is encoded to a [MessagePort][] that can only be transferred)

```js
import { encodeCallback, decodeCallback } from 'ipfs-message-port-protocol/core'
Expand Down Expand Up @@ -186,4 +186,3 @@ Check out our [contributing document](https://github.com/ipfs/community/blob/mas
## License

[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.jparrowsec.cn%2Fipfs%2Fjs-ipfs.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.jparrowsec.cn%2Fipfs%2Fjs-ipfs?ref=badge_large)

4 changes: 2 additions & 2 deletions packages/ipfs-message-port-protocol/src/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* copy.
*
* @param {Uint8Array} data
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
*/
export const encodeBlock = (data, transfer) => {
if (transfer) {
transfer.push(data.buffer)
transfer.add(data.buffer)
}
return data
}
4 changes: 2 additions & 2 deletions packages/ipfs-message-port-protocol/src/cid.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import { CID } from 'multiformats/cid'
* will be added for the transfer list.
*
* @param {CID} cid
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
* @returns {EncodedCID}
*/
export const encodeCID = (cid, transfer) => {
if (transfer) {
transfer.push(cid.multihash.bytes.buffer)
transfer.add(cid.multihash.bytes.buffer)
}
return cid
}
Expand Down
44 changes: 30 additions & 14 deletions packages/ipfs-message-port-protocol/src/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,19 @@ export const decodeIterable = async function * ({ port }, decode) {
/**
* @template I,O
* @param {AsyncIterable<I>|Iterable<I>} iterable
* @param {function(I, Transferable[]):O} encode
* @param {Transferable[]} transfer
* @param {function(I, Set<Transferable>):O} encode
* @param {Set<Transferable>} transfer
* @returns {RemoteIterable<O>}
*/
export const encodeIterable = (iterable, encode, transfer) => {
const { port1: port, port2: remote } = new MessageChannel()
/** @type {Transferable[]} */
const itemTransfer = []
/** @type {Iterator<I>|AsyncIterator<I>} */
const iterator = toIterator(iterable)
// Note that port.onmessage will receive multiple 'next' method messages.
// Instead of allocating set every time we allocate one here and recycle
// it on each 'next' message.
/** @type {Set<Transferable>} */
const itemTransfer = new Set()

port.onmessage = async ({ data: { method } }) => {
switch (method) {
Expand All @@ -112,12 +115,15 @@ export const encodeIterable = (iterable, encode, transfer) => {
port.postMessage({ type: 'next', done: true })
port.close()
} else {
itemTransfer.length = 0
port.postMessage(
itemTransfer.clear()
const encodedValue = encode(value, itemTransfer)

postMessage(
port,
{
type: 'next',
done: false,
value: encode(value, itemTransfer)
value: encodedValue
},
itemTransfer
)
Expand All @@ -144,7 +150,7 @@ export const encodeIterable = (iterable, encode, transfer) => {
}
}
port.start()
transfer.push(remote)
transfer.add(remote)

return { type: 'RemoteIterable', port: remote }
}
Expand All @@ -170,31 +176,41 @@ const toIterator = iterable => {

/**
* @param {Function} callback
* @param {Transferable[]} transfer
* @param {Set<Transferable>} transfer
* @returns {RemoteCallback}
*/
export const encodeCallback = (callback, transfer) => {
// eslint-disable-next-line no-undef
const { port1: port, port2: remote } = new MessageChannel()
port.onmessage = ({ data }) => callback.apply(null, data)
transfer.push(remote)
transfer.add(remote)
return { type: 'RemoteCallback', port: remote }
}

/**
* @template T
* @param {RemoteCallback} remote
* @returns {function(T[]):void | function(T[], Transferable[]):void}
* @returns {function(T[]):void | function(T[], Set<Transferable>):void}
*/
export const decodeCallback = ({ port }) => {
/**
* @param {T[]} args
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
* @returns {void}
*/
const callback = (args, transfer = []) => {
port.postMessage(args, [...new Set(transfer)])
const callback = (args, transfer) => {
postMessage(port, args, transfer)
}

return callback
}

/**
* @param {MessagePort} port
* @param {any} message
* @param {Iterable<Transferable>} [transfer]
*/
const postMessage = (port, message, transfer) =>
// @ts-expect-error - Built in types expect Transferable[] but it really
// should be Iterable<Transferable>
port.postMessage(message, transfer)
8 changes: 4 additions & 4 deletions packages/ipfs-message-port-protocol/src/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const decodeNode = ({ dagNode, cids }) => {
* this node will be added to transfer so they are moved across without copy.
*
* @param {DAGNode} dagNode
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
* @returns {EncodedDAGNode}
*/
export const encodeNode = (dagNode, transfer) => {
Expand All @@ -58,7 +58,7 @@ export const encodeNode = (dagNode, transfer) => {
*
* @param {DAGNode} value
* @param {CID[]} cids
* @param {Transferable[]} [transfer]
* @param {Set<Transferable>} [transfer]
* @returns {void}
*/
const collectNode = (value, cids, transfer) => {
Expand All @@ -70,11 +70,11 @@ const collectNode = (value, cids, transfer) => {
encodeCID(cid, transfer)
} else if (value instanceof ArrayBuffer) {
if (transfer) {
transfer.push(value)
transfer.add(value)
}
} else if (ArrayBuffer.isView(value)) {
if (transfer) {
transfer.push(value.buffer)
transfer.add(value.buffer)
}
} else if (Array.isArray(value)) {
for (const member of value) {
Expand Down
6 changes: 3 additions & 3 deletions packages/ipfs-message-port-protocol/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ export type Remote<T extends Record<string, unknown>> = {
[K in keyof T]: Procedure<T[K]>
}

type Return<T> = T extends Promise<infer U>
export type Return<T> = T extends Promise<infer U>
? Promise<U & TransferOptions>
: Promise<T & TransferOptions>

export interface QueryOptions {
signal?: AbortSignal
timeout?: number
transfer?: Transferable[]
transfer?: Set<Transferable>
}

export interface TransferOptions {
transfer?: Transferable[]
transfer?: Set<Transferable>
}

export type NonUndefined<A> = A extends undefined ? never : A
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-message-port-protocol/test/block.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('block (browser)', function () {
it('should decode Block over message channel & transfer bytes', async () => {
const blockIn = uint8ArrayFromString('hello')

const transfer = []
const transfer = new Set()

const blockOut = await move(encodeBlock(blockIn, transfer), transfer)

Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-message-port-protocol/test/cid.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('cid (browser)', function () {

it('should decode CID and transfer bytes', async () => {
const cidIn = CID.parse('Qmd7xRhW5f29QuBFtqu3oSD27iVy35NRB91XFjmKFhtgMr')
const transfer = []
const transfer = new Set()
const cidDataIn = encodeCID(cidIn, transfer)
const cidDataOut = await move(cidDataIn, transfer)
const cidOut = decodeCID(cidDataOut)
Expand Down
Loading