Skip to content

Commit

Permalink
Updated to v10.15.2 (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina authored Feb 28, 2019
1 parent e02fcbb commit cbd72df
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ npm install --save readable-stream

This package is a mirror of the streams implementations in Node.js.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.14.2/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.2/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
16 changes: 12 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ function ReadableState(options, stream, isDuplex) {
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false; // Should close be emitted on destroy. Defaults to true.
this.resumeScheduled = false;
this.paused = true; // Should close be emitted on destroy. Defaults to true.

this.emitClose = options.emitClose !== false; // has it been destroyed

Expand Down Expand Up @@ -822,9 +823,14 @@ Readable.prototype.removeAllListeners = function (ev) {
};

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0; // crude way to check if we should resume

if (self.listenerCount('data') > 0) {
var state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

if (state.resumeScheduled && !state.paused) {
// flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true; // crude way to check if we should resume
} else if (self.listenerCount('data') > 0) {
self.resume();
}
}
Expand All @@ -848,6 +854,7 @@ Readable.prototype.resume = function () {
resume(this, state);
}

state.paused = false;
return this;
};

Expand Down Expand Up @@ -880,6 +887,7 @@ Readable.prototype.pause = function () {
this.emit('pause');
}

this._readableState.paused = true;
return this;
};

Expand Down
6 changes: 3 additions & 3 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ function WritableState(options, stream, isDuplex) {
options = options || {}; // Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
// values for the readable and the writable sides of the duplex stream,
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.

if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream
// contains buffers or objects.
Expand Down Expand Up @@ -445,7 +445,7 @@ function onwrite(stream, er) {
onwriteStateUpdate(state);
if (er) onwriteError(stream, state, sync, er, cb);else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state);
var finished = needFinish(state) || stream.destroyed;

if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
clearBuffer(stream, state);
Expand Down
4 changes: 2 additions & 2 deletions test/common/inspector-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ function (_EventEmitter) {

console.log('[test]', 'Connecting to a child Node process');
var upgradeRequest = yield this.sendUpgradeRequest();
return new Promise(function (resolve, reject) {
return new Promise(function (resolve) {
upgradeRequest.on('upgrade', function (message, socket) {
return resolve(new InspectorSession(socket, _this8));
}).on('response', common.mustNotCall('Upgrade was not received'));
Expand All @@ -655,7 +655,7 @@ function (_EventEmitter) {
var _expectConnectionDeclined = _asyncToGenerator(function* () {
console.log('[test]', 'Checking upgrade is not possible');
var upgradeRequest = yield this.sendUpgradeRequest();
return new Promise(function (resolve, reject) {
return new Promise(function (resolve) {
upgradeRequest.on('upgrade', common.mustNotCall('Upgrade was received')).on('response', function (response) {
return response.on('data', function () {}).on('end', function () {
return resolve(response.statusCode);
Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-stream-readable-readable-then-resume.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"use strict";

/*<replacement>*/
var bufferShim = require('safe-buffer').Buffer;
/*</replacement>*/


var common = require('../common');

var _require = require('../../'),
Readable = _require.Readable; // This test verifies that a stream could be resumed after
// removing the readable event in the same tick


check(new Readable({
objectMode: true,
highWaterMark: 1,
read: function read() {
if (!this.first) {
this.push('hello');
this.first = true;
return;
}

this.push(null);
}
}));

function check(s) {
var readableListener = common.mustNotCall();
s.on('readable', readableListener);
s.on('end', common.mustCall());
s.removeListener('readable', readableListener);
s.resume();
}

;

require('tap').pass('sync run');

var _list = process.listeners('uncaughtException');

process.removeAllListeners('uncaughtException');

_list.pop();

_list.forEach(function (e) {
return process.on('uncaughtException', e);
});
100 changes: 100 additions & 0 deletions test/parallel/test-stream-write-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"use strict";

/*<replacement>*/
var bufferShim = require('safe-buffer').Buffer;
/*</replacement>*/


require('../common');

var assert = require('assert/');

var _require = require('../../'),
Writable = _require.Writable; // Test interaction between calling .destroy() on a writable and pending
// writes.


var _arr = [false, true];

for (var _i = 0; _i < _arr.length; _i++) {
var withPendingData = _arr[_i];
var _arr2 = [false, true];

var _loop = function _loop() {
var useEnd = _arr2[_i2];
var callbacks = [];
var w = new Writable({
write: function write(data, enc, cb) {
callbacks.push(cb);
},
// Effectively disable the HWM to observe 'drain' events more easily.
highWaterMark: 1
});
var chunksWritten = 0;
var drains = 0;
var finished = false;
w.on('drain', function () {
return drains++;
});
w.on('finish', function () {
return finished = true;
});
w.write('abc', function () {
return chunksWritten++;
});
assert.strictEqual(chunksWritten, 0);
assert.strictEqual(drains, 0);
callbacks.shift()();
assert.strictEqual(chunksWritten, 1);
assert.strictEqual(drains, 1);

if (withPendingData) {
// Test 2 cases: There either is or is not data still in the write queue.
// (The second write will never actually get executed either way.)
w.write('def', function () {
return chunksWritten++;
});
}

if (useEnd) {
// Again, test 2 cases: Either we indicate that we want to end the
// writable or not.
w.end('ghi', function () {
return chunksWritten++;
});
} else {
w.write('ghi', function () {
return chunksWritten++;
});
}

assert.strictEqual(chunksWritten, 1);
w.destroy();
assert.strictEqual(chunksWritten, 1);
callbacks.shift()();
assert.strictEqual(chunksWritten, 2);
assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1); // When we used `.end()`, we see the 'finished' event if and only if
// we actually finished processing the write queue.

assert.strictEqual(finished, !withPendingData && useEnd);
};

for (var _i2 = 0; _i2 < _arr2.length; _i2++) {
_loop();
}
}

;

require('tap').pass('sync run');

var _list = process.listeners('uncaughtException');

process.removeAllListeners('uncaughtException');

_list.pop();

_list.forEach(function (e) {
return process.on('uncaughtException', e);
});

0 comments on commit cbd72df

Please sign in to comment.