From bc16819b0318f2ed5d89d75ddc70d234ceabb326 Mon Sep 17 00:00:00 2001
From: Ankur Goyal
Date: Tue, 11 Feb 2025 08:50:30 -0800
Subject: [PATCH] Smooth out event source parsing by feeding in smaller chunks
 (#151)

Several things, including the event source parser (raw CPU performance)
and Node's ability to interleave stream segments when running lots of
concurrent streams, degrade when a stream arrives in enormous chunks.
This change splits the stream at newline boundaries, which seems to
produce much better results.
---
 packages/proxy/src/proxy.ts | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/packages/proxy/src/proxy.ts b/packages/proxy/src/proxy.ts
index 5c5471b..f6755e1 100644
--- a/packages/proxy/src/proxy.ts
+++ b/packages/proxy/src/proxy.ts
@@ -328,12 +328,24 @@ export async function proxyV1({
     stream = new ReadableStream({
       start(controller) {
         if ("body" in cachedData && cachedData.body) {
-          controller.enqueue(new TextEncoder().encode(cachedData.body));
+          for (const line of cachedData.body.split("\n")) {
+            controller.enqueue(new TextEncoder().encode(line + "\n"));
+          }
         } else if ("data" in cachedData && cachedData.data) {
           const data = Buffer.from(cachedData.data, "base64");
-          const uint8Array = new Uint8Array(data);
-          controller.enqueue(uint8Array);
+          let start = 0;
+          for (let i = 0; i < data.length; i++) {
+            if (data[i] === 10) {
+              // 10 is the ASCII/UTF-8 code for "\n"
+              controller.enqueue(data.subarray(start, i + 1));
+              start = i + 1;
+            }
+          }
+          if (start < data.length) {
+            controller.enqueue(data.subarray(start));
+          }
         }
+
         controller.close();
       },
     });
@@ -485,7 +497,6 @@ export async function proxyV1({
           cacheKey,
           JSON.stringify({ headers: proxyResponseHeaders, data: dataB64 }),
         );
-        controller.terminate();
       },
     });
 
@@ -590,7 +601,11 @@ export async function proxyV1({
         });
       }
       if (isStreaming) {
-        eventSourceParser?.feed(new TextDecoder().decode(chunk));
+        const start = Date.now();
+        const textChunk = new TextDecoder().decode(chunk);
+        eventSourceParser?.feed(textChunk);
+        const end = Date.now();
+        console.log(`time to feed ${chunk.length} bytes: ${end - start}ms`);
       } else {
         allChunks.push(chunk);
       }
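
For reference, the core technique generalizes beyond proxy.ts: re-chunk a
buffered payload at newline boundaries before handing it downstream, so no
single enqueue carries an enormous chunk. The sketch below mirrors the
patch's logic under the assumption of Node 18+ globals (ReadableStream,
TextEncoder); splitOnNewlines and replayAsStream are hypothetical names
used for illustration, not functions in proxy.ts.

    // Split a byte buffer into newline-terminated slices, keeping each
    // "\n" so that SSE event boundaries ("\n\n") survive the re-chunking.
    function splitOnNewlines(data: Uint8Array): Uint8Array[] {
      const chunks: Uint8Array[] = [];
      let start = 0;
      for (let i = 0; i < data.length; i++) {
        if (data[i] === 10) {
          // 10 is the UTF-8 byte for "\n"
          chunks.push(data.subarray(start, i + 1));
          start = i + 1;
        }
      }
      if (start < data.length) {
        // Trailing bytes that were not terminated by a newline.
        chunks.push(data.subarray(start));
      }
      return chunks;
    }

    // Replay a cached body as a stream of small, line-sized chunks
    // rather than one enormous chunk.
    function replayAsStream(body: string): ReadableStream<Uint8Array> {
      const bytes = new TextEncoder().encode(body);
      return new ReadableStream({
        start(controller) {
          for (const chunk of splitOnNewlines(bytes)) {
            controller.enqueue(chunk);
          }
          controller.close();
        },
      });
    }

Piping replayAsStream(body) through a TextDecoder into an event source
parser delivers the same bytes as before, just in many line-sized pieces,
so each feed() call stays cheap and Node can interleave writes across
concurrent streams.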