From 08a0c6c56c0eb862f0ad4611360299c5b5d23c5f Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Thu, 26 Oct 2023 11:42:58 +0300
Subject: [PATCH] stream: improve from perf
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PR-URL: https://github.com/nodejs/node/pull/50359
Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
---
 lib/internal/streams/from.js | 131 ++++++++++++++++++++++++++++++-----
 1 file changed, 113 insertions(+), 18 deletions(-)

diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js
index c272a01f418dce..aa7e031d3e48d4 100644
--- a/lib/internal/streams/from.js
+++ b/lib/internal/streams/from.js
@@ -36,6 +36,7 @@ function from(Readable, iterable, opts) {
     throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
   }
 
+
   const readable = new Readable({
     objectMode: true,
     highWaterMark: 1,
@@ -46,11 +47,19 @@ function from(Readable, iterable, opts) {
   // Flag to protect against _read
   // being called before last iteration completion.
   let reading = false;
+  let isAsyncValues = false;
 
   readable._read = function() {
     if (!reading) {
       reading = true;
-      next();
+
+      if (isAsync) {
+        nextAsync();
+      } else if (isAsyncValues) {
+        nextSyncWithAsyncValues();
+      } else {
+        nextSyncWithSyncValues();
+      }
     }
   };
 
@@ -78,29 +87,115 @@ function from(Readable, iterable, opts) {
     }
   }
 
-  async function next() {
+  // There are a lot of duplication here, it's done on purpose for performance
+  // reasons - avoid await when not needed.
+
+  function nextSyncWithSyncValues() {
+    for (;;) {
+      try {
+        const { value, done } = iterator.next();
+
+        if (done) {
+          readable.push(null);
+          return;
+        }
+
+        if (value &&
+          typeof value.then === 'function') {
+          return changeToAsyncValues(value);
+        }
+
+        if (value === null) {
+          reading = false;
+          throw new ERR_STREAM_NULL_VALUES();
+        }
+
+        if (readable.push(value)) {
+          continue;
+        }
+
+        reading = false;
+      } catch (err) {
+        readable.destroy(err);
+      }
+      break;
+    }
+  }
+
+  async function changeToAsyncValues(value) {
+    isAsyncValues = true;
+
+    try {
+      const res = await value;
+
+      if (res === null) {
+        reading = false;
+        throw new ERR_STREAM_NULL_VALUES();
+      }
+
+      if (readable.push(res)) {
+        nextSyncWithAsyncValues();
+        return;
+      }
+
+      reading = false;
+    } catch (err) {
+      readable.destroy(err);
+    }
+  }
+
+  async function nextSyncWithAsyncValues() {
     for (;;) {
       try {
-        const { value, done } = isAsync ?
-          await iterator.next() :
-          iterator.next();
+        const { value, done } = iterator.next();
 
         if (done) {
           readable.push(null);
-        } else {
-          const res = (value &&
-            typeof value.then === 'function') ?
-            await value :
-            value;
-          if (res === null) {
-            reading = false;
-            throw new ERR_STREAM_NULL_VALUES();
-          } else if (readable.push(res)) {
-            continue;
-          } else {
-            reading = false;
-          }
+          return;
+        }
+
+        const res = (value &&
+          typeof value.then === 'function') ?
+          await value :
+          value;
+
+        if (res === null) {
+          reading = false;
+          throw new ERR_STREAM_NULL_VALUES();
         }
+
+        if (readable.push(res)) {
+          continue;
+        }
+
+        reading = false;
+      } catch (err) {
+        readable.destroy(err);
+      }
+      break;
+    }
+  }
+
+  async function nextAsync() {
+    for (;;) {
+      try {
+        const { value, done } = await iterator.next();
+
+        if (done) {
+          readable.push(null);
+          return;
+        }
+
+        if (value === null) {
+          reading = false;
+          throw new ERR_STREAM_NULL_VALUES();
+        }
+
+        if (readable.push(value)) {
+          continue;
+        }
+
+        reading = false;
       } catch (err) {
         readable.destroy(err);
       }