Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: finished should invoke callback for closed streams #31509

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ function ReadableState(options, stream, isDuplex) {
// Indicates whether the stream has errored.
this.errored = false;

// Indicates whether the stream has finished destroying.
this.closed = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand Down
3 changes: 3 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ function WritableState(options, stream, isDuplex) {
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Indicates whether the stream has finished destroying.
this.closed = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down
10 changes: 0 additions & 10 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ function destroy(err, cb) {
}
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
Expand Down Expand Up @@ -101,6 +108,7 @@ function undestroy() {
const w = this._writableState;

if (r) {
r.closed = false;
r.destroyed = false;
r.errored = false;
r.reading = false;
Expand All @@ -110,6 +118,7 @@ function undestroy() {
}

if (w) {
w.closed = false;
w.destroyed = false;
w.errored = false;
w.ended = false;
Expand Down
30 changes: 26 additions & 4 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ function isWritableFinished(stream) {
return wState.finished || (wState.ended && wState.length === 0);
}

function nop() {}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand All @@ -52,20 +54,23 @@ function eos(stream, opts, callback) {
let writable = opts.writable ||
(opts.writable !== false && isWritable(stream));

const wState = stream._writableState;
const rState = stream._readableState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
(rState && rState.finished);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like wState should be here instead of rState

const onfinish = () => {
writable = false;
writableFinished = true;
if (!readable) callback.call(stream);
};

let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
(rState && rState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
Expand All @@ -79,7 +84,7 @@ function eos(stream, opts, callback) {
const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
if (!rState || !rState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
Expand All @@ -99,7 +104,7 @@ function eos(stream, opts, callback) {
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !stream._writableState) { // legacy streams
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
Expand All @@ -114,7 +119,24 @@ function eos(stream, opts, callback) {
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);

if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
process.nextTick(() => {
callback();
});
}

return function() {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
Expand Down
117 changes: 117 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,120 @@ const { promisify } = require('util');
w.end('asd');
w.destroy();
}

function testClosed(factory) {
{
// If already destroyed but finished is cancelled in same tick
// don't invoke the callback,

const s = factory();
s.destroy();
const dispose = finished(s, common.mustNotCall());
dispose();
}

{
// If already destroyed invoked callback.

const s = factory();
s.destroy();
finished(s, common.mustCall());
}

{
// Don't invoke until destroy has completed.

let destroyed = false;
const s = factory({
destroy(err, cb) {
setImmediate(() => {
destroyed = true;
cb();
});
}
});
s.destroy();
finished(s, common.mustCall(() => {
assert.strictEqual(destroyed, true);
}));
}

{
// Invoke callback even if close is inhibited.

const s = factory({
emitClose: false,
destroy(err, cb) {
cb();
finished(s, common.mustCall());
}
});
s.destroy();
}

{
// Invoke with deep async.

const s = factory({
destroy(err, cb) {
setImmediate(() => {
cb();
setImmediate(() => {
finished(s, common.mustCall());
});
});
}
});
s.destroy();
}
}

testClosed((opts) => new Readable({ ...opts }));
testClosed((opts) => new Writable({ write() {}, ...opts }));

{
const w = new Writable({
write(chunk, encoding, cb) {
cb();
},
autoDestroy: false
});
w.end('asd');
process.nextTick(() => {
finished(w, common.mustCall());
});
}

{
const w = new Writable({
write(chunk, encoding, cb) {
cb(new Error());
},
autoDestroy: false
});
w.write('asd');
w.on('error', common.mustCall(() => {
finished(w, common.mustCall());
}));
}


{
const r = new Readable({
autoDestroy: false
});
r.push(null);
r.resume();
r.on('end', common.mustCall(() => {
finished(r, common.mustCall());
}));
}

{
const rs = fs.createReadStream(__filename, { autoClose: false });
rs.resume();
rs.on('close', common.mustNotCall());
rs.on('end', common.mustCall(() => {
finished(rs, common.mustCall());
}));
}