Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): Dispose of subscription on complete or error messages #23

Merged
merged 3 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,14 @@ const client = createClient({
(async () => {
const result = await new Promise((resolve, reject) => {
let result;
const dispose = client.subscribe(
client.subscribe(
{
query: '{ hello }',
},
{
next: (data) => (result = data),
error: reject,
complete: () => {
dispose();
resolve(result);
},
complete: () => resolve(result),
},
);
});
Expand All @@ -114,17 +111,14 @@ const client = createClient({
};

await new Promise((resolve, reject) => {
const dispose = client.subscribe(
client.subscribe(
{
query: 'subscription { greetings }',
},
{
next: onNext,
error: reject,
complete: () => {
dispose();
resolve();
},
complete: resolve,
},
);
});
Expand All @@ -148,13 +142,10 @@ const client = createClient({
async function execute<T>(payload: SubscribePayload) {
return new Promise((resolve, reject) => {
let result: T;
const dispose = client.subscribe<T>(payload, {
client.subscribe<T>(payload, {
next: (data) => (result = data),
error: reject,
complete: () => {
dispose();
resolve(result);
},
complete: () => resolve(result),
});
});
}
Expand All @@ -165,7 +156,7 @@ async function execute<T>(payload: SubscribePayload) {
const result = await execute({
query: '{ hello }',
});
// complete and dispose
// complete
// next = result = { data: { hello: 'Hello World!' } }
} catch (err) {
// error
Expand Down
15 changes: 13 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ export function createClient(options: ClientOptions): Client {
on: emitter.on,
subscribe(payload, sink) {
const id = generateID();
const cancellerRef: CancellerRef = { current: null };

const messageHandler = ({ data }: MessageEvent) => {
const message = memoParseMessage(data);
Expand All @@ -438,19 +439,28 @@ export function createClient(options: ClientOptions): Client {
case MessageType.Error: {
if (message.id === id) {
sink.error(message.payload);
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
cancellerRef.current!();
}
return;
}
case MessageType.Complete: {
if (message.id === id) {
sink.complete();
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
cancellerRef.current!();
}
return;
}
}
};

const cancellerRef: CancellerRef = { current: null };
(async () => {
for (;;) {
try {
Expand Down Expand Up @@ -511,7 +521,8 @@ export function createClient(options: ClientOptions): Client {
}
})()
.catch(sink.error)
.then(sink.complete); // resolves on cancel or normal closure
.then(sink.complete) // resolves on cancel or normal closure
.finally(() => (cancellerRef.current = null)); // when this promise settles there is nothing to cancel

return () => {
if (cancellerRef.current) {
Expand Down
50 changes: 50 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,56 @@ describe('subscription operation', () => {

expect(generateIDFn).toBeCalled();
});

it('should dispose of the subscription on complete', async () => {
const client = createClient({ url });

const completeFn = jest.fn();
client.subscribe(
{
query: `{
getValue
}`,
},
{
next: noop,
error: () => {
fail(`Unexpected error call`);
},
complete: completeFn,
},
);
await wait(20);

expect(completeFn).toBeCalled();

await wait(20);
expect(server.webSocketServer.clients.size).toBe(0);
});

it('should dispose of the subscription on error', async () => {
const client = createClient({ url });

const errorFn = jest.fn();
client.subscribe(
{
query: `{
iDontExist
}`,
},
{
next: noop,
error: errorFn,
complete: noop,
},
);
await wait(20);

expect(errorFn).toBeCalled();

await wait(20);
expect(server.webSocketServer.clients.size).toBe(0);
});
});

describe('"concurrency"', () => {
Expand Down