Smoothen event source parser by feeding in smaller chunks (#151)
It seems like several things, including the event source parser itself (raw CPU performance) and Node's ability to interleave stream segments when running many concurrent streams, get worse when a stream carries enormous chunks.

This change splits the stream on newlines, which seems to produce much better results.
ankrgyl authored Feb 11, 2025
1 parent 87c174a commit bc16819
Showing 1 changed file with 20 additions and 5 deletions.
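
To illustrate the idea behind the change, here is a minimal, self-contained sketch (not the proxy's actual code; the splitOnNewlines helper and the TransformStream approach are illustrative assumptions) of re-chunking a byte stream on newline boundaries so a downstream SSE parser sees one line at a time:

// Hypothetical helper: re-chunk a byte stream on "\n" boundaries.
function splitOnNewlines(): TransformStream<Uint8Array, Uint8Array> {
  let pending = new Uint8Array(0);
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // Stitch the leftover partial line onto the front of the new chunk.
      const buf = new Uint8Array(pending.length + chunk.length);
      buf.set(pending);
      buf.set(chunk, pending.length);
      let start = 0;
      for (let i = 0; i < buf.length; i++) {
        if (buf[i] === 10) {
          // Emit each line, including its trailing "\n".
          controller.enqueue(buf.subarray(start, i + 1));
          start = i + 1;
        }
      }
      // Whatever follows the last newline waits for the next chunk.
      pending = buf.subarray(start);
    },
    flush(controller) {
      if (pending.length > 0) {
        controller.enqueue(pending);
      }
    },
  });
}

// Usage sketch: upstream.body.pipeThrough(splitOnNewlines()).pipeTo(sink)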
packages/proxy/src/proxy.ts (25 changes: 20 additions & 5 deletions)
@@ -328,12 +328,24 @@ export async function proxyV1({
     stream = new ReadableStream<Uint8Array>({
       start(controller) {
         if ("body" in cachedData && cachedData.body) {
-          controller.enqueue(new TextEncoder().encode(cachedData.body));
+          for (const line of cachedData.body.split("\n")) {
+            controller.enqueue(new TextEncoder().encode(line));
+          }
         } else if ("data" in cachedData && cachedData.data) {
           const data = Buffer.from(cachedData.data, "base64");
-          const uint8Array = new Uint8Array(data);
-          controller.enqueue(uint8Array);
+          let start = 0;
+          for (let i = 0; i < data.length; i++) {
+            if (data[i] === 10) {
+              // 10 is ASCII/UTF-8 code for \n
+              controller.enqueue(data.subarray(start, i + 1));
+              start = i + 1;
+            }
+          }
+          if (start < data.length) {
+            controller.enqueue(data.subarray(start));
+          }
         }

         controller.close();
       },
     });
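
A note on the base64 branch above: Buffer.subarray returns a view over the same underlying memory rather than a copy, so splitting a large cached payload into per-line chunks costs only the small view objects, not a second copy of the bytes. A tiny standalone illustration (hypothetical data):

const data = Buffer.from("data: one\ndata: two\n", "utf8");
// indexOf(10) finds the first "\n" byte; subarray shares memory with data.
const firstLine = data.subarray(0, data.indexOf(10) + 1);
console.log(firstLine.toString()); // prints "data: one\n"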
@@ -485,7 +497,6 @@ export async function proxyV1({
           cacheKey,
           JSON.stringify({ headers: proxyResponseHeaders, data: dataB64 }),
         );
-        controller.terminate();
       },
     });

@@ -590,7 +601,11 @@ export async function proxyV1({
       });
     }
     if (isStreaming) {
-      eventSourceParser?.feed(new TextDecoder().decode(chunk));
+      const start = Date.now();
+      let textChunks = new TextDecoder().decode(chunk);
+      eventSourceParser?.feed(textChunks);
+      const end = Date.now();
+      console.log(`time to feed ${chunk.length} bytes: ${end - start}ms`);
     } else {
       allChunks.push(chunk);
     }
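One caveat worth noting about the hunk above (an observation, not something this diff addresses): creating a fresh TextDecoder for every chunk assumes no multi-byte UTF-8 sequence straddles a chunk boundary. A single long-lived decoder in streaming mode handles that case:

// Create once, outside the per-chunk handler.
const decoder = new TextDecoder();
// Inside the handler: { stream: true } buffers a partial multi-byte
// sequence until the next chunk instead of emitting U+FFFD.
const text = decoder.decode(chunk, { stream: true });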
