diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb05ef35..259d4d0ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ### vNEXT +- Fix for non forced closes (now it wont send connection_terminate) [PR #197](https://github.com/apollographql/subscriptions-transport-ws/pull/197) +- A lot of connection's flow improvements (on connect, on disconnect and on reconnect) [PR #197](https://github.com/apollographql/subscriptions-transport-ws/pull/197) - Require specific lodash/assign module instead of entire package, so memory impact is reduced [PR #196](https://github.com/apollographql/subscriptions-transport-ws/pull/196) ### 0.7.3 diff --git a/README.md b/README.md index 83a19b610..de672ffed 100644 --- a/README.md +++ b/README.md @@ -216,9 +216,9 @@ ReactDOM.render( ### `Constructor(url, options, connectionCallback)` - `url: string` : url that the client will connect to, starts with `ws://` or `wss://` - `options?: Object` : optional, object to modify default client behavior - * `timeout?: number` : how long the client should wait in ms for a keep-alive message from the server (default 30000 ms), this parameter is ignored if the server does not send keep-alive messages. + * `timeout?: number` : how long the client should wait in ms for a keep-alive message from the server (default 10000 ms), this parameter is ignored if the server does not send keep-alive messages. This will also be used to calculate the max connection time per connect/reconnect * `lazy?: boolean` : use to set lazy mode - connects only when first subscription created, and delay the socket initialization - * `connectionParams?: Object | Function` : object that will be available as first argument of `onConnect` (in server side), if passed a function - it will call it and send the return value. + * `connectionParams?: Object | Function` : object that will be available as first argument of `onConnect` (in server side), if passed a function - it will call it and send the return value * `reconnect?: boolean` : automatic reconnect in case of connection error * `reconnectionAttempts?: number` : how much reconnect attempts * `connectionCallback?: (error) => {}` : optional, callback that called after the first init message, with the error (if there is one) diff --git a/src/client.ts b/src/client.ts index 697956621..8fc73c8f9 100644 --- a/src/client.ts +++ b/src/client.ts @@ -70,11 +70,14 @@ export class SubscriptionClient { private connectionCallback: any; private eventEmitter: EventEmitter; private lazy: boolean; - private forceClose: boolean; + private closedByUser: boolean; private wsImpl: any; private wasKeepAliveReceived: boolean; - private checkConnectionTimeoutId: any; + private tryReconnectTimeoutId: any; + private checkConnectionIntervalId: any; + private maxConnectTimeoutId: any; private middlewares: Middleware[]; + private maxConnectTimeGenerator: any; constructor(url: string, options?: ClientOptions, webSocketImpl?: any) { const { @@ -103,11 +106,12 @@ export class SubscriptionClient { this.reconnecting = false; this.reconnectionAttempts = reconnectionAttempts; this.lazy = !!lazy; - this.forceClose = false; + this.closedByUser = false; this.backoff = new Backoff({ jitter: 0.5 }); this.eventEmitter = new EventEmitter(); this.middlewares = []; this.client = null; + this.maxConnectTimeGenerator = this.createMaxConnectTimeGenerator(); if (!this.lazy) { this.connect(); @@ -122,12 +126,24 @@ export class SubscriptionClient { return this.client.readyState; } - public close(isForced = true) { + public close(isForced = true, closedByUser = true) { if (this.client !== null) { - this.forceClose = isForced; - this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_TERMINATE, null); + this.closedByUser = closedByUser; + + if (isForced) { + this.clearCheckConnectionInterval(); + this.clearMaxConnectTimeout(); + this.clearTryReconnectTimeout(); + this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_TERMINATE, null); + } + this.client.close(); this.client = null; + this.eventEmitter.emit('disconnected'); + + if (!isForced) { + this.tryReconnect(); + } } } @@ -277,6 +293,38 @@ export class SubscriptionClient { return this; } + private createMaxConnectTimeGenerator() { + const minValue = 1000; + const maxValue = this.wsTimeout; + + return new Backoff({ + min: minValue, + max: maxValue, + factor: 1.2, + }); + } + + private clearCheckConnectionInterval() { + if (this.checkConnectionIntervalId) { + clearInterval(this.checkConnectionIntervalId); + this.checkConnectionIntervalId = null; + } + } + + private clearMaxConnectTimeout() { + if (this.maxConnectTimeoutId) { + clearTimeout(this.maxConnectTimeoutId); + this.maxConnectTimeoutId = null; + } + } + + private clearTryReconnectTimeout() { + if (this.tryReconnectTimeoutId) { + clearTimeout(this.tryReconnectTimeoutId); + this.tryReconnectTimeoutId = null; + } + } + private logWarningOnNonProductionEnv(warning: string) { if (process && process.env && process.env.NODE_ENV !== 'production') { console.warn(warning); @@ -369,7 +417,7 @@ export class SubscriptionClient { // send message, or queue it if connection is not open private sendMessageRaw(message: Object) { switch (this.status) { - case this.client.OPEN: + case this.wsImpl.OPEN: let serializedMessage: string = JSON.stringify(message); let parsedMessage: any; try { @@ -380,7 +428,7 @@ export class SubscriptionClient { this.client.send(serializedMessage); break; - case this.client.CONNECTING: + case this.wsImpl.CONNECTING: this.unsentMessagesQueue.push(message); break; @@ -397,7 +445,7 @@ export class SubscriptionClient { } private tryReconnect() { - if (!this.reconnect || this.backoff.attempts > this.reconnectionAttempts) { + if (!this.reconnect || this.backoff.attempts >= this.reconnectionAttempts) { return; } @@ -410,8 +458,10 @@ export class SubscriptionClient { this.reconnecting = true; } + this.clearTryReconnectTimeout(); + const delay = this.backoff.duration(); - setTimeout(() => { + this.tryReconnectTimeoutId = setTimeout(() => { this.connect(); }, delay); } @@ -424,13 +474,35 @@ export class SubscriptionClient { } private checkConnection() { - this.wasKeepAliveReceived ? this.wasKeepAliveReceived = false : this.close(false); + if (this.wasKeepAliveReceived) { + this.wasKeepAliveReceived = false; + return; + } + + if (!this.reconnecting) { + this.close(false, true); + } + } + + private checkMaxConnectTimeout() { + this.clearMaxConnectTimeout(); + + // Max timeout trying to connect + this.maxConnectTimeoutId = setTimeout(() => { + if (this.status !== this.wsImpl.OPEN) { + this.close(false, true); + } + }, this.maxConnectTimeGenerator.duration()); } private connect() { this.client = new this.wsImpl(this.url, GRAPHQL_WS); + this.checkMaxConnectTimeout(); + this.client.onopen = () => { + this.clearMaxConnectTimeout(); + this.closedByUser = false; this.eventEmitter.emit(this.reconnecting ? 'reconnecting' : 'connecting'); const payload: ConnectionParams = typeof this.connectionParams === 'function' ? this.connectionParams() : this.connectionParams; @@ -441,12 +513,8 @@ export class SubscriptionClient { }; this.client.onclose = () => { - this.eventEmitter.emit('disconnected'); - - if (this.forceClose) { - this.forceClose = false; - } else { - this.tryReconnect(); + if ( !this.closedByUser ) { + this.close(false, false); } }; @@ -493,6 +561,7 @@ export class SubscriptionClient { this.eventEmitter.emit(this.reconnecting ? 'reconnected' : 'connected'); this.reconnecting = false; this.backoff.reset(); + this.maxConnectTimeGenerator.reset(); if (this.connectionCallback) { this.connectionCallback(); @@ -522,10 +591,11 @@ export class SubscriptionClient { this.checkConnection(); } - if (this.checkConnectionTimeoutId) { - clearTimeout(this.checkConnectionTimeoutId); + if (this.checkConnectionIntervalId) { + clearInterval(this.checkConnectionIntervalId); + this.checkConnection(); } - this.checkConnectionTimeoutId = setTimeout(this.checkConnection.bind(this), this.wsTimeout); + this.checkConnectionIntervalId = setInterval(this.checkConnection.bind(this), this.wsTimeout); break; default: diff --git a/src/defaults.ts b/src/defaults.ts index b1ec67817..abf60e860 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -1,4 +1,4 @@ -const WS_TIMEOUT = 30000; +const WS_TIMEOUT = 10000; export { WS_TIMEOUT, diff --git a/src/test/tests.ts b/src/test/tests.ts index f261dc4b3..baffc7e73 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -216,7 +216,7 @@ new SubscriptionServer(options, { server: httpServer }); const httpServerWithKA = createServer(notFoundRequestListener); httpServerWithKA.listen(KEEP_ALIVE_TEST_PORT); -new SubscriptionServer(Object.assign({}, options, { keepAlive: 10 }), { server: httpServerWithKA }); +new SubscriptionServer(Object.assign({}, options, { keepAlive: 500 }), { server: httpServerWithKA }); const httpServerWithEvents = createServer(notFoundRequestListener); httpServerWithEvents.listen(EVENTS_TEST_PORT); @@ -606,7 +606,7 @@ describe('Client', function () { connection.close(); setTimeout(() => { - expect(client.client.readyState).to.equals(WebSocket.CLOSED); + expect(client.status).to.equals(WebSocket.CLOSED); done(); }, 500); }); @@ -960,33 +960,28 @@ describe('Client', function () { }); it('should stop trying to reconnect to the server', function (done) { - let connections = 0; wsServer.on('connection', (connection: WebSocket) => { - connections += 1; - if (connections === 1) { - wsServer.close(); - } else { - assert(false); - } + connection.close(); }); const subscriptionsClient = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { - timeout: 100, + timeout: 500, reconnect: true, - reconnectionAttempts: 1, + reconnectionAttempts: 2, }); const connectSpy = sinon.spy(subscriptionsClient, 'connect'); setTimeout(() => { expect(connectSpy.callCount).to.be.equal(2); done(); - }, 500); + }, 1500); }); - it('should stop trying to reconnect if not receives the ack from the server', function (done) { + it('should stop trying to reconnect to the server if it not receives the ack', function (done) { const subscriptionsClient = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + timeout: 500, reconnect: true, - reconnectionAttempts: 1, + reconnectionAttempts: 2, }); const connectSpy = sinon.spy(subscriptionsClient, 'connect'); wsServer.on('connection', (connection: any) => { @@ -1002,20 +997,23 @@ describe('Client', function () { setTimeout(() => { expect(connectSpy.callCount).to.be.equal(2); done(); - }, 1000); + }, 1500); }); it('should keep trying to reconnect if receives the ack from the server', function (done) { const subscriptionsClient = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + timeout: 500, reconnect: true, - reconnectionAttempts: 1, + reconnectionAttempts: 2, }); const connectSpy = sinon.spy(subscriptionsClient, 'connect'); + let connections = 0; wsServer.on('connection', (connection: any) => { connection.on('message', (message: any) => { const parsedMessage = JSON.parse(message); // mock server if (parsedMessage.type === MessageTypes.GQL_CONNECTION_INIT) { + ++connections; connection.send(JSON.stringify({ type: MessageTypes.GQL_CONNECTION_ACK, payload: {} })); connection.close(); } @@ -1023,15 +1021,17 @@ describe('Client', function () { }); setTimeout(() => { + expect(connections).to.be.greaterThan(3); expect(connectSpy.callCount).to.be.greaterThan(2); + wsServer.close(); done(); - }, 1000); + }, 1900); }); it('should take care of received keep alive', (done) => { let wasKAReceived = false; - const subscriptionsClient = new SubscriptionClient(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, { timeout: 5 }); + const subscriptionsClient = new SubscriptionClient(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, { timeout: 600 }); const originalOnMessage = subscriptionsClient.client.onmessage; subscriptionsClient.client.onmessage = (dataReceived: any) => { let receivedDataParsed = JSON.parse(dataReceived.data); @@ -1047,18 +1047,28 @@ describe('Client', function () { expect(wasKAReceived).to.equal(true); expect(subscriptionsClient.status).to.equal(WebSocket.CLOSED); done(); - }, 100); + }, 1200); }); it('should correctly clear timeout if receives ka too early', (done) => { - const subscriptionsClient = new SubscriptionClient(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, { timeout: 25 }); + let receivedKeepAlive = 0; + + const subscriptionsClient = new SubscriptionClient(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, { timeout: 600 }); const checkConnectionSpy = sinon.spy(subscriptionsClient, 'checkConnection'); + const originalOnMessage = subscriptionsClient.client.onmessage; + subscriptionsClient.client.onmessage = (dataReceived: any) => { + let receivedDataParsed = JSON.parse(dataReceived.data); + if (receivedDataParsed.type === MessageTypes.GQL_CONNECTION_KEEP_ALIVE) { + ++receivedKeepAlive; + originalOnMessage(dataReceived); + } + }; setTimeout(() => { - expect(checkConnectionSpy.callCount).to.be.equal(1); + expect(checkConnectionSpy.callCount).to.be.equal(receivedKeepAlive); expect(subscriptionsClient.status).to.be.equal(subscriptionsClient.client.OPEN); done(); - }, 100); + }, 1300); }); it('should take care of invalid message received', (done) => { @@ -1145,6 +1155,7 @@ describe('Client', function () { reconnectionAttempts: 1, }); const tryReconnectSpy = sinon.spy(subscriptionsClient, 'tryReconnect'); + let receivedConnecitonTerminate = false; wsServer.on('connection', (connection: any) => { connection.on('message', (message: any) => { const parsedMessage = JSON.parse(message); @@ -1152,6 +1163,10 @@ describe('Client', function () { if (parsedMessage.type === MessageTypes.GQL_CONNECTION_INIT) { connection.send(JSON.stringify({ type: MessageTypes.GQL_CONNECTION_ACK, payload: {} })); } + + if (parsedMessage.type === MessageTypes.GQL_CONNECTION_TERMINATE) { + receivedConnecitonTerminate = true; + } }); }); @@ -1165,11 +1180,50 @@ describe('Client', function () { }; setTimeout(() => { + expect(receivedConnecitonTerminate).to.be.equal(true); expect(tryReconnectSpy.callCount).to.be.equal(0); expect(subscriptionsClient.status).to.be.equal(WebSocket.CLOSED); done(); }, 500); }); + + it('should close the connection without sent connection terminate and reconnect', function (done) { + const subscriptionsClient = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + reconnect: true, + reconnectionAttempts: 1, + }); + const tryReconnectSpy = sinon.spy(subscriptionsClient, 'tryReconnect'); + let receivedConnecitonTerminate = false; + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + const parsedMessage = JSON.parse(message); + // mock server + if (parsedMessage.type === MessageTypes.GQL_CONNECTION_INIT) { + connection.send(JSON.stringify({ type: MessageTypes.GQL_CONNECTION_ACK, payload: {} })); + } + + if (parsedMessage.type === MessageTypes.GQL_CONNECTION_TERMINATE) { + receivedConnecitonTerminate = true; + } + }); + }); + + const originalOnMessage = subscriptionsClient.client.onmessage; + subscriptionsClient.client.onmessage = (dataReceived: any) => { + let receivedDataParsed = JSON.parse(dataReceived.data); + if (receivedDataParsed.type === MessageTypes.GQL_CONNECTION_ACK) { + originalOnMessage(dataReceived); + subscriptionsClient.close(false); + } + }; + + setTimeout(() => { + expect(tryReconnectSpy.callCount).to.be.equal(1); + expect(subscriptionsClient.status).to.be.equal(WebSocket.OPEN); + expect(receivedConnecitonTerminate).to.be.equal(false); + done(); + }, 500); + }); }); describe('Server', function () { @@ -1960,7 +2014,7 @@ describe('Client<->Server Flow', () => { setTimeout(() => { // Disconnect the client - client.close(); + client.close(false); // Subscribe to data, without manually reconnect before const opId = client.subscribe({