Skip to content

Commit

Permalink
Handle retries for protocol errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jerelmiller committed Jan 29, 2025
1 parent de1620a commit 1f0595e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
66 changes: 65 additions & 1 deletion src/link/retry/__tests__/retryLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import { execute } from "../../core/execute";
import { Observable } from "../../../utilities/observables/Observable";
import { fromError } from "../../utils/fromError";
import { RetryLink } from "../retryLink";
import { ObservableStream } from "../../../testing/internal";
import {
mockMultipartSubscriptionStream,
ObservableStream,
} from "../../../testing/internal";
import { ApolloError } from "../../../core";

const query = gql`
{
Expand Down Expand Up @@ -210,4 +214,64 @@ describe("RetryLink", () => {
[3, operation, standardError],
]);
});

it("handles protocol errors from multipart subscriptions", async () => {
const subscription = gql`
subscription MySubscription {
aNewDieWasCreated {
die {
roll
sides
color
}
}
}
`;

const attemptStub = jest.fn();
attemptStub.mockReturnValueOnce(true);

const retryLink = new RetryLink({
delay: { initial: 1 },
attempts: attemptStub,
});

const { httpLink, enqueuePayloadResult, enqueueProtocolErrors } =
mockMultipartSubscriptionStream();
const link = ApolloLink.from([retryLink, httpLink]);
const stream = new ObservableStream(execute(link, { query: subscription }));

enqueueProtocolErrors([
{ message: "Error field", extensions: { code: "INTERNAL_SERVER_ERROR" } },
]);

enqueuePayloadResult({
data: {
aNewDieWasCreated: { die: { color: "blue", roll: 2, sides: 6 } },
},
});

await expect(stream).toEmitValue({
data: {
aNewDieWasCreated: { die: { color: "blue", roll: 2, sides: 6 } },
},
});

expect(attemptStub).toHaveBeenCalledTimes(1);
expect(attemptStub).toHaveBeenCalledWith(
1,
expect.objectContaining({
operationName: "MySubscription",
query: subscription,
}),
new ApolloError({
protocolErrors: [
{
message: "Error field",
extensions: { code: "INTERNAL_SERVER_ERROR" },
},
],
})
);
});
});
21 changes: 20 additions & 1 deletion src/link/retry/retryLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import { buildDelayFunction } from "./delayFunction.js";
import type { RetryFunction, RetryFunctionOptions } from "./retryFunction.js";
import { buildRetryFunction } from "./retryFunction.js";
import type { SubscriptionObserver } from "zen-observable-ts";
import {
ApolloError,
graphQLResultHasProtocolErrors,
PROTOCOL_ERRORS_SYMBOL,
} from "../../errors/index.js";

export namespace RetryLink {
export interface Options {
Expand Down Expand Up @@ -54,7 +59,21 @@ class RetryableOperation {

private try() {
this.currentSubscription = this.forward(this.operation).subscribe({
next: this.observer.next.bind(this.observer),
next: (result) => {
if (graphQLResultHasProtocolErrors(result)) {
this.onError(
new ApolloError({
protocolErrors: result.extensions[PROTOCOL_ERRORS_SYMBOL],
})
);
// Unsubscribe from the current subscription to prevent the `complete`
// handler to be called as a result of the stream closing.
this.currentSubscription?.unsubscribe();
return;
}

this.observer.next(result);
},
error: this.onError,
complete: this.observer.complete.bind(this.observer),
});
Expand Down

0 comments on commit 1f0595e

Please sign in to comment.