diff --git a/lib/sender.js b/lib/sender.js index ad71e1950..18f0cf48c 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -151,6 +151,7 @@ class Sender { }), cb ); + this._socket.end(); } /** diff --git a/lib/websocket.js b/lib/websocket.js index 40ab4e86a..efec80e96 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -123,7 +123,7 @@ class WebSocket extends EventEmitter { * @type {Number} */ get readyState() { - return this._readyState; + return this._readyState === -2 ? WebSocket.CLOSING : this._readyState; } /** @@ -159,6 +159,7 @@ class WebSocket extends EventEmitter { receiver.on('conclude', receiverOnConclude); receiver.on('drain', receiverOnDrain); receiver.on('error', receiverOnError); + receiver.on('finish', receiverOnFinish); receiver.on('message', receiverOnMessage); receiver.on('ping', receiverOnPing); receiver.on('pong', receiverOnPong); @@ -201,31 +202,25 @@ class WebSocket extends EventEmitter { /** * Start a closing handshake. * - * +----------+ +-----------+ +----------+ - * - - -|ws.close()|-->|close frame|-->|ws.close()|- - - - * | +----------+ +-----------+ +----------+ | - * +----------+ +-----------+ | - * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING - * +----------+ +-----------+ | - * | | | +---+ | - * +------------------------+-->|fin| - - - - - * | +---+ | +---+ - * - - - - -|fin|<---------------------+ - * +---+ - * * @param {Number} [code] Status code explaining why the connection is closing * @param {String} [data] A string explaining why the connection is closing * @public */ close(code, data) { - if (this.readyState === WebSocket.CLOSED) return; if (this.readyState === WebSocket.CONNECTING) { const msg = 'WebSocket was closed before the connection was established'; return abortHandshake(this, this._req, msg); } - if (this.readyState === WebSocket.CLOSING) { - if (this._closeFrameSent && this._closeFrameReceived) this._socket.end(); + // + // `this._readyState` below is not a typo. The + // `WebSocket.prototype.readyState` getter returns `WebSocket.CLOSING` even + // if `this._readyState` is `-2`. + // + if ( + this._readyState === WebSocket.CLOSING || + this.readyState === WebSocket.CLOSED + ) { return; } @@ -238,7 +233,6 @@ class WebSocket extends EventEmitter { if (err) return; this._closeFrameSent = true; - if (this._closeFrameReceived) this._socket.end(); }); // @@ -610,6 +604,15 @@ function initAsClient(websocket, address, protocols, options) { }); req.on('upgrade', (res, socket, head) => { + // + // `tls.connect()` does not support the `allowHalfOpen` option in Node.js < + // 12.9.0. Also, the socket creation is not always under our control as the + // user might use the `http{,s}.request()` `agent` option and by default + // `net.connect()` and `tls.connect()` create a socket not allowed to be + // half-open. + // + socket.allowHalfOpen = true; + websocket.emit('upgrade', res); // @@ -679,6 +682,7 @@ function initAsClient(websocket, address, protocols, options) { * @private */ function netConnect(options) { + options.allowHalfOpen = true; options.path = options.socketPath; return net.connect(options); } @@ -691,6 +695,7 @@ function netConnect(options) { * @private */ function tlsConnect(options) { + options.allowHalfOpen = true; options.path = undefined; if (!options.servername && options.servername !== '') { @@ -820,6 +825,17 @@ function receiverOnError(err) { * @private */ function receiverOnFinish() { + const websocket = this[kWebSocket]; + + if (!websocket._closeFrameReceived) websocket.close(); +} + +/** + * A listener for the `Receiver` `'error'` or `'finish'` event. + * + * @private + */ +function receiverOnErrorOrFinish() { this[kWebSocket].emitClose(); } @@ -893,8 +909,8 @@ function socketOnClose() { ) { websocket.emitClose(); } else { - websocket._receiver.on('error', receiverOnFinish); - websocket._receiver.on('finish', receiverOnFinish); + websocket._receiver.on('error', receiverOnErrorOrFinish); + websocket._receiver.on('finish', receiverOnErrorOrFinish); } } @@ -918,9 +934,15 @@ function socketOnData(chunk) { function socketOnEnd() { const websocket = this[kWebSocket]; - websocket._readyState = WebSocket.CLOSING; + // + // This is a special state. It indicates that the connection is going through + // the closing handshake but unlike `WebSocket.CLOSING` it does not prevent + // `WebSocket.prototype.close()` from sending a close frame. + // + if (websocket._readyState === WebSocket.OPEN) { + websocket._readyState = -2; + } websocket._receiver.end(); - this.end(); } /** diff --git a/test/create-websocket-stream.test.js b/test/create-websocket-stream.test.js index bcd240974..b96ac9b1b 100644 --- a/test/create-websocket-stream.test.js +++ b/test/create-websocket-stream.test.js @@ -204,6 +204,8 @@ describe('createWebSocketStream', () => { it('reemits errors', (done) => { let duplexCloseEventEmitted = false; + let serverClientCloseEventEmitted = false; + const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); const duplex = createWebSocketStream(ws); @@ -218,6 +220,7 @@ describe('createWebSocketStream', () => { duplex.on('close', () => { duplexCloseEventEmitted = true; + if (serverClientCloseEventEmitted) wss.close(done); }); }); }); @@ -225,10 +228,11 @@ describe('createWebSocketStream', () => { wss.on('connection', (ws) => { ws._socket.write(Buffer.from([0x85, 0x00])); ws.on('close', (code, reason) => { - assert.ok(duplexCloseEventEmitted); assert.strictEqual(code, 1002); assert.strictEqual(reason, ''); - wss.close(done); + + serverClientCloseEventEmitted = true; + if (duplexCloseEventEmitted) wss.close(done); }); }); }); diff --git a/test/sender.test.js b/test/sender.test.js index 58eca8fbf..c53ceb87b 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -13,9 +13,10 @@ class MockSocket { if (write) this.write = write; } + end() {} cork() {} - write() {} uncork() {} + write() {} } describe('Sender', () => { diff --git a/test/websocket-server.test.js b/test/websocket-server.test.js index ce6617ec6..9d2a1a0b4 100644 --- a/test/websocket-server.test.js +++ b/test/websocket-server.test.js @@ -400,7 +400,9 @@ describe('WebSocketServer', () => { server.on('upgrade', (req, socket, head) => { wss.handleUpgrade(req, socket, head, (ws) => { - ws.close(); + process.nextTick(() => { + ws.close(); + }); }); assert.throws( () => wss.handleUpgrade(req, socket, head, NOOP), diff --git a/test/websocket.test.js b/test/websocket.test.js index 03f984fcb..4cf60b6d9 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -430,6 +430,8 @@ describe('WebSocket', () => { describe('Events', () => { it("emits an 'error' event if an error occurs", (done) => { let clientCloseEventEmitted = false; + let serverClientCloseEventEmitted = false; + const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); @@ -442,19 +444,22 @@ describe('WebSocket', () => { ); ws.on('close', (code, reason) => { - clientCloseEventEmitted = true; assert.strictEqual(code, 1006); assert.strictEqual(reason, ''); + + clientCloseEventEmitted = true; + if (serverClientCloseEventEmitted) wss.close(done); }); }); }); wss.on('connection', (ws) => { ws.on('close', (code, reason) => { - assert.ok(clientCloseEventEmitted); assert.strictEqual(code, 1002); assert.strictEqual(reason, ''); - wss.close(done); + + serverClientCloseEventEmitted = true; + if (clientCloseEventEmitted) wss.close(done); }); ws._socket.write(Buffer.from([0x85, 0x00])); @@ -547,6 +552,7 @@ describe('WebSocket', () => { .update(req.headers['sec-websocket-key'] + GUID) .digest('base64'); + socket.resume(); socket.end( 'HTTP/1.1 101 Switching Protocols\r\n' + 'Upgrade: websocket\r\n' + @@ -1419,16 +1425,19 @@ describe('WebSocket', () => { }); it('honors the `mask` option', (done) => { + let clientCloseEventEmitted = false; let serverClientCloseEventEmitted = false; + const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); ws.on('open', () => ws.send('hi', { mask: false })); ws.on('close', (code, reason) => { - assert.ok(serverClientCloseEventEmitted); assert.strictEqual(code, 1002); assert.strictEqual(reason, ''); - wss.close(done); + + clientCloseEventEmitted = true; + if (serverClientCloseEventEmitted) wss.close(done); }); }); @@ -1450,9 +1459,11 @@ describe('WebSocket', () => { ); ws.on('close', (code, reason) => { - serverClientCloseEventEmitted = true; assert.strictEqual(code, 1006); assert.strictEqual(reason, ''); + + serverClientCloseEventEmitted = true; + if (clientCloseEventEmitted) wss.close(done); }); }); }); @@ -2624,7 +2635,171 @@ describe('WebSocket', () => { ws.send('foo'); ws.send('bar'); ws.send('baz'); - ws.send('qux', () => ws._socket.end()); + ws.send('qux', () => { + ws.terminate(); + }); + }); + }); + + it('handles a close frame received while compressing data', (done) => { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + ws.on('open', () => { + ws._receiver.on('conclude', () => { + assert.ok(ws._sender._deflating); + }); + + ws.send('foo'); + ws.send('bar'); + ws.send('baz'); + ws.send('qux'); + }); + } + ); + + wss.on('connection', (ws) => { + const messages = []; + + ws.on('message', (message) => { + messages.push(message); + }); + + ws.on('close', (code, reason) => { + assert.deepStrictEqual(messages, ['foo', 'bar', 'baz', 'qux']); + assert.strictEqual(code, 1000); + assert.strictEqual(reason, ''); + wss.close(done); + }); + + ws.close(1000); + }); + }); + + it("handles the socket 'end' event while compressing data", (done) => { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + ws.on('open', () => { + ws._socket.on('end', () => { + assert.ok(ws._sender._deflating); + }); + + ws.send('foo'); + ws.send('bar'); + ws.send('baz'); + ws.send('qux'); + }); + } + ); + + wss.on('connection', (ws) => { + const messages = []; + + ws.on('message', (message) => { + messages.push(message); + }); + + ws.on('close', (code, reason) => { + assert.deepStrictEqual(messages, ['foo', 'bar', 'baz', 'qux']); + assert.strictEqual(code, 1000); + assert.strictEqual(reason, ''); + wss.close(done); + }); + + ws.close(1000); + ws._socket.end(); // Ensure the socket is immediately half-closed. + }); + }); + + describe('#close', () => { + it('can be used while data is being decompressed (1/2)', (done) => { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const messages = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws._socket.on('end', () => { + assert.strictEqual(ws._receiver._state, 5); + }); + }); + + ws.on('message', (message) => { + if (messages.push(message) > 1) return; + + ws.close(1000); + }); + + ws.on('close', (code, reason) => { + assert.deepStrictEqual(messages, ['', '', '', '']); + assert.strictEqual(code, 1000); + assert.strictEqual(reason, ''); + wss.close(done); + }); + } + ); + + wss.on('connection', (ws) => { + const buf = Buffer.from('c10100c10100c10100c10100', 'hex'); + ws._socket.write(buf); + }); + }); + + it('can be used while data is being decompressed (2/2)', (done) => { + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const messages = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws._socket.on('end', () => { + assert.strictEqual(ws._receiver._state, 5); + }); + }); + + ws.on('message', (message) => { + if (messages.push(message) > 1) return; + + ws.close(1000); + ws._socket.end(); // Ensure the socket is immediately half-closed. + }); + + ws.on('close', (code, reason) => { + assert.deepStrictEqual(messages, ['', '', '', '']); + assert.strictEqual(code, 1000); + assert.strictEqual(reason, ''); + wss.close(done); + }); + } + ); + + wss.on('connection', (ws) => { + const buf = Buffer.from('c10100c10100c10100c10100', 'hex'); + ws._socket.write(buf); + }); }); }); @@ -2679,6 +2854,8 @@ describe('WebSocket', () => { 'The socket was closed while data was being compressed' ); }); + + ws.terminate(); }); ws.on('close', () => { @@ -2687,10 +2864,6 @@ describe('WebSocket', () => { }); } ); - - wss.on('connection', (ws) => { - ws._socket.end(); - }); }); }); @@ -2760,4 +2933,60 @@ describe('WebSocket', () => { }); }); }); + + describe('Connection close edge cases', () => { + it('cleanly shuts down after simultaneous errors', (done) => { + let clientCloseEventEmitted = false; + let serverClientCloseEventEmitted = false; + + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE'); + assert.strictEqual( + err.message, + 'Invalid WebSocket frame: invalid opcode 5' + ); + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1006); + assert.strictEqual(reason, ''); + + clientCloseEventEmitted = true; + if (serverClientCloseEventEmitted) wss.close(done); + }); + }); + + ws.on('open', () => { + // Write an invalid frame in both directions to trigger simultaneous + // failure. + const chunk = Buffer.from([0x85, 0x00]); + + wss.clients.values().next().value._socket.write(chunk); + ws._socket.write(chunk); + }); + }); + + wss.on('connection', (ws) => { + ws.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE'); + assert.strictEqual( + err.message, + 'Invalid WebSocket frame: invalid opcode 5' + ); + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1006); + assert.strictEqual(reason, ''); + + serverClientCloseEventEmitted = true; + if (clientCloseEventEmitted) wss.close(done); + }); + }); + }); + }); + }); });