Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streaming): correct error message serialisation #524

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class APIError extends AnthropicError {
headers: Headers | undefined,
) {
if (!status) {
return new APIConnectionError({ cause: castToError(errorResponse) });
return new APIConnectionError({ message, cause: castToError(errorResponse) });
}

const error = errorResponse as Record<string, any>;
Expand Down Expand Up @@ -104,7 +104,7 @@ export class APIUserAbortError extends APIError {
export class APIConnectionError extends APIError {
override readonly status: undefined = undefined;

constructor({ message, cause }: { message?: string; cause?: Error | undefined }) {
constructor({ message, cause }: { message?: string | undefined; cause?: Error | undefined }) {
super(undefined, undefined, message || 'Connection error.', undefined);
// in some environments the 'cause' property is already declared
// @ts-ignore
Expand Down
13 changes: 7 additions & 6 deletions src/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ReadableStream, type Response } from './_shims/index';
import { AnthropicError } from './error';

import { safeJSON, createResponseHeaders } from '@anthropic-ai/sdk/core';
import { createResponseHeaders } from '@anthropic-ai/sdk/core';
import { APIError } from '@anthropic-ai/sdk/error';

type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;
Expand Down Expand Up @@ -65,11 +65,12 @@ export class Stream<Item> implements AsyncIterable<Item> {
}

if (sse.event === 'error') {
const errText = sse.data;
const errJSON = safeJSON(errText);
const errMessage = errJSON ? undefined : errText;

throw APIError.generate(undefined, errJSON, errMessage, createResponseHeaders(response.headers));
throw APIError.generate(
undefined,
`SSE Error: ${sse.data}`,
sse.data,
createResponseHeaders(response.headers),
);
}
}
done = true;
Expand Down
25 changes: 24 additions & 1 deletion tests/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Response } from 'node-fetch';
import { PassThrough } from 'stream';
import assert from 'assert';
import { _iterSSEMessages, _decodeChunks as decodeChunks } from '@anthropic-ai/sdk/streaming';
import { Stream, _iterSSEMessages, _decodeChunks as decodeChunks } from '@anthropic-ai/sdk/streaming';
import { APIConnectionError } from '@anthropic-ai/sdk/error';

describe('line decoder', () => {
test('basic', () => {
Expand Down Expand Up @@ -247,6 +248,28 @@ describe('streaming decoding', () => {
});
});

test('error handling', async () => {
async function* body(): AsyncGenerator<Buffer> {
yield Buffer.from('event: error\n');
yield Buffer.from('data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}');
yield Buffer.from('\n\n');
}

const stream = Stream.fromSSEResponse(new Response(await iteratorToStream(body())), new AbortController());

const err = expect(
(async () => {
for await (const _event of stream) {
}
})(),
).rejects;

await err.toMatchInlineSnapshot(
`[Error: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}]`,
);
await err.toBeInstanceOf(APIConnectionError);
});

async function iteratorToStream(iterator: AsyncGenerator<any>): Promise<PassThrough> {
const parts: unknown[] = [];

Expand Down