Skip to content

Commit

Permalink
support converse fake stream (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
sachinpad authored Feb 24, 2025
1 parent fa96a1a commit 97595c2
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 51 deletions.
11 changes: 9 additions & 2 deletions packages/proxy/src/providers/bedrock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
ChatCompletionToolMessageParam,
} from "openai/resources/chat/completions";
import { convertImageToBase64 } from "./util";
import { makeFakeOpenAIStreamTransformer } from "./openai";

const brt = new BedrockRuntimeClient({});
export async function fetchBedrockAnthropic({
Expand Down Expand Up @@ -396,13 +397,17 @@ export async function fetchConverse({
toolConfig,
};

const supportsStreaming = !!secret.metadata?.supportsStreaming;
const doStream = !!stream && supportsStreaming;
const fakeStream = !!stream && !supportsStreaming;

const httpResponse = new Response(null, {
status: 200,
});

let responseStream;
try {
if (stream) {
if (doStream) {
const command = new ConverseStreamCommand(input);
const response = await brt.send(command);
if (!response.stream) {
Expand Down Expand Up @@ -499,7 +504,9 @@ export async function fetchConverse({
}

return {
stream: responseStream,
stream: fakeStream
? responseStream.pipeThrough(makeFakeOpenAIStreamTransformer())
: responseStream,
response: httpResponse,
};
}
Expand Down
41 changes: 40 additions & 1 deletion packages/proxy/src/providers/openai.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ChatCompletionChunk, ChatCompletion } from "openai/resources";

export function openAIChatCompletionToChatEvent(
function openAIChatCompletionToChatEvent(
completion: ChatCompletion,
): ChatCompletionChunk {
return {
Expand All @@ -27,3 +27,42 @@ export function openAIChatCompletionToChatEvent(
usage: completion.usage,
};
}

/**
 * Buffers an entire non-streaming OpenAI chat-completion response body and,
 * once the upstream stream ends, re-emits it as a fake SSE stream: one
 * `data: <ChatCompletionChunk>` event followed by `data: [DONE]`.
 *
 * Used when the caller requested streaming but the underlying provider/model
 * only supports a single non-streaming response.
 *
 * @returns A byte-to-byte TransformStream that swallows all input until
 *   flush, then emits the SSE-framed fake stream.
 */
export function makeFakeOpenAIStreamTransformer() {
  // Streaming decode carries partial multi-byte UTF-8 sequences across chunk
  // boundaries; decoding each chunk independently would corrupt characters
  // split between chunks and make JSON.parse fail spuriously.
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  let responseText = "";
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk) {
      // Accumulate the whole upstream body; nothing is emitted until flush.
      responseText += decoder.decode(chunk, { stream: true });
    },
    flush(controller) {
      // Flush any trailing partial multi-byte sequence held by the decoder.
      responseText += decoder.decode();
      // Fallback shape so callers still receive a well-formed (if empty)
      // completion event when the upstream body is not valid JSON.
      let responseJson: ChatCompletion = {
        id: "invalid",
        choices: [],
        created: 0,
        model: "invalid",
        object: "chat.completion",
        usage: {
          prompt_tokens: 0,
          completion_tokens: 0,
          total_tokens: 0,
        },
      };
      try {
        responseJson = JSON.parse(responseText);
      } catch (e) {
        console.error("Failed to parse response as JSON", responseText);
      }
      controller.enqueue(
        encoder.encode(
          `data: ${JSON.stringify(openAIChatCompletionToChatEvent(responseJson))}\n\n`,
        ),
      );
      controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
      controller.terminate();
    },
  });
}
50 changes: 2 additions & 48 deletions packages/proxy/src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ import {
verifyTempCredentials,
} from "utils";
import { differenceInSeconds } from "date-fns";
import { openAIChatCompletionToChatEvent } from "./providers/openai";
import { makeFakeOpenAIStreamTransformer } from "./providers/openai";
import { ChatCompletionCreateParamsBase } from "openai/resources/chat/completions";
import { importPKCS8, SignJWT } from "jose";
import { z } from "zod";
Expand Down Expand Up @@ -1403,59 +1403,13 @@ async function fetchOpenAIFakeStream({
},
);

let responseChunks: Uint8Array[] = [];
const responseToStream = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
if (proxyResponse.ok) {
responseChunks.push(chunk);
} else {
controller.enqueue(chunk);
}
},
flush(controller) {
if (!proxyResponse.ok) {
controller.terminate();
return;
}
const decoder = new TextDecoder();
const responseText = responseChunks
.map((c) => decoder.decode(c))
.join("");
let responseJson: ChatCompletion = {
id: "invalid",
choices: [],
created: 0,
model: "invalid",
object: "chat.completion",
usage: {
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
},
};
try {
responseJson = JSON.parse(responseText);
} catch (e) {
console.error("Failed to parse response as JSON", responseText);
}
controller.enqueue(
new TextEncoder().encode(
`data: ${JSON.stringify(openAIChatCompletionToChatEvent(responseJson))}\n\n`,
),
);
controller.enqueue(new TextEncoder().encode(`data: [DONE]\n\n`));
controller.terminate();
},
});

if (isStream) {
setHeader("content-type", "text/event-stream; charset=utf-8");
}

return {
stream:
isStream && proxyResponse.ok
? proxyResponse.body?.pipeThrough(responseToStream) ||
? proxyResponse.body?.pipeThrough(makeFakeOpenAIStreamTransformer()) ||
createEmptyReadableStream()
: proxyResponse.body,
response: proxyResponse,
Expand Down

0 comments on commit 97595c2

Please sign in to comment.