diff --git a/README.md b/README.md index 791576af68..08aca9c421 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ npm install --save readable-stream This package is a mirror of the streams implementations in Node.js. -Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.14.2/docs/api/stream.html). +Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.2/docs/api/stream.html). If you want to guarantee a stable streams base, regardless of what version of Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index b9b1b742cc..42e16bc8b2 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -141,7 +141,8 @@ function ReadableState(options, stream, isDuplex) { this.needReadable = false; this.emittedReadable = false; this.readableListening = false; - this.resumeScheduled = false; // Should close be emitted on destroy. Defaults to true. + this.resumeScheduled = false; + this.paused = true; // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; // has it been destroyed @@ -822,9 +823,14 @@ Readable.prototype.removeAllListeners = function (ev) { }; function updateReadableListening(self) { - self._readableState.readableListening = self.listenerCount('readable') > 0; // crude way to check if we should resume - - if (self.listenerCount('data') > 0) { + var state = self._readableState; + state.readableListening = self.listenerCount('readable') > 0; + + if (state.resumeScheduled && !state.paused) { + // flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; // crude way to check if we should resume + } else if (self.listenerCount('data') > 0) { self.resume(); } } @@ -848,6 +854,7 @@ Readable.prototype.resume = function () { resume(this, state); } + state.paused = false; return this; }; @@ -880,6 +887,7 @@ Readable.prototype.pause = function () { this.emit('pause'); } + this._readableState.paused = true; return this; }; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9abbad6bc2..b35447aedc 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -103,8 +103,8 @@ function WritableState(options, stream, isDuplex) { options = options || {}; // Duplex streams are both readable and writable, but share // the same options object. // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. + // values for the readable and the writable sides of the duplex stream, + // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream // contains buffers or objects. @@ -445,7 +445,7 @@ function onwrite(stream, er) { onwriteStateUpdate(state); if (er) onwriteError(stream, state, sync, er, cb);else { // Check if we're actually ready to finish, but don't emit yet - var finished = needFinish(state); + var finished = needFinish(state) || stream.destroyed; if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { clearBuffer(stream, state); diff --git a/test/common/inspector-helper.js b/test/common/inspector-helper.js index 25d1f2efbe..33a29e49c1 100644 --- a/test/common/inspector-helper.js +++ b/test/common/inspector-helper.js @@ -635,7 +635,7 @@ function (_EventEmitter) { console.log('[test]', 'Connecting to a child Node process'); var upgradeRequest = yield this.sendUpgradeRequest(); - return new Promise(function (resolve, reject) { + return new Promise(function (resolve) { upgradeRequest.on('upgrade', function (message, socket) { return resolve(new InspectorSession(socket, _this8)); }).on('response', common.mustNotCall('Upgrade was not received')); @@ -655,7 +655,7 @@ function (_EventEmitter) { var _expectConnectionDeclined = _asyncToGenerator(function* () { console.log('[test]', 'Checking upgrade is not possible'); var upgradeRequest = yield this.sendUpgradeRequest(); - return new Promise(function (resolve, reject) { + return new Promise(function (resolve) { upgradeRequest.on('upgrade', common.mustNotCall('Upgrade was received')).on('response', function (response) { return response.on('data', function () {}).on('end', function () { return resolve(response.statusCode); diff --git a/test/parallel/test-stream-readable-readable-then-resume.js b/test/parallel/test-stream-readable-readable-then-resume.js new file mode 100644 index 0000000000..aee4551c3a --- /dev/null +++ b/test/parallel/test-stream-readable-readable-then-resume.js @@ -0,0 +1,49 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +var common = require('../common'); + +var _require = require('../../'), + Readable = _require.Readable; // This test verifies that a stream could be resumed after +// removing the readable event in the same tick + + +check(new Readable({ + objectMode: true, + highWaterMark: 1, + read: function read() { + if (!this.first) { + this.push('hello'); + this.first = true; + return; + } + + this.push(null); + } +})); + +function check(s) { + var readableListener = common.mustNotCall(); + s.on('readable', readableListener); + s.on('end', common.mustCall()); + s.removeListener('readable', readableListener); + s.resume(); +} + +; + +require('tap').pass('sync run'); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js new file mode 100644 index 0000000000..72449967dd --- /dev/null +++ b/test/parallel/test-stream-write-destroy.js @@ -0,0 +1,100 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +require('../common'); + +var assert = require('assert/'); + +var _require = require('../../'), + Writable = _require.Writable; // Test interaction between calling .destroy() on a writable and pending +// writes. + + +var _arr = [false, true]; + +for (var _i = 0; _i < _arr.length; _i++) { + var withPendingData = _arr[_i]; + var _arr2 = [false, true]; + + var _loop = function _loop() { + var useEnd = _arr2[_i2]; + var callbacks = []; + var w = new Writable({ + write: function write(data, enc, cb) { + callbacks.push(cb); + }, + // Effectively disable the HWM to observe 'drain' events more easily. + highWaterMark: 1 + }); + var chunksWritten = 0; + var drains = 0; + var finished = false; + w.on('drain', function () { + return drains++; + }); + w.on('finish', function () { + return finished = true; + }); + w.write('abc', function () { + return chunksWritten++; + }); + assert.strictEqual(chunksWritten, 0); + assert.strictEqual(drains, 0); + callbacks.shift()(); + assert.strictEqual(chunksWritten, 1); + assert.strictEqual(drains, 1); + + if (withPendingData) { + // Test 2 cases: There either is or is not data still in the write queue. + // (The second write will never actually get executed either way.) + w.write('def', function () { + return chunksWritten++; + }); + } + + if (useEnd) { + // Again, test 2 cases: Either we indicate that we want to end the + // writable or not. + w.end('ghi', function () { + return chunksWritten++; + }); + } else { + w.write('ghi', function () { + return chunksWritten++; + }); + } + + assert.strictEqual(chunksWritten, 1); + w.destroy(); + assert.strictEqual(chunksWritten, 1); + callbacks.shift()(); + assert.strictEqual(chunksWritten, 2); + assert.strictEqual(callbacks.length, 0); + assert.strictEqual(drains, 1); // When we used `.end()`, we see the 'finished' event if and only if + // we actually finished processing the write queue. + + assert.strictEqual(finished, !withPendingData && useEnd); + }; + + for (var _i2 = 0; _i2 < _arr2.length; _i2++) { + _loop(); + } +} + +; + +require('tap').pass('sync run'); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file