diff --git a/packages/server/src/plugins/useContentEncoding.ts b/packages/server/src/plugins/useContentEncoding.ts index 0b75a577f51..fbe556c2f85 100644 --- a/packages/server/src/plugins/useContentEncoding.ts +++ b/packages/server/src/plugins/useContentEncoding.ts @@ -1,4 +1,10 @@ -import { decompressedResponseMap, getSupportedEncodings } from '../utils.js'; +import type { Readable } from 'node:stream'; +import { + decompressedResponseMap, + getSupportedEncodings, + isAsyncIterable, + isReadable, +} from '../utils.js'; import type { ServerAdapterPlugin } from './types.js'; export function useContentEncoding(): ServerAdapterPlugin { @@ -53,8 +59,9 @@ export function useContentEncoding(): ServerAdapterPlugin @@ -64,12 +71,42 @@ export function useContentEncoding(): ServerAdapterPlugin { + const chunks = uint8Arrays.flatMap(uint8Array => [...uint8Array]); + const uint8Array = new Uint8Array(chunks); + const newHeaders = new fetchAPI.Headers(response.headers); + newHeaders.set('content-encoding', supportedEncoding); + newHeaders.set('content-length', uint8Array.byteLength.toString()); + const compressedResponse = new fetchAPI.Response(uint8Array, { + ...response, + headers: newHeaders, + }); + decompressedResponseMap.set(compressedResponse, response); + setResponse(compressedResponse); + const close$ = compressionStream.writable.close(); + serverContext.waitUntil?.(close$); + }); + } + } const newHeaders = new fetchAPI.Headers(response.headers); newHeaders.set('content-encoding', supportedEncoding); newHeaders.delete('content-length'); - const compressedBody = response.body!.pipeThrough(compressionStream, { - signal: request.signal, - }); + const compressedBody = response.body!.pipeThrough(compressionStream); const compressedResponse = new fetchAPI.Response(compressedBody, { status: response.status, statusText: response.statusText, @@ -83,3 +120,35 @@ export function useContentEncoding(): ServerAdapterPlugin(readable: Readable): Promise { + const values: T[] = []; + readable.on('data', value => values.push(value)); + return new Promise((resolve, reject) => { + readable.once('end', () => resolve(values)); + readable.once('error', reject); + }); +} + +async function collectAsyncIterableValues(asyncIterable: AsyncIterable): Promise { + const values: T[] = []; + for await (const value of asyncIterable) { + values.push(value); + } + return values; +} + +async function collectReadableStreamValues(readableStream: ReadableStream): Promise { + const reader = readableStream.getReader(); + const values: T[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) { + reader.releaseLock(); + break; + } else if (value) { + values.push(value); + } + } + return values; +}