Skip to content

Commit

Permalink
feat(hub connection): throttle reconnect on disconnect (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph118 authored Jan 23, 2025
1 parent e6df816 commit 20740f9
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 30 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
## [5.0.1](https://github.com/sketch7/signalr-client/compare/5.0.0...5.0.1) (2023-09-11)
## [5.1.0](https://github.com/sketch7/signalr-client/compare/5.0.1...5.1.0) (2025-01-21)

### Features

- **hub connection:**
- throttle reconnect on disconnect and stop trying after max attempts
- add `autoReconnectRecoverIntervalMS` to reset max attempts after a duration

### Refactor

- **hub connection:** convert `on` + `stream` `retryWhen` (deprecated) to `retry`

## [5.0.1](https://github.com/sketch7/signalr-client/compare/5.0.0...5.0.1) (2024-09-11)

### Refactor

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ssv/signalr-client",
"version": "5.0.1",
"version": "5.1.0",
"versionSuffix": "",
"description": "SignalR client library built on top of @microsoft/signalr. This gives you more features and easier to use.",
"homepage": "https://github.com/sketch7/signalr-client",
Expand Down
72 changes: 66 additions & 6 deletions src/lib/hub-connection.connection.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
import { HubConnection } from "./hub-connection";
import { Subscription, lastValueFrom, merge, first, switchMap, tap, skip, delay, withLatestFrom } from "rxjs";
import {
Subscription, lastValueFrom, merge, first, switchMap, tap, skip, delay, withLatestFrom, takeWhile, filter, finalize, Observable
} from "rxjs";
import type { Mock, MockInstance } from "vitest";

import { HeroHub, createSUT } from "./testing/hub-connection.util";
import { ConnectionStatus } from "./hub-connection.model";
import { AUTO_RECONNECT_RECOVER_INTERVAL, HeroHub, RETRY_MAXIMUM_ATTEMPTS, createSUT } from "./testing/hub-connection.util";
import { ConnectionState, ConnectionStatus } from "./hub-connection.model";
import { MockSignalRHubConnectionBuilder, MockSignalRHubBackend } from "./testing";

import * as signalr from "@microsoft/signalr";

function promiseDelayResolve(ms: number) {
return new Promise(r => setTimeout(r, ms));
}
function promiseDelayReject(ms: number, reason?: unknown) {
return new Promise((_, reject) => setTimeout(() => reject(reason), ms));
}
function exhaustHubRetryAttempts$(sut: HubConnection<HeroHub>, hubBackend: MockSignalRHubBackend): Observable<ConnectionState> {
let retryCount = 0;
return sut.connectionState$.pipe(
filter(state => state.status === ConnectionStatus.connected),
takeWhile(() => retryCount < RETRY_MAXIMUM_ATTEMPTS),
tap(() => retryCount++),
tap(() => hubBackend.disconnect(new Error("Disconnected by the server to exhaust max attempts"))),
);
}

describe("HubConnection Specs", () => {

Expand Down Expand Up @@ -70,7 +82,7 @@ describe("HubConnection Specs", () => {
hubBackend.connection.stop = vi.fn().mockReturnValue(promiseDelayResolve(5));
});

it("should connect once", async () => {
it("should connect once", async () => {
const c1$ = lastValueFrom(SUT.connect());
const c2$ = lastValueFrom(SUT.connect());

Expand Down Expand Up @@ -253,7 +265,7 @@ describe("HubConnection Specs", () => {

});

describe("when disconnects", () => {
describe("when disconnects from server", () => {

beforeEach(() => {
// todo: check if redundant
Expand All @@ -265,7 +277,7 @@ describe("HubConnection Specs", () => {
const reconnect$ = SUT.connectionState$.pipe(
first(),
tap(state => expect(state.status).toBe(ConnectionStatus.connected)),
tap(() => hubBackend.disconnect(new Error("Disconnected by the server"))),
tap(() => hubBackend.disconnect(new Error("Disconnected by the server for auto reconnect"))),
switchMap(() => SUT.connectionState$.pipe(first(x => x.status === ConnectionStatus.connected))),
tap(state => {
expect(state.status).toBe(ConnectionStatus.connected);
Expand All @@ -277,6 +289,54 @@ describe("HubConnection Specs", () => {
return lastValueFrom(reconnect$);
});

describe("and server is encountering issues", () => {

it("should stop reconnecting after maximum attempts", () => {
const reconnect$ = exhaustHubRetryAttempts$(SUT, hubBackend).pipe(
finalize(() => {
expect(SUT.connectionState.status).toBe(ConnectionStatus.disconnected);
expect(hubStartSpy).toBeCalledTimes(3);
expect(hubStopSpy).not.toBeCalled();
})
);

return lastValueFrom(reconnect$);
});

it("should reset maximum attempts after trigger disconnect + connect", () => {
const resetMaxAttempts$ = exhaustHubRetryAttempts$(SUT, hubBackend).pipe(
tap(() => SUT.disconnect()),
switchMap(() => SUT.connect()),
switchMap(() => SUT.connectionState$.pipe(
first(state => state.status === ConnectionStatus.connected),
tap(() => hubBackend.disconnect(new Error("Disconnected by the server to re-trigger auto reconnect"))),
switchMap(() => SUT.connectionState$.pipe(first(x => x.status === ConnectionStatus.connected))),
)),
tap(state => {
expect(hubStartSpy).toBeCalledTimes(4);
expect(state.status).toBe(ConnectionStatus.connected);
}),
);

return lastValueFrom(resetMaxAttempts$);
});

it("should reset maximum attempts after exceeding the recover duration", () => {
const resetMaxAttempts$ = exhaustHubRetryAttempts$(SUT, hubBackend).pipe(
delay(AUTO_RECONNECT_RECOVER_INTERVAL),
switchMap(() => SUT.connectionState$.pipe(
first(state => state.status === ConnectionStatus.connected),
)),
tap(state => {
expect(hubStartSpy).toBeCalledTimes(3);
expect(state.status).toBe(ConnectionStatus.connected);
}),
);

return lastValueFrom(resetMaxAttempts$);
});
});

});

});
Expand Down
6 changes: 6 additions & 0 deletions src/lib/hub-connection.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export interface ConnectionOptions extends IHttpConnectionOptions {

export interface ReconnectionStrategyOptions {
maximumAttempts?: number;
/**
* Resets maximum attempts when exhausted after the given duration.
* The duration is restarted for each connection attempt unless a Date is provided.
* Supports number in MS or date. Defaults to 15 minutes.
*/
autoReconnectRecoverInterval?: number | Date;
customStrategy?: (retryOptions: ReconnectionStrategyOptions, retryCount: number) => number;
randomBackOffStrategy?: RandomStrategyOptions;
randomStrategy?: RandomStrategyOptions;
Expand Down
90 changes: 78 additions & 12 deletions src/lib/hub-connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { from, BehaviorSubject, Observable, Observer, timer, throwError, Subject } from "rxjs";
import { from, BehaviorSubject, Observable, Observer, timer, throwError, Subject, EMPTY, merge, } from "rxjs";
import {
tap, map, filter, switchMap, skipUntil, delay, first,
retryWhen, delayWhen, distinctUntilChanged, takeUntil,retry
delayWhen, distinctUntilChanged, takeUntil, retry,
scan,
catchError,
skip,
take,
} from "rxjs/operators";
import {
HubConnection as SignalRHubConnection,
Expand All @@ -25,6 +29,8 @@ const connectingState = Object.freeze<ConnectionState>({ status: ConnectionStatu
// todo: rename HubClient?
export class HubConnection<THub> {

get connectionState(): ConnectionState { return this._connectionState$.value; }

/** Gets the connection state. */
get connectionState$(): Observable<ConnectionState> { return this._connectionState$.asObservable(); }

Expand Down Expand Up @@ -108,11 +114,7 @@ export class HubConnection<THub> {
takeUntil(this._destroy$),
);

const reconnectOnDisconnect$ = this._connectionState$.pipe(
// tap(x => console.warn(">>>> _connectionState$ state changed", x)),
filter(x => x.status === ConnectionStatus.disconnected && x.reason === errorReasonName),
// tap(x => console.warn(">>>> reconnecting...", x)),
switchMap(() => this.connect()),
const reconnectOnDisconnect$ = this.reconnectOnDisconnect$().pipe(
takeUntil(this._destroy$),
);

Expand Down Expand Up @@ -226,7 +228,7 @@ export class HubConnection<THub> {
);
}

private untilDesiredDisconnects$() {
private untilDesiredDisconnects$(): Observable<void> {
return this.desiredState$.pipe(
first(state => state === DesiredConnectionStatus.disconnected),
map(() => undefined),
Expand Down Expand Up @@ -268,10 +270,11 @@ export class HubConnection<THub> {
private activateStreamWithRetry<TResult>(stream$: Observable<TResult>): Observable<TResult> {
return this.waitUntilConnect$.pipe(
switchMap(() => stream$.pipe(
retryWhen((errors: Observable<unknown>) => errors.pipe(
delay(1), // workaround - when connection disconnects, stream errors fires before `signalr.onClose`
delayWhen(() => this.waitUntilConnect$)
))
retry({
delay: () => timer(1).pipe( // workaround - when connection disconnects, stream errors fires before `signalr.onClose`
delayWhen(() => this.waitUntilConnect$)
)
})
))
);
}
Expand All @@ -288,4 +291,67 @@ export class HubConnection<THub> {
return data;
}

private reconnectOnDisconnect$(): Observable<void> {
const onServerErrorDisconnect$ = this._connectionState$.pipe(
filter(x => x.status === ConnectionStatus.disconnected && x.reason === errorReasonName),
);

// this is a fallback for when max attempts are reached and will emit to reset the max attempt after a duration
const maxAttemptReset$ = onServerErrorDisconnect$.pipe(
switchMap(() => this._connectionState$.pipe(
switchMap(() => timer(this.retry.autoReconnectRecoverInterval || 900000)), // 15 minutes default
take(1),
takeUntil(
this.connectionState$.pipe(
filter(x => x.status === ConnectionStatus.connected)
)
),
)),
// tap(() => console.error(`${this.source} [reconnectOnDisconnect$] :: resetting max attempts`)),
);

const onDisconnect$ = this.desiredState$.pipe(
filter(state => state === DesiredConnectionStatus.disconnected),
);

return merge(
onDisconnect$,
maxAttemptReset$,
).pipe(
switchMap(() => onServerErrorDisconnect$.pipe(
scan(attempts => attempts += 1, 0),
map(retryCount => ({
retryCount,
nextRetryMs: retryCount ? getReconnectionDelay(this.retry, retryCount) : 0
})),
switchMap(({ retryCount, nextRetryMs }) => {
if (this.retry.maximumAttempts && retryCount > this.retry.maximumAttempts) {
return throwError(() => new Error(errorCodes.retryLimitsReached));
}

const delay$ = !nextRetryMs
? emptyNext()
: timer(nextRetryMs).pipe(
map(() => undefined)
);
return delay$.pipe(
// tap(() => console.error(`${this.source} [reconnectOnDisconnect$] :: retrying`, {
// retryCount,
// nextRetryMs,
// maximumAttempts: this.retry.maximumAttempts,
// })),
switchMap(() => this.connect()),
// tap(() => console.error(">>>> [reconnectOnDisconnect$] connected")),
takeUntil(this.untilDesiredDisconnects$()),
);
}),
catchError(() => EMPTY),
takeUntil(this.desiredState$.pipe(
skip(1),
filter(state => state === DesiredConnectionStatus.disconnected),
)),
)),
);
}

}
23 changes: 15 additions & 8 deletions src/lib/testing/hub-connection.util.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
import { HubConnection } from "../hub-connection";
import { vi } from "vitest";
import { ReconnectionStrategyOptions } from "../hub-connection.model";
// jest.genMockFromModule("@microsoft/signalr");
vi.mock("@microsoft/signalr");

let nextUniqueId = 0;

export const RETRY_MAXIMUM_ATTEMPTS = 3;
export const AUTO_RECONNECT_RECOVER_INTERVAL = 2000;

export interface HeroHub {
UpdateHero: string;
}

export function createSUT(): HubConnection<HeroHub> {
export function createSUT(retryOptions: ReconnectionStrategyOptions = {}): HubConnection<HeroHub> {
const retry = {
backOffStrategy: {
delayRetriesMs: 10,
maxDelayRetriesMs: 10
},
maximumAttempts: RETRY_MAXIMUM_ATTEMPTS,
autoReconnectRecoverInterval: AUTO_RECONNECT_RECOVER_INTERVAL,
...retryOptions,
};
return new HubConnection<HeroHub>({
key: `hero-${nextUniqueId++}`,
endpointUri: "/hero",
defaultData: () => ({ tenant: "kowalski", power: "2000" }),
options: {
retry: {
maximumAttempts: 3,
backOffStrategy: {
delayRetriesMs: 10,
maxDelayRetriesMs: 10
}
},
retry,
}
});
}
Expand Down

0 comments on commit 20740f9

Please sign in to comment.