Skip to content

Commit

Permalink
Reduce diff
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Feb 5, 2025
1 parent 2060f1d commit 98b4d1e
Showing 1 changed file with 75 additions and 6 deletions.
81 changes: 75 additions & 6 deletions packages/server/src/plugins/useContentEncoding.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { decompressedResponseMap, getSupportedEncodings } from '../utils.js';
import type { Readable } from 'node:stream';
import {
decompressedResponseMap,
getSupportedEncodings,
isAsyncIterable,
isReadable,
} from '../utils.js';
import type { ServerAdapterPlugin } from './types.js';

export function useContentEncoding<TServerContext>(): ServerAdapterPlugin<TServerContext> {
Expand Down Expand Up @@ -53,8 +59,9 @@ export function useContentEncoding<TServerContext>(): ServerAdapterPlugin<TServe
encodingMap.set(request, acceptEncoding.split(','));
}
},
onResponse({ request, response, setResponse, fetchAPI }) {
if (response.body) {
onResponse({ request, response, setResponse, fetchAPI, serverContext }) {
// Hack for avoiding to create whatwg-node to create a readable stream until it's needed
if ((response as any)['bodyInit'] || response.body) {
const encodings = encodingMap.get(request);
if (encodings) {
const supportedEncoding = encodings.find(encoding =>
Expand All @@ -64,12 +71,42 @@ export function useContentEncoding<TServerContext>(): ServerAdapterPlugin<TServe
const compressionStream = new fetchAPI.CompressionStream(
supportedEncoding as CompressionFormat,
);
// To calculate final content-length
const contentLength = response.headers.get('content-length');
if (contentLength) {
const bufOfRes = (response as any)._buffer;
if (bufOfRes) {
const writer = compressionStream.writable.getWriter();
const write$ = writer.write(bufOfRes);
serverContext.waitUntil?.(write$);
const close$ = writer.close();
serverContext.waitUntil?.(close$);
const uint8Arrays$ = isReadable((compressionStream.readable as any)['readable'])
? collectReadableValues((compressionStream.readable as any)['readable'])
: isAsyncIterable(compressionStream.readable)
? collectAsyncIterableValues(compressionStream.readable)
: collectReadableStreamValues(compressionStream.readable);
return uint8Arrays$.then(uint8Arrays => {
const chunks = uint8Arrays.flatMap(uint8Array => [...uint8Array]);
const uint8Array = new Uint8Array(chunks);
const newHeaders = new fetchAPI.Headers(response.headers);
newHeaders.set('content-encoding', supportedEncoding);
newHeaders.set('content-length', uint8Array.byteLength.toString());
const compressedResponse = new fetchAPI.Response(uint8Array, {
...response,
headers: newHeaders,
});
decompressedResponseMap.set(compressedResponse, response);
setResponse(compressedResponse);
const close$ = compressionStream.writable.close();
serverContext.waitUntil?.(close$);
});
}
}
const newHeaders = new fetchAPI.Headers(response.headers);
newHeaders.set('content-encoding', supportedEncoding);
newHeaders.delete('content-length');
const compressedBody = response.body!.pipeThrough(compressionStream, {
signal: request.signal,
});
const compressedBody = response.body!.pipeThrough(compressionStream);
const compressedResponse = new fetchAPI.Response(compressedBody, {
status: response.status,
statusText: response.statusText,
Expand All @@ -83,3 +120,35 @@ export function useContentEncoding<TServerContext>(): ServerAdapterPlugin<TServe
},
};
}

function collectReadableValues<T>(readable: Readable): Promise<T[]> {
const values: T[] = [];
readable.on('data', value => values.push(value));
return new Promise((resolve, reject) => {
readable.once('end', () => resolve(values));
readable.once('error', reject);
});
}

async function collectAsyncIterableValues<T>(asyncIterable: AsyncIterable<T>): Promise<T[]> {
const values: T[] = [];
for await (const value of asyncIterable) {
values.push(value);
}
return values;
}

async function collectReadableStreamValues<T>(readableStream: ReadableStream<T>): Promise<T[]> {
const reader = readableStream.getReader();
const values: T[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) {
reader.releaseLock();
break;
} else if (value) {
values.push(value);
}
}
return values;
}

0 comments on commit 98b4d1e

Please sign in to comment.