Skip to content

Commit

Permalink
[Flight] Support streaming of decodeReply in Edge environments (faceb…
Browse files Browse the repository at this point in the history
…ook#31852)

We support streaming `multipart/form-data` in Node.js using Busboy since
that's kind of the idiomatic ecosystem way for handling these stream
there. There's not really anything idiomatic like that for Edge that's
universal yet.

This adds a version that's basically just
`AsyncIterable.from(formData)`. It could also be a `ReadableStream` of
those entries since those are also `AsyncIterable`.

I imagine that in the future we might add one from a binary
`ReadableStream` that does the parsing built-in.
  • Loading branch information
sebmarkbage authored Dec 19, 2024
1 parent 8f92ea4 commit 9f540fc
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/react-server-dom-parcel/npm/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ if (process.env.NODE_ENV === 'production') {

exports.renderToReadableStream = s.renderToReadableStream;
exports.decodeReply = s.decodeReply;
exports.decodeReplyFromAsyncIterable = s.decodeReplyFromAsyncIterable;
exports.decodeAction = s.decodeAction;
exports.decodeFormState = s.decodeFormState;
exports.createClientReference = s.createClientReference;
Expand Down
1 change: 1 addition & 0 deletions packages/react-server-dom-parcel/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
export {
renderToReadableStream,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
createClientReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
type ServerReferenceId,
} from '../client/ReactFlightClientConfigBundlerParcel';

import {ASYNC_ITERATOR} from 'shared/ReactSymbols';

import {
createRequest,
createPrerenderRequest,
Expand All @@ -30,6 +32,9 @@ import {
createResponse,
close,
getRoot,
reportGlobalError,
resolveField,
resolveFile,
} from 'react-server/src/ReactFlightReplyServer';

import {
Expand Down Expand Up @@ -189,6 +194,50 @@ export function decodeReply<T>(
return root;
}

export function decodeReplyFromAsyncIterable<T>(
iterable: AsyncIterable<[string, string | File]>,
options?: {temporaryReferences?: TemporaryReferenceSet},
): Thenable<T> {
const iterator: AsyncIterator<[string, string | File]> =
iterable[ASYNC_ITERATOR]();

const response = createResponse(
serverManifest,
'',
options ? options.temporaryReferences : undefined,
);

function progress(
entry:
| {done: false, +value: [string, string | File], ...}
| {done: true, +value: void, ...},
) {
if (entry.done) {
close(response);
} else {
const [name, value] = entry.value;
if (typeof value === 'string') {
resolveField(response, name, value);
} else {
resolveFile(response, name, value);
}
iterator.next().then(progress, error);
}
}
function error(reason: Error) {
reportGlobalError(response, reason);
if (typeof (iterator: any).throw === 'function') {
// The iterator protocol doesn't necessarily include this but a generator do.
// $FlowFixMe should be able to pass mixed
iterator.throw(reason).then(error, error);
}
}

iterator.next().then(progress, error);

return getRoot(response);
}

export function decodeAction<T>(body: FormData): Promise<() => T> | null {
return decodeActionImpl(body, serverManifest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export {
renderToReadableStream,
prerender as unstable_prerender,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
createClientReference,
Expand Down
1 change: 1 addition & 0 deletions packages/react-server-dom-turbopack/npm/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ if (process.env.NODE_ENV === 'production') {

exports.renderToReadableStream = s.renderToReadableStream;
exports.decodeReply = s.decodeReply;
exports.decodeReplyFromAsyncIterable = s.decodeReplyFromAsyncIterable;
exports.decodeAction = s.decodeAction;
exports.decodeFormState = s.decodeFormState;
exports.registerServerReference = s.registerServerReference;
Expand Down
1 change: 1 addition & 0 deletions packages/react-server-dom-turbopack/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
export {
renderToReadableStream,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
registerServerReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import type {Thenable} from 'shared/ReactTypes';
import type {ClientManifest} from './ReactFlightServerConfigTurbopackBundler';
import type {ServerManifest} from 'react-client/src/ReactFlightClientConfig';

import {ASYNC_ITERATOR} from 'shared/ReactSymbols';

import {
createRequest,
createPrerenderRequest,
Expand All @@ -25,6 +27,9 @@ import {
createResponse,
close,
getRoot,
reportGlobalError,
resolveField,
resolveFile,
} from 'react-server/src/ReactFlightReplyServer';

import {
Expand Down Expand Up @@ -183,10 +188,56 @@ function decodeReply<T>(
return root;
}

function decodeReplyFromAsyncIterable<T>(
iterable: AsyncIterable<[string, string | File]>,
turbopackMap: ServerManifest,
options?: {temporaryReferences?: TemporaryReferenceSet},
): Thenable<T> {
const iterator: AsyncIterator<[string, string | File]> =
iterable[ASYNC_ITERATOR]();

const response = createResponse(
turbopackMap,
'',
options ? options.temporaryReferences : undefined,
);

function progress(
entry:
| {done: false, +value: [string, string | File], ...}
| {done: true, +value: void, ...},
) {
if (entry.done) {
close(response);
} else {
const [name, value] = entry.value;
if (typeof value === 'string') {
resolveField(response, name, value);
} else {
resolveFile(response, name, value);
}
iterator.next().then(progress, error);
}
}
function error(reason: Error) {
reportGlobalError(response, reason);
if (typeof (iterator: any).throw === 'function') {
// The iterator protocol doesn't necessarily include this but a generator do.
// $FlowFixMe should be able to pass mixed
iterator.throw(reason).then(error, error);
}
}

iterator.next().then(progress, error);

return getRoot(response);
}

export {
renderToReadableStream,
prerender,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export {
renderToReadableStream,
prerender as unstable_prerender,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
registerServerReference,
Expand Down
1 change: 1 addition & 0 deletions packages/react-server-dom-webpack/npm/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ if (process.env.NODE_ENV === 'production') {

exports.renderToReadableStream = s.renderToReadableStream;
exports.decodeReply = s.decodeReply;
exports.decodeReplyFromAsyncIterable = s.decodeReplyFromAsyncIterable;
exports.decodeAction = s.decodeAction;
exports.decodeFormState = s.decodeFormState;
exports.registerServerReference = s.registerServerReference;
Expand Down
1 change: 1 addition & 0 deletions packages/react-server-dom-webpack/server.edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
export {
renderToReadableStream,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
registerServerReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,40 @@ describe('ReactFlightDOMReplyEdge', () => {
expect(error).not.toBe(null);
expect(error.message).toBe('Connection closed.');
});

it('can stream the decoding using an async iterable', async () => {
let resolve;
const promise = new Promise(r => (resolve = r));

const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
]);

const formData = await ReactServerDOMClient.encodeReply({
a: Promise.resolve('hello'),
b: Promise.resolve(buffer),
});

const iterable = {
async *[Symbol.asyncIterator]() {
// eslint-disable-next-line no-for-of-loops/no-for-of-loops
for (const entry of formData) {
yield entry;
await promise;
}
},
};

const decoded = await ReactServerDOMServer.decodeReplyFromAsyncIterable(
iterable,
webpackServerMap,
);

expect(Object.keys(decoded)).toEqual(['a', 'b']);

await resolve();

expect(await decoded.a).toBe('hello');
expect(Array.from(await decoded.b)).toEqual(Array.from(buffer));
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import type {Thenable} from 'shared/ReactTypes';
import type {ClientManifest} from './ReactFlightServerConfigWebpackBundler';
import type {ServerManifest} from 'react-client/src/ReactFlightClientConfig';

import {ASYNC_ITERATOR} from 'shared/ReactSymbols';

import {
createRequest,
createPrerenderRequest,
Expand All @@ -25,6 +27,9 @@ import {
createResponse,
close,
getRoot,
reportGlobalError,
resolveField,
resolveFile,
} from 'react-server/src/ReactFlightReplyServer';

import {
Expand Down Expand Up @@ -183,10 +188,56 @@ function decodeReply<T>(
return root;
}

function decodeReplyFromAsyncIterable<T>(
iterable: AsyncIterable<[string, string | File]>,
webpackMap: ServerManifest,
options?: {temporaryReferences?: TemporaryReferenceSet},
): Thenable<T> {
const iterator: AsyncIterator<[string, string | File]> =
iterable[ASYNC_ITERATOR]();

const response = createResponse(
webpackMap,
'',
options ? options.temporaryReferences : undefined,
);

function progress(
entry:
| {done: false, +value: [string, string | File], ...}
| {done: true, +value: void, ...},
) {
if (entry.done) {
close(response);
} else {
const [name, value] = entry.value;
if (typeof value === 'string') {
resolveField(response, name, value);
} else {
resolveFile(response, name, value);
}
iterator.next().then(progress, error);
}
}
function error(reason: Error) {
reportGlobalError(response, reason);
if (typeof (iterator: any).throw === 'function') {
// The iterator protocol doesn't necessarily include this but a generator do.
// $FlowFixMe should be able to pass mixed
iterator.throw(reason).then(error, error);
}
}

iterator.next().then(progress, error);

return getRoot(response);
}

export {
renderToReadableStream,
prerender,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export {
renderToReadableStream,
prerender as unstable_prerender,
decodeReply,
decodeReplyFromAsyncIterable,
decodeAction,
decodeFormState,
registerServerReference,
Expand Down

0 comments on commit 9f540fc

Please sign in to comment.