From e1edd39e9246982574ac76752cef6afad92a0e6b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 12:12:25 +0200 Subject: [PATCH] stream: pipeline don't destroy Duplex src before 'finish' pipeline was too agressive with destroying Duplex streams which were the first argument into pipeline. Just because it's !writable does not mean that it is safe to be destroyed, unless it has also emitted 'finish'. Fixes: https://github.com/nodejs/node/issues/32955 --- lib/internal/streams/pipeline.js | 31 +++++++++++++---- test/parallel/test-stream-pipeline.js | 49 +++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cdd5bcb791f451..041ff9337594f5 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) { return callback(); } - if (!err && reading && !writing && stream.writable) { - return callback(); - } + const wState = stream._writableState; + + const writableEnded = stream.writableEnded || + (wState && wState.ended); + const writableFinished = stream.writableFinished || + (wState && wState.finished); + + const willFinish = stream.writable || + (writableEnded && !writableFinished); + const willEnd = stream.readable; - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); + if (!err) { + // First + if (reading && !writing && willFinish) { + return callback(); + } + + // Last + if (!reading && writing && willEnd) { + return callback(); + } } + + destroyImpl.destroyer(stream, err); callback(err); }); @@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) { .once('end', _destroy) .once('error', _destroy); } else { - _destroy(err); + // Do an extra tick so that 'finish' has a chance to be emitted if + // first stream is Duplex. + process.nextTick(_destroy, err); } }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b273fddfa3b613..6d9e2be5299422 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -13,6 +13,7 @@ const { const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -1118,3 +1119,51 @@ const { promisify } = require('util'); assert.strictEqual(closed, true); })); } + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +}