Skip to content

Commit

Permalink
fix(stream): unhandled rejection on connection (#79)
Browse files Browse the repository at this point in the history
* fix(stream): unhandled rejection on connection

* refactor: removed interface for compatibility
  • Loading branch information
microwavekonijn authored Jul 18, 2022
1 parent 7f05c6a commit ae55653
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 63 deletions.
12 changes: 6 additions & 6 deletions src/client/command.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { StreamClient } from '../stream/stream.client';
import { EventSubscribed, EventSubscription } from './types';
import { Queue } from './utils/queue';
import { CensusCommand } from '../stream';
import { StreamClosedException } from './exceptions/stream-closed.exception';
import { StreamResponseException } from './exceptions/stream-response.exception';

interface CommandCallback<T> {
resolve(data: T): void;
Expand Down Expand Up @@ -45,7 +45,7 @@ export class CommandHandler {
this.subscriptionQueue
.dequeue()
.reject(
new StreamClosedException(
new StreamResponseException(
'Stream closed before receiving response',
),
);
Expand All @@ -54,7 +54,7 @@ export class CommandHandler {
this.recentCharacterQueue
.dequeue()
.reject(
new StreamClosedException(
new StreamResponseException(
'Stream closed before receiving response',
),
);
Expand Down Expand Up @@ -113,10 +113,10 @@ export class CommandHandler {
queue: Queue<CommandCallback<T>>,
command: CensusCommand,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (!this.stream.isReady)
reject(new StreamClosedException('Stream is closed'));
if (!this.stream.isReady)
return Promise.reject(new StreamResponseException('Stream is closed'));

return new Promise<T>((resolve, reject) => {
this.stream
.send(command)
.then(() => {
Expand Down
2 changes: 1 addition & 1 deletion src/client/exceptions/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './max-retry.exception';
export * from './stream-closed.exception';
export * from './stream-response.exception';
export * from './service-id-reject.exception';
2 changes: 1 addition & 1 deletion src/client/exceptions/max-retry.exception.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ export class MaxRetryException extends Error {
)}`,
);

this.name = 'MaxRetryException';
this.name = MaxRetryException.name;
}
}
2 changes: 1 addition & 1 deletion src/client/exceptions/service-id-reject.exception.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ export class ServiceIdRejectException extends Error {
constructor(message: string) {
super(message);

this.name = 'StreamClosedException';
this.name = ServiceIdRejectException.name;
}
}
7 changes: 0 additions & 7 deletions src/client/exceptions/stream-closed.exception.ts

This file was deleted.

7 changes: 7 additions & 0 deletions src/client/exceptions/stream-response.exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export class StreamResponseException extends Error {
constructor(message: string) {
super(message);

this.name = StreamResponseException.name;
}
}
47 changes: 24 additions & 23 deletions src/client/stream.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Timeout = NodeJS.Timeout;

export interface StreamManagerOptions extends StreamClientOptions {
subscription?: EventSubscription;
reconnectDelay?: number;
}

export class StreamManager {
Expand Down Expand Up @@ -41,7 +42,7 @@ export class StreamManager {
/**
* @type {number} delay before trying to reconnect
*/
private reconnectDelay = 2000;
private readonly reconnectDelay: number;

/**
* @type {Timeout?} The reconnect timeout
Expand Down Expand Up @@ -77,6 +78,8 @@ export class StreamManager {
options.subscription,
);

this.reconnectDelay = options.reconnectDelay ?? 2000;

this.prepareEventStream();
}

Expand All @@ -87,7 +90,7 @@ export class StreamManager {
/**
* Stream closed
*/
this.stream.on('close', ({ code, reason }) => {
this.stream.on('close', (code, reason) => {
if (!this.isStarted) {
this.client.emit('disconnected', code, reason);
return;
Expand Down Expand Up @@ -174,29 +177,27 @@ export class StreamManager {
* Connection has done something, now we need a new one
*/
private async reconnect(): Promise<void> {
try {
await this.stream.connect();
} catch (e: any) {
if ([403].includes(e.httpState)) {
if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout);

this.reconnectTimeout = setTimeout(async () => {
try {
await this.stream.connect();
} catch (e: any) {
if ([403].includes(e.httpState)) {
this.client.emit(
'error',
new Error(`Service ID rejected while trying to reconnect.`),
);
this.disconnect();

return;
}

this.client.emit(
'error',
new Error(`Service ID rejected while trying to reconnect.`),
'debug',
`Reconnect failed, trying again in ${this.reconnectDelay}ms.`,
);
this.disconnect();

return;
}

this.client.emit(
'debug',
`Reconnect failed, trying again in ${this.reconnectDelay}ms.`,
);

if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = setTimeout(
() => this.reconnect(),
this.reconnectDelay,
);
}
}, this.reconnectDelay);
}
}
4 changes: 2 additions & 2 deletions src/client/subscription.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { StreamClient } from '../stream/stream.client';
import { PS2EventNames } from '../stream/types/ps2.events';
import { EventSubscribed, EventSubscription } from './types';
import { CommandHandler } from './command.handler';
import { StreamClosedException } from './exceptions';
import { StreamResponseException } from './exceptions/stream-response.exception';

export class SubscriptionManager {
/**
Expand Down Expand Up @@ -59,7 +59,7 @@ export class SubscriptionManager {
try {
await this.commandHandler.subscribe(this.subscription);
} catch (err) {
if (!(err instanceof StreamClosedException)) throw err;
if (!(err instanceof StreamResponseException)) throw err;
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/rest/exceptions/census-rest.exception.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export class CensusRestException extends Error {
constructor(message: string) {
super(message);

this.name = 'CensusRestException';
this.name = CensusRestException.name;
}

hasNoData(): boolean {
Expand Down
2 changes: 1 addition & 1 deletion src/rest/exceptions/census-server.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export class CensusServerError extends Error {
constructor(data: any) {
super(data.errorMessage ?? 'Error message not supplied');

this.name = 'CensusServerError';
this.name = CensusServerError.name;
this.code = data.errorCode;
}
}
2 changes: 2 additions & 0 deletions src/stream/exceptions/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './stream-closed.exception';
export * from './stream-destroyed.exception';
7 changes: 7 additions & 0 deletions src/stream/exceptions/stream-closed.exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export class StreamClosedException extends Error {
constructor(readonly code: number, readonly reason?: string) {
super(`Stream closed with code ${code}: ${reason}`);

this.name = StreamClosedException.name;
}
}
7 changes: 7 additions & 0 deletions src/stream/exceptions/stream-destroyed.exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export class StreamDestroyedException extends Error {
constructor() {
super('Stream destroyed before connection could be made');

this.name = StreamDestroyedException.name;
}
}
1 change: 1 addition & 0 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ import {

export { Client, ClientOptions };
export * from './types';
export * from './exceptions';
43 changes: 23 additions & 20 deletions src/stream/stream.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import WebSocket, { ClientOptions, Data } from 'ws';
import { PS2Environment } from '../types/ps2.options';
import { CensusMessage } from './types/messages.types';
import { CensusCommand } from './types/command.types';
import { StreamDestroyedException } from './exceptions/stream-destroyed.exception';
import { StreamClosedException } from './exceptions/stream-closed.exception';
import Timeout = NodeJS.Timeout;

enum State {
Expand All @@ -23,7 +25,7 @@ export interface StreamClientOptions {
type StreamClientEvents = {
ready: () => void;
destroyed: () => void;
close: (info: { code: number; reason: string }) => void;
close: (code: number, reason?: string) => void;
error: (err: Error) => void;
warn: (err: Error) => void;
debug: (info: string) => void;
Expand Down Expand Up @@ -130,33 +132,34 @@ export class StreamClient extends EventEmitter<StreamClientEvents> {
return Promise.resolve();

return new Promise((resolve, reject) => {
const accept = () => {
const ready = () => {
cleanup();
resolve();
};

const decline = (...e: any[]) => {
const destroyed = () => {
cleanup();
reject(e);
reject(new StreamDestroyedException());
};

const closed = (code: number, reason?: string) => {
cleanup();
reject(new StreamClosedException(code, reason));
};

const cleanup = () => {
this.removeListener('ready', accept);
this.removeListener('destroyed', decline);
this.removeListener('close', decline);
this.removeListener('ready', ready)
.removeListener('destroyed', destroyed)
.removeListener('close', closed);
};

this.once('ready', accept);
this.once('destroyed', decline);
this.once('close', decline);
this.once('ready', ready)
.once('destroyed', destroyed)
.once('close', closed);

if (this.connection && this.connection.readyState === WebSocket.OPEN) {
if (this.connection && this.state == State.READY) {
this.emit('debug', `Open connection found, continuing operations.`);

// Assume everything is fine
this.state = State.READY;
this.emit('ready');

return;
}

Expand All @@ -176,10 +179,10 @@ export class StreamClient extends EventEmitter<StreamClientEvents> {
this.wsOptions,
));

ws.on('open', this.onOpen.bind(this));
ws.on('message', this.onMessage.bind(this));
ws.on('close', this.onClose.bind(this));
ws.on('error', this.onError.bind(this));
ws.on('open', this.onOpen.bind(this))
.on('message', this.onMessage.bind(this))
.on('close', this.onClose.bind(this))
.on('error', this.onError.bind(this));
});
}

Expand Down Expand Up @@ -263,7 +266,7 @@ export class StreamClient extends EventEmitter<StreamClientEvents> {
this.cleanupConnection();

this.state = State.DISCONNECTED;
this.emit('close', { code, reason: reason.toString() });
this.emit('close', code, reason.length ? reason.toString() : undefined);
}

/**
Expand Down

0 comments on commit ae55653

Please sign in to comment.