Skip to content

Commit

Permalink
Build from Node v10.15.3 (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina authored Apr 1, 2019
1 parent 5b90ed2 commit 3ec079e
Show file tree
Hide file tree
Showing 15 changed files with 588 additions and 225 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ npm install --save readable-stream

This package is a mirror of the streams implementations in Node.js.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.2/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.3/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
1 change: 1 addition & 0 deletions build/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pump(
file !== 'test-stream-base-prototype-accessors.js' &&
file !== 'test-stream-base-prototype-accessors-enumerability.js' &&
file !== 'test-stream-wrap-drain.js' &&
file !== 'test-stream-pipeline-http2.js' &&
file !== 'test-stream-base-typechecking.js') {
processTestFile(file)
}
Expand Down
11 changes: 11 additions & 0 deletions build/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,17 @@ module.exports['internal/streams/async_iterator.js'] = [

module.exports['internal/streams/end-of-stream.js'] = [
, errorsTwoLevel
, [
/const \{ once \} = require\('internal\/util'\);/,
`function once(callback) {
let called = false;
return function(...args) {
if (called) return;
called = true;
callback.apply(this, args);
};
}`
]
]

module.exports['internal/streams/pipeline.js'] = [
Expand Down
30 changes: 26 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,35 @@ function maybeReadMore(stream, state) {
}

function maybeReadMore_(stream, state) {
var len = state.length;

while (!state.reading && !state.ended && state.length < state.highWaterMark) {
// Attempt to read more data if we should.
//
// The conditions for reading more data are (one of):
// - Not enough data buffered (state.length < state.highWaterMark). The loop
// is responsible for filling the buffer with enough data if such data
// is available. If highWaterMark is 0 and we are not in the flowing mode
// we should _not_ attempt to buffer any extra data. We'll get more data
// when the stream consumer calls read() instead.
// - No data in the buffer, and the stream is in flowing mode. In this mode
// the loop below is responsible for ensuring read() is called. Failing to
// call read here would abort the flow and there's no other mechanism for
// continuing the flow if the stream consumer has just subscribed to the
// 'data' event.
//
// In addition to the above conditions to keep reading data, the following
// conditions prevent the data from being read:
// - The stream has ended (state.ended).
// - There is already a pending 'read' operation (state.reading). This is a
// case where the the stream has called the implementation defined _read()
// method, but they are processing the call asynchronously and have _not_
// called push() with new data. In this case we skip performing more
// read()s. The execution ends in this method again after the _read() ends
// up calling push() with more data.
while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) {
var len = state.length;
debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length) // didn't get any data, stop spinning.
break;else len = state.length;
break;
}

state.readingMore = false;
Expand Down
17 changes: 10 additions & 7 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ function onReadable(iter) {
function wrapForNext(lastPromise, iter) {
return function (resolve, reject) {
lastPromise.then(function () {
if (iter[kEnded]) {
resolve(createIterResult(undefined, true));
return;
}

iter[kHandlePromise](resolve, reject);
}, reject);
};
Expand All @@ -70,7 +75,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
}

if (this[kEnded]) {
return Promise.resolve(createIterResult(null, true));
return Promise.resolve(createIterResult(undefined, true));
}

if (this[kStream].destroyed) {
Expand All @@ -83,7 +88,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
if (_this[kError]) {
reject(_this[kError]);
} else {
resolve(createIterResult(null, true));
resolve(createIterResult(undefined, true));
}
});
});
Expand Down Expand Up @@ -128,7 +133,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
return;
}

resolve(createIterResult(null, true));
resolve(createIterResult(undefined, true));
});
});
}), _Object$setPrototypeO), AsyncIteratorPrototype);
Expand All @@ -151,9 +156,6 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
}), _defineProperty(_Object$create, kEnded, {
value: stream._readableState.endEmitted,
writable: true
}), _defineProperty(_Object$create, kLastPromise, {
value: null,
writable: true
}), _defineProperty(_Object$create, kHandlePromise, {
value: function value(resolve, reject) {
var data = iterator[kStream].read();
Expand All @@ -170,6 +172,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
},
writable: true
}), _Object$create));
iterator[kLastPromise] = null;
finished(stream, function (err) {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
Expand All @@ -192,7 +195,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(null, true));
resolve(createIterResult(undefined, true));
}

