From 35b6669e13f0a59e0d34ef328bcfd067b828e208 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Jun 2021 11:55:12 +0200 Subject: [PATCH] stream: use finished for pump Re-use existing compat logic for pump by using finished. PR-URL: https://github.com/nodejs/node/pull/39203 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum --- lib/internal/streams/pipeline.js | 72 ++++++++++++--------------- test/parallel/test-stream-pipeline.js | 33 ------------ 2 files changed, 32 insertions(+), 73 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 202a8cf9810d62..5759dbd4a580a3 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -20,20 +20,16 @@ const { ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, - ERR_STREAM_PREMATURE_CLOSE, }, } = require('internal/errors'); const { validateCallback } = require('internal/validators'); -function noop() {} - const { isIterable, isReadable, isStream, } = require('internal/streams/utils'); -const assert = require('internal/assert'); let PassThrough; let Readable; @@ -109,62 +105,58 @@ async function* fromReadable(val) { async function pump(iterable, writable, finish) { let error; - let callback = noop; + let onresolve = null; + const resume = (err) => { - error = aggregateTwoErrors(error, err); - const _callback = callback; - callback = noop; - _callback(); - }; - const onClose = () => { - resume(new ERR_STREAM_PREMATURE_CLOSE()); + if (err) { + error = err; + } + + if (onresolve) { + const callback = onresolve; + onresolve = null; + callback(); + } }; - const waitForDrain = () => new Promise((resolve) => { - assert(callback === noop); - if (error || writable.destroyed) { - resolve(); + const wait = () => new Promise((resolve, reject) => { + if (error) { + reject(error); } else { - callback = resolve; + onresolve = () => { + if (error) { + reject(error); + } else { + resolve(); + } + }; } }); - writable - .on('drain', resume) - .on('error', resume) - .on('close', onClose); + writable.on('drain', resume); + const cleanup = eos(writable, { readable: false }, resume); try { if (writable.writableNeedDrain) { - await waitForDrain(); - } - - if (error) { - return; + await wait(); } for await (const chunk of iterable) { if (!writable.write(chunk)) { - await waitForDrain(); + await wait(); } - if (error) { - return; - } - } - - if (error) { - return; } writable.end(); + + await wait(); + + finish(); } catch (err) { - error = aggregateTwoErrors(error, err); + finish(error !== err ? aggregateTwoErrors(error, err) : err); } finally { - writable - .off('drain', resume) - .off('error', resume) - .off('close', onClose); - finish(error); + cleanup(); + writable.off('drain', resume); } } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index aaf726ea5a0350..e2e5fe2e0d561a 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1387,36 +1387,3 @@ const net = require('net'); assert.strictEqual(res, content); })); } - -{ - const writableLike = new Stream(); - writableLike.writableNeedDrain = true; - - pipeline( - async function *() {}, - writableLike, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - }) - ); - - writableLike.emit('close'); -} - -{ - const writableLike = new Stream(); - writableLike.write = () => false; - - pipeline( - async function *() { - yield null; - yield null; - }, - writableLike, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - }) - ); - - writableLike.emit('close'); -}