Skip to content

Commit

Permalink
[minor] Discard any data received after the close frame
Browse files Browse the repository at this point in the history
Remove the `'data'` listener when the close frame is received.
  • Loading branch information
lpinca committed Jan 27, 2018
1 parent b890078 commit 5d8ab0e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 56 deletions.
18 changes: 8 additions & 10 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ class Receiver {
this._fragments = [];

this._cleanupCallback = null;
this._isCleaningUp = false;
this._hadError = false;
this._dead = false;
this._loop = false;

this.add = this.add.bind(this);
this.onmessage = null;
this.onclose = null;
this.onerror = null;
Expand All @@ -72,7 +73,7 @@ class Receiver {
consume (n) {
if (this._bufferedBytes < n) {
this._loop = false;
if (this._dead) this.cleanup(this._cleanupCallback);
if (this._isCleaningUp) this.cleanup(this._cleanupCallback);
return null;
}

Expand Down Expand Up @@ -107,14 +108,12 @@ class Receiver {
/**
* Adds new data to the parser.
*
* @param {Buffer} data A chunk of data
* @param {Buffer} chunk A chunk of data
* @public
*/
add (data) {
if (this._dead) return;

this._bufferedBytes += data.length;
this._buffers.push(data);
add (chunk) {
this._bufferedBytes += chunk.length;
this._buffers.push(chunk);
this.startLoop();
}

Expand Down Expand Up @@ -532,10 +531,9 @@ class Receiver {
* @public
*/
cleanup (cb) {
this._dead = true;

if (!this._hadError && (this._loop || this._state === INFLATING)) {
this._cleanupCallback = cb;
this._isCleaningUp = true;
} else {
this._extensions = null;
this._fragments = null;
Expand Down
59 changes: 26 additions & 33 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const EventEmitter = require('events');
const crypto = require('crypto');
const Ultron = require('ultron');
const https = require('https');
const http = require('http');
const url = require('url');
Expand Down Expand Up @@ -50,7 +49,6 @@ class WebSocket extends EventEmitter {
this._receiver = null;
this._sender = null;
this._socket = null;
this._ultron = null;

if (address !== null) {
if (!protocols) {
Expand Down Expand Up @@ -123,18 +121,17 @@ class WebSocket extends EventEmitter {
socket.setTimeout(0);
socket.setNoDelay();

socket.on('close', this._finalize);
socket.on('error', this._finalize);
socket.on('end', this._finalize);

this._receiver = new Receiver(this._extensions, maxPayload, this.binaryType);
this._sender = new Sender(socket, this._extensions);
this._ultron = new Ultron(socket);
this._socket = socket;

this._ultron.on('close', this._finalize);
this._ultron.on('error', this._finalize);
this._ultron.on('end', this._finalize);

if (head.length > 0) socket.unshift(head);

this._ultron.on('data', (data) => this._receiver.add(data));
socket.on('data', this._receiver.add);

this._receiver.onmessage = (data) => this.emit('message', data);
this._receiver.onping = (data) => {
Expand All @@ -143,6 +140,11 @@ class WebSocket extends EventEmitter {
};
this._receiver.onpong = (data) => this.emit('pong', data);
this._receiver.onclose = (code, reason) => {
//
// Discard any additional data that is received on the socket.
//
this._socket.removeListener('data', this._receiver.add);

this._closeFrameReceived = true;
this._closeMessage = reason;
this._closeCode = code;
Expand Down Expand Up @@ -182,41 +184,32 @@ class WebSocket extends EventEmitter {
this._finalized = true;

if (typeof error === 'object') this.emit('error', error);
if (!this._socket) return this.emitClose();
if (!this._socket) {
this.readyState = WebSocket.CLOSED;
this.emit('close', this._closeCode, this._closeMessage);
return;
}

clearTimeout(this._closeTimer);
this._closeTimer = null;

this._ultron.destroy();
this._ultron = null;

this._socket.removeListener('data', this._receiver.add);
this._socket.removeListener('close', this._finalize);
this._socket.removeListener('error', this._finalize);
this._socket.removeListener('end', this._finalize);
this._socket.on('error', constants.NOOP);

if (!error) this._socket.end();
else this._socket.destroy();

this._socket = null;
this._sender = null;
this._receiver.cleanup(() => {
this.readyState = WebSocket.CLOSED;

this._receiver.cleanup(() => this.emitClose());
this._receiver = null;
}

/**
* Emit the `close` event.
*
* @private
*/
emitClose () {
this.readyState = WebSocket.CLOSED;

this.emit('close', this._closeCode, this._closeMessage);

if (this._extensions[PerMessageDeflate.extensionName]) {
this._extensions[PerMessageDeflate.extensionName].cleanup();
}
if (this._extensions[PerMessageDeflate.extensionName]) {
this._extensions[PerMessageDeflate.extensionName].cleanup();
}

this.removeAllListeners();
this.emit('close', this._closeCode, this._closeMessage);
});
}

/**
Expand Down
13 changes: 0 additions & 13 deletions test/receiver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,19 +443,6 @@ describe('Receiver', function () {
assert.deepStrictEqual(data, ['', 'Hello']);
});

it('ignores data received after a close frame', function () {
const results = [];
const push = results.push.bind(results);
const p = new Receiver();

p.onclose = p.onmessage = push;

p.add(Buffer.from('8800', 'hex'));
p.add(Buffer.from('8100', 'hex'));

assert.deepStrictEqual(results, [1005, '']);
});

it('raises an error when RSV1 is on and permessage-deflate is disabled', function (done) {
const p = new Receiver();

Expand Down

0 comments on commit 5d8ab0e

Please sign in to comment.