iterator[kEnded] = true;
Expand Down
43 changes: 28 additions & 15 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@

var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE;

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function once(callback) {
var called = false;
return function (err) {
return function () {
if (called) return;
called = true;
callback.call(this, err);

for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) {
args[_key] = arguments[_key];
}

callback.apply(this, args);
};
}

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function eos(stream, opts, callback) {
if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};
callback = once(callback || noop);
var ws = stream._writableState;
var rs = stream._readableState;
var readable = opts.readable || opts.readable !== false && stream.readable;
var writable = opts.writable || opts.writable !== false && stream.writable;

var onlegacyfinish = function onlegacyfinish() {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;

var onfinish = function onfinish() {
writable = false;
writableEnded = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream._readableState && stream._readableState.endEmitted;

var onend = function onend() {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

Expand All @@ -47,12 +56,16 @@ function eos(stream, opts, callback) {
};

var onclose = function onclose() {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
var err;

if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}

if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
};

Expand All @@ -64,7 +77,7 @@ function eos(stream, opts, callback) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();else stream.on('request', onrequest);
} else if (writable && !ws) {
} else if (writable && !stream._writableState) {
// legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
Expand Down
57 changes: 57 additions & 0 deletions test/parallel/test-stream-pipeline-queued-end-in-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"use strict";

/*<replacement>*/
var bufferShim = require('safe-buffer').Buffer;
/*</replacement>*/


var common = require('../common');

var assert = require('assert/');

var _require = require('../../'),
Readable = _require.Readable,
Duplex = _require.Duplex,
pipeline = _require.pipeline; // Test that the callback for pipeline() is called even when the ._destroy()
// method of the stream places an .end() request to itself that does not
// get processed before the destruction of the stream (i.e. the 'close' event).
// Refs: https://github.com/nodejs/node/issues/24456


var readable = new Readable({
read: common.mustCall(function () {})
});
var duplex = new Duplex({
write: function write(chunk, enc, cb) {// Simulate messages queueing up.
},
read: function read() {},
destroy: function destroy(err, cb) {
// Call end() from inside the destroy() method, like HTTP/2 streams
// do at the time of writing.
this.end();
cb(err);
}
});
duplex.on('finished', common.mustNotCall());
pipeline(readable, duplex, common.mustCall(function (err) {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
})); // Write one chunk of data, and destroy the stream later.
// That should trigger the pipeline destruction.

readable.push('foo');
setImmediate(function () {
readable.destroy();
});
;

require('tap').pass('sync run');

var _list = process.listeners('uncaughtException');

process.removeAllListeners('uncaughtException');

_list.pop();

_list.forEach(function (e) {
return process.on('uncaughtException', e);
});
38 changes: 0 additions & 38 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ var bufferShim = require('safe-buffer').Buffer;

var common = require('../common');

if (!common.hasCrypto) common.skip('missing crypto');

var _require = require('../../'),
Stream = _require.Stream,
Writable = _require.Writable,
Expand All @@ -24,14 +22,6 @@ var assert = require('assert/');

var http = require('http');

var http2 = {
createServer: function createServer() {
return {
listen: function listen() {}
};
}
};

var promisify = require('util-promisify');

{
Expand Down Expand Up @@ -281,34 +271,6 @@ var promisify = require('util-promisify');
});
});
}
{
var _server4 = http2.createServer(function (req, res) {
pipeline(req, res, common.mustCall());
});

_server4.listen(0, function () {
var url = "http://localhost:".concat(_server4.address().port);
var client = http2.connect(url);
var req = client.request({
':method': 'POST'
});
var rs = new Readable({
read: function read() {
rs.push('hello');
}
});
pipeline(rs, req, common.mustCall(function (err) {
_server4.close();

client.close();
}));
var cnt = 10;
req.on('data', function (data) {
cnt--;
if (cnt === 0) rs.destroy();
});
});
}
{
var makeTransform = function makeTransform() {
var tr = new Transform({
Expand Down
Loading

0 comments on commit 3ec079e

Please sign in to comment.