Skip to content

Commit

Permalink
src: introduce PullAll method to speed up blob.text/arrayBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Sep 26, 2023
1 parent f16f41c commit 33e2913
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 22 deletions.
33 changes: 11 additions & 22 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,28 +274,17 @@ class Blob {

const { promise, resolve, reject } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === 0) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
return;
}
if (buffer !== undefined)
buffers.push(buffer);
queueMicrotask(() => readNext());
});
};
readNext();
reader.pullAll((status, buffer) => {
if (status === 0) {
resolve(buffer);
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
}
})
return promise;
}

Expand Down
91 changes: 91 additions & 0 deletions src/node_blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
BaseObject::kInternalFieldCount);
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
SetProtoMethod(env->isolate(), tmpl, "pullAll", PullAll);
env->set_blob_reader_constructor_template(tmpl);
}
return tmpl;
Expand Down Expand Up @@ -379,6 +380,95 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
}

void Blob::Reader::PullAll(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Blob::Reader* reader;
ASSIGN_OR_RETURN_UNWRAP(&reader, args.Holder());

CHECK(args[0]->IsFunction());
Local<Function> fn = args[0].As<Function>();
CHECK(!fn->IsConstructor());

if (reader->eos_) {
Local<Value> arg = Int32::New(env->isolate(), bob::STATUS_EOS);
reader->MakeCallback(fn, 1, &arg);
return args.GetReturnValue().Set(bob::STATUS_EOS);
}

struct View {
std::shared_ptr<BackingStore> store;
size_t length;
size_t offset = 0;
};

struct Impl {
BaseObjectPtr<Blob::Reader> reader;
Global<Function> callback;
Environment* env;
size_t total = 0;
std::vector<View> views;
int status = 1;
};

Impl* impl = new Impl();
impl->reader = BaseObjectPtr<Blob::Reader>(reader);
impl->callback.Reset(env->isolate(), fn);
impl->env = env;

auto next = [impl](int status,
const DataQueue::Vec* vecs,
size_t count,
bob::Done doneCb) mutable {
Environment* env = impl->env;
if (status == bob::STATUS_EOS) impl->reader->eos_ = true;

if (count > 0) {
// Copy the returns vectors into a single ArrayBuffer.
size_t total = 0;
for (size_t n = 0; n < count; n++) total += vecs[n].len;

std::shared_ptr<BackingStore> store =
v8::ArrayBuffer::NewBackingStore(env->isolate(), total);
auto ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < count; n++) {
std::copy(vecs[n].base, vecs[n].base + vecs[n].len, ptr);
ptr += vecs[n].len;
}
// Since we copied the data buffers, signal that we're done with them.
std::move(doneCb)(0);
impl->views.push_back(View{store, total});
impl->total += total;
}

impl->status = status;
return;
};

while (impl->status > 0) {
impl->reader->inner_->Pull(
next, node::bob::OPTIONS_END, nullptr, 0);
};

std::shared_ptr<BackingStore> store = ArrayBuffer::NewBackingStore(
env->isolate(), impl->total);
auto ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < impl->views.size(); n++) {
uint8_t* from =
static_cast<uint8_t*>(impl->views[n].store->Data()) + impl->views[n].offset;
std::copy(from, from + impl->views[n].length, ptr);
ptr += impl->views[n].length;
}

Local<Value> argv[2] = {
Int32::New(env->isolate(), impl->status),
ArrayBuffer::New(env->isolate(), store),
};

impl->reader->MakeCallback(fn, arraysize(argv), argv);
auto dropMe = std::unique_ptr<Impl>(impl);
args.GetReturnValue().Set(impl->status);
}

BaseObjectPtr<BaseObject>
Blob::BlobTransferData::Deserialize(
Environment* env,
Expand Down Expand Up @@ -560,6 +650,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::GetDataObject);
registry->Register(Blob::RevokeObjectURL);
registry->Register(Blob::Reader::Pull);
registry->Register(Blob::Reader::PullAll);
registry->Register(Concat);
registry->Register(BlobFromFilePath);
}
Expand Down
1 change: 1 addition & 0 deletions src/node_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Blob : public BaseObject {
static BaseObjectPtr<Reader> Create(Environment* env,
BaseObjectPtr<Blob> blob);
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PullAll(const v8::FunctionCallbackInfo<v8::Value>& args);

explicit Reader(Environment* env,
v8::Local<v8::Object> obj,
Expand Down

0 comments on commit 33e2913

Please sign in to comment.