From 85a763c3899d8055645ac6996527d8e39cb0ba96 Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Mon, 13 Jan 2025 15:16:36 +0000 Subject: [PATCH] feat(stream): add `.withResponse()` (#654) --- src/lib/MessageStream.ts | 50 ++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/lib/MessageStream.ts b/src/lib/MessageStream.ts index dea739f1..bace2cd9 100644 --- a/src/lib/MessageStream.ts +++ b/src/lib/MessageStream.ts @@ -10,7 +10,7 @@ import { type MessageCreateParamsBase, type TextBlock, } from '@anthropic-ai/sdk/resources/messages'; -import { type ReadableStream } from '@anthropic-ai/sdk/_shims/index'; +import { type ReadableStream, type Response } from '@anthropic-ai/sdk/_shims/index'; import { Stream } from '@anthropic-ai/sdk/streaming'; import { partialParse } from '../_vendor/partial-json-parser/parser'; @@ -41,8 +41,8 @@ export class MessageStream implements AsyncIterable { controller: AbortController = new AbortController(); - #connectedPromise: Promise; - #resolveConnectedPromise: () => void = () => {}; + #connectedPromise: Promise; + #resolveConnectedPromise: (response: Response | null) => void = () => {}; #rejectConnectedPromise: (error: AnthropicError) => void = () => {}; #endPromise: Promise; @@ -57,7 +57,7 @@ export class MessageStream implements AsyncIterable { #catchingPromiseCreated = false; constructor() { - this.#connectedPromise = new Promise((resolve, reject) => { + this.#connectedPromise = new Promise((resolve, reject) => { this.#resolveConnectedPromise = resolve; this.#rejectConnectedPromise = reject; }); @@ -75,6 +75,33 @@ export class MessageStream implements AsyncIterable { this.#endPromise.catch(() => {}); } + /** + * Returns the `MessageStream` data, the raw `Response` instance and the ID of the request, + * returned vie the `request-id` header which is useful for debugging requests and resporting + * issues to Anthropic. + * + * This is the same as the `APIPromise.withResponse()` method. + * + * This method will raise an error if you created the stream using `MessageStream.fromReadableStream` + * as no `Response` is available. + */ + async withResponse(): Promise<{ + data: MessageStream; + response: Response; + request_id: string | null | undefined; + }> { + const response = await this.#connectedPromise; + if (!response) { + throw new Error('Could not resolve a `Response` object'); + } + + return { + data: this, + response, + request_id: response.headers.get('request-id'), + }; + } + /** * Intended for use on the frontend, consuming a stream produced with * `.toReadableStream()` on the backend. @@ -136,11 +163,10 @@ export class MessageStream implements AsyncIterable { signal.addEventListener('abort', () => this.controller.abort()); } this.#beginRequest(); - const stream = await messages.create( - { ...params, stream: true }, - { ...options, signal: this.controller.signal }, - ); - this._connected(); + const { response, data: stream } = await messages + .create({ ...params, stream: true }, { ...options, signal: this.controller.signal }) + .withResponse(); + this._connected(response); for await (const event of stream) { this.#addStreamEvent(event); } @@ -150,9 +176,9 @@ export class MessageStream implements AsyncIterable { this.#endRequest(); } - protected _connected() { + protected _connected(response: Response | null) { if (this.ended) return; - this.#resolveConnectedPromise(); + this.#resolveConnectedPromise(response); this._emit('connect'); } @@ -424,7 +450,7 @@ export class MessageStream implements AsyncIterable { signal.addEventListener('abort', () => this.controller.abort()); } this.#beginRequest(); - this._connected(); + this._connected(null); const stream = Stream.fromReadableStream(readableStream, this.controller); for await (const event of stream) { this.#addStreamEvent(event);