diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index cdd5bcb791f451..041ff9337594f5 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) {
       return callback();
     }
 
-    if (!err && reading && !writing && stream.writable) {
-      return callback();
-    }
+    const wState = stream._writableState;
+
+    const writableEnded = stream.writableEnded ||
+      (wState && wState.ended);
+    const writableFinished = stream.writableFinished ||
+      (wState && wState.finished);
+
+    const willFinish = stream.writable ||
+      (writableEnded && !writableFinished);
+    const willEnd = stream.readable;
 
-    if (err || !final || !stream.readable) {
-      destroyImpl.destroyer(stream, err);
+    if (!err) {
+      // First
+      if (reading && !writing && willFinish) {
+        return callback();
+      }
+
+      // Last
+      if (!reading && writing && willEnd) {
+        return callback();
+      }
     }
+
+    destroyImpl.destroyer(stream, err);
     callback(err);
   });
 
@@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) {
         .once('end', _destroy)
         .once('error', _destroy);
     } else {
-      _destroy(err);
+      // Do an extra tick so that 'finish' has a chance to be emitted if
+      // first stream is Duplex.
+      process.nextTick(_destroy, err);
     }
   });
 
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index b273fddfa3b613..6d9e2be5299422 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -13,6 +13,7 @@ const {
 const assert = require('assert');
 const http = require('http');
 const { promisify } = require('util');
+const net = require('net');
 
 {
   let finished = false;
@@ -1118,3 +1119,51 @@ const { promisify } = require('util');
     assert.strictEqual(closed, true);
   }));
 }
+
+{
+  const server = net.createServer(common.mustCall((socket) => {
+    // echo server
+    pipeline(socket, socket, common.mustCall());
+    // 13 force destroys the socket before it has a chance to emit finish
+    socket.on('finish', common.mustCall(() => {
+      server.close();
+    }));
+  })).listen(0, common.mustCall(() => {
+    const socket = net.connect(server.address().port);
+    socket.end();
+  }));
+}
+
+{
+  const d = new Duplex({
+    autoDestroy: false,
+    write: common.mustCall((data, enc, cb) => {
+      d.push(data);
+      cb();
+    }),
+    read: common.mustCall(() => {
+      d.push(null);
+    }),
+    final: common.mustCall((cb) => {
+      setTimeout(() => {
+        assert.strictEqual(d.destroyed, false);
+        cb();
+      }, 1000);
+    }),
+    // `destroy()` won't be invoked by pipeline since
+    // the writable side has not completed when
+    // the pipeline has completed.
+    destroy: common.mustNotCall()
+  });
+
+  const sink = new Writable({
+    write: common.mustCall((data, enc, cb) => {
+      cb();
+    })
+  });
+
+  pipeline(d, sink, common.mustCall());
+
+  d.write('test');
+  d.end();
+}