From 33e291368a48740a25a6661537a1b770f7342c71 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 26 Sep 2023 16:24:36 +0530 Subject: [PATCH] src: introduce PullAll method to speed up blob.text/arrayBuffer Refs: https://github.com/nodejs/performance/issues/118 --- lib/internal/blob.js | 33 ++++++---------- src/node_blob.cc | 91 ++++++++++++++++++++++++++++++++++++++++++++ src/node_blob.h | 1 + 3 files changed, 103 insertions(+), 22 deletions(-) diff --git a/lib/internal/blob.js b/lib/internal/blob.js index a54adb615fbc17..e7e1b15e3b7f52 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -274,28 +274,17 @@ class Blob { const { promise, resolve, reject } = createDeferredPromise(); const reader = this[kHandle].getReader(); - const buffers = []; - const readNext = () => { - reader.pull((status, buffer) => { - if (status === 0) { - // EOS, concat & resolve - // buffer should be undefined here - resolve(concat(buffers)); - return; - } else if (status < 0) { - // The read could fail for many different reasons when reading - // from a non-memory resident blob part (e.g. file-backed blob). - // The error details the system error code. - const error = lazyDOMException('The blob could not be read', 'NotReadableError'); - reject(error); - return; - } - if (buffer !== undefined) - buffers.push(buffer); - queueMicrotask(() => readNext()); - }); - }; - readNext(); + reader.pullAll((status, buffer) => { + if (status === 0) { + resolve(buffer); + } else if (status < 0) { + // The read could fail for many different reasons when reading + // from a non-memory resident blob part (e.g. file-backed blob). + // The error details the system error code. + const error = lazyDOMException('The blob could not be read', 'NotReadableError'); + reject(error); + } + }) return promise; } diff --git a/src/node_blob.cc b/src/node_blob.cc index d5475de36d7738..574a38fe324be0 100644 --- a/src/node_blob.cc +++ b/src/node_blob.cc @@ -291,6 +291,7 @@ Local Blob::Reader::GetConstructorTemplate(Environment* env) { BaseObject::kInternalFieldCount); tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader")); SetProtoMethod(env->isolate(), tmpl, "pull", Pull); + SetProtoMethod(env->isolate(), tmpl, "pullAll", PullAll); env->set_blob_reader_constructor_template(tmpl); } return tmpl; @@ -379,6 +380,95 @@ void Blob::Reader::Pull(const FunctionCallbackInfo& args) { std::move(next), node::bob::OPTIONS_END, nullptr, 0)); } +void Blob::Reader::PullAll(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + Blob::Reader* reader; + ASSIGN_OR_RETURN_UNWRAP(&reader, args.Holder()); + + CHECK(args[0]->IsFunction()); + Local fn = args[0].As(); + CHECK(!fn->IsConstructor()); + + if (reader->eos_) { + Local arg = Int32::New(env->isolate(), bob::STATUS_EOS); + reader->MakeCallback(fn, 1, &arg); + return args.GetReturnValue().Set(bob::STATUS_EOS); + } + + struct View { + std::shared_ptr store; + size_t length; + size_t offset = 0; + }; + + struct Impl { + BaseObjectPtr reader; + Global callback; + Environment* env; + size_t total = 0; + std::vector views; + int status = 1; + }; + + Impl* impl = new Impl(); + impl->reader = BaseObjectPtr(reader); + impl->callback.Reset(env->isolate(), fn); + impl->env = env; + + auto next = [impl](int status, + const DataQueue::Vec* vecs, + size_t count, + bob::Done doneCb) mutable { + Environment* env = impl->env; + if (status == bob::STATUS_EOS) impl->reader->eos_ = true; + + if (count > 0) { + // Copy the returns vectors into a single ArrayBuffer. + size_t total = 0; + for (size_t n = 0; n < count; n++) total += vecs[n].len; + + std::shared_ptr store = + v8::ArrayBuffer::NewBackingStore(env->isolate(), total); + auto ptr = static_cast(store->Data()); + for (size_t n = 0; n < count; n++) { + std::copy(vecs[n].base, vecs[n].base + vecs[n].len, ptr); + ptr += vecs[n].len; + } + // Since we copied the data buffers, signal that we're done with them. + std::move(doneCb)(0); + impl->views.push_back(View{store, total}); + impl->total += total; + } + + impl->status = status; + return; + }; + + while (impl->status > 0) { + impl->reader->inner_->Pull( + next, node::bob::OPTIONS_END, nullptr, 0); + }; + + std::shared_ptr store = ArrayBuffer::NewBackingStore( + env->isolate(), impl->total); + auto ptr = static_cast(store->Data()); + for (size_t n = 0; n < impl->views.size(); n++) { + uint8_t* from = + static_cast(impl->views[n].store->Data()) + impl->views[n].offset; + std::copy(from, from + impl->views[n].length, ptr); + ptr += impl->views[n].length; + } + + Local argv[2] = { + Int32::New(env->isolate(), impl->status), + ArrayBuffer::New(env->isolate(), store), + }; + + impl->reader->MakeCallback(fn, arraysize(argv), argv); + auto dropMe = std::unique_ptr(impl); + args.GetReturnValue().Set(impl->status); +} + BaseObjectPtr Blob::BlobTransferData::Deserialize( Environment* env, @@ -560,6 +650,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Blob::GetDataObject); registry->Register(Blob::RevokeObjectURL); registry->Register(Blob::Reader::Pull); + registry->Register(Blob::Reader::PullAll); registry->Register(Concat); registry->Register(BlobFromFilePath); } diff --git a/src/node_blob.h b/src/node_blob.h index c601015d9af47b..105d2716997cdc 100644 --- a/src/node_blob.h +++ b/src/node_blob.h @@ -82,6 +82,7 @@ class Blob : public BaseObject { static BaseObjectPtr Create(Environment* env, BaseObjectPtr blob); static void Pull(const v8::FunctionCallbackInfo& args); + static void PullAll(const v8::FunctionCallbackInfo& args); explicit Reader(Environment* env, v8::Local obj,