Skip to content

Commit

Permalink
Improved support for readAtLeast byob reads
Browse files Browse the repository at this point in the history
* Allows multiple respond/respondWithNewView to partially resolve
  readAtLeast byob reads.
* Fix IdentityTransformStream minBytes calculation
* Properly calculate atLeast with multi-byte views
  • Loading branch information
jasnell committed Oct 25, 2022
1 parent 2305361 commit 562ee01
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 32 deletions.
1 change: 0 additions & 1 deletion src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,6 @@ kj::Promise<size_t> IdentityTransformStreamImpl::tryRead(

total += amount;
buffer = reinterpret_cast<char*>(buffer) + amount;
minBytes -= kj::min(minBytes, amount);
maxBytes -= amount;
}

Expand Down
26 changes: 20 additions & 6 deletions src/workerd/api/streams/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ void ByteQueue::ByobRequest::invalidate() {
}
}

void ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) {
bool ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) {
// So what happens here? The read request has been fulfilled directly by writing
// into the storage buffer of the request. Unfortunately, this will only resolve
// the data for the one consumer from which the request was received. We have to
Expand All @@ -375,10 +375,10 @@ void ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) {
// other consumers of the queue.
auto entry = kj::refcounted<Entry>(jsg::BackingStore::alloc(js, amount));

auto start = sourcePtr.begin() + req.pullInto.filled;

// Safely copy the data over into the entry.
std::copy(sourcePtr.begin(),
sourcePtr.begin() + amount,
entry->toArrayPtr().begin());
std::copy(start, start + amount, entry->toArrayPtr().begin());

// Push the entry into the other consumers.
queue.push(js, kj::mv(entry), consumer);
Expand All @@ -390,6 +390,18 @@ void ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) {
// up by the next read.
req.pullInto.filled += amount;

if (amount < req.pullInto.atLeast) {
// The response has not yet met the minimal requirement of this byob read.
// In this case, we do not want to resolve the read yet, and we do not
// want the byob request to be invalidated. We don't need to worry about
// unaligned bytes yet. We're just going to return false to tell the caller
// not to invalidate and to update the view over this store.

// We do want to decrease the atLeast by the amount of bytes we received.
req.pullInto.atLeast -= amount;
return false;
}

// There is no need to adjust the pullInto.atLeast here because we are resolving
// the read immediately.

Expand All @@ -406,9 +418,11 @@ void ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) {
std::copy(start, start + unaligned, excess->toArrayPtr().begin());
consumer.push(js, kj::mv(excess));
}

return true;
}

void ByteQueue::ByobRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSource view) {
bool ByteQueue::ByobRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSource view) {
// The idea here is that rather than filling the view that the controller was given,
// it chose to create it's own view and fill that, likely over the same ArrayBuffer.
// What we do here is perform some basic validations on what we were given, and if
Expand All @@ -429,7 +443,7 @@ void ByteQueue::ByobRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSource
"The view is not the correct length.");

req.pullInto.store = view.detach(js);
respond(js, amount);
return respond(js, amount);
}

size_t ByteQueue::ByobRequest::getAtLeast() const {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/streams/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,9 +771,9 @@ class ByteQueue final {

ReadRequest& getRequest() { return KJ_ASSERT_NONNULL(request); }

void respond(jsg::Lock& js, size_t amount);
bool respond(jsg::Lock& js, size_t amount);

void respondWithNewView(jsg::Lock& js, jsg::BufferSource view);
bool respondWithNewView(jsg::Lock& js, jsg::BufferSource view);

void invalidate();
// Disconnects this ByobRequest instance from the associated ByteQueue::ReadRequest.
Expand Down
8 changes: 2 additions & 6 deletions src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,8 @@ jsg::Promise<ReadResult> ReaderImpl::read(
") exceeds size of buffer (", options->byteLength, ").")));
}

// TODO(conform): This should really be defined as the number of elements of the provided
// buffer view rather than a byte length for consistency with read().
// TODO(soon): This check is not valid for JS controllers, which support this no problem.
if (!options->bufferView.getHandle(js)->IsUint8Array()) {
KJ_LOG(WARNING, "Reading into non-Uint8Array isn't currently supported.");
}
jsg::BufferSource source(js, options->bufferView.getHandle(js));
options->atLeast = atLeast * source.getElementSize();
}

return KJ_ASSERT_NONNULL(stream->getController().read(js, kj::mv(byobOptions)));
Expand Down
62 changes: 45 additions & 17 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,11 @@ kj::Own<ValueQueue::Consumer> ReadableStreamDefaultController::getConsumer(

// ======================================================================================

void ReadableStreamBYOBRequest::Impl::updateView(jsg::Lock& js) {
view.getHandle(js)->Buffer()->Detach();
view = js.v8Ref(readRequest->getView(js));
}

void ReadableStreamBYOBRequest::visitForGc(jsg::GcVisitor& visitor) {
KJ_IF_MAYBE(impl, maybeImpl) {
visitor.visit(impl->view, impl->controller);
Expand Down Expand Up @@ -1519,14 +1524,15 @@ void ReadableStreamBYOBRequest::respond(jsg::Lock& js, int bytesWritten) {
"This ReadableStreamBYOBRequest has been invalidated.");
JSG_REQUIRE(impl.view.getHandle(js)->ByteLength() > 0, TypeError,
"Cannot respond with a zero-length or detached view");
bool pull = false;
if (!impl.controller->canCloseOrEnqueue()) {
JSG_REQUIRE(bytesWritten == 0,
TypeError,
"The bytesWritten must be zero after the stream is closed.");
KJ_ASSERT(impl.readRequest->isInvalidated());
invalidate(js);
} else {
if (impl.readRequest->isInvalidated()) {
bool shouldInvalidate = false;
if (impl.readRequest->isInvalidated() && impl.controller->impl.consumerCount() >= 1) {
// While this particular request may be invalidated, there are still
// other branches we can push the data to. Let's do so.
jsg::BufferSource source(js, impl.view.getHandle(js));
Expand All @@ -1536,37 +1542,59 @@ void ReadableStreamBYOBRequest::respond(jsg::Lock& js, int bytesWritten) {
JSG_REQUIRE(bytesWritten > 0,
TypeError,
"The bytesWritten must be more than zero while the stream is open.");
impl.readRequest->respond(js, bytesWritten);
if (impl.readRequest->respond(js, bytesWritten)) {
// The read request was fulfilled, we need to invalidate.
shouldInvalidate = true;
} else {
// The response did not fulfill the minimum requirements of the read.
// We do not want to invalidate the read request and we need to update the
// view so that on the next read the view will be properly adjusted.
impl.updateView(js);
}
}
pull = true;
}

KJ_DEFER(invalidate(js));
if (pull) {
impl.controller->pull(js);
if (shouldInvalidate) {
invalidate(js);
}
}
}

void ReadableStreamBYOBRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSource view) {
auto& impl = JSG_REQUIRE_NONNULL(maybeImpl,
TypeError,
"This ReadableStreamBYOBRequest has been invalidated.");
bool pull = false;
if (!impl.controller->canCloseOrEnqueue()) {
JSG_REQUIRE(view.size() == 0,
TypeError,
"The view byte length must be zero after the stream is closed.");
KJ_ASSERT(impl.readRequest->isInvalidated());
invalidate(js);
} else {
JSG_REQUIRE(view.size() > 0,
TypeError,
"The view byte length must be more than zero while the stream is open.");
impl.readRequest->respondWithNewView(js, kj::mv(view));
pull = true;
}
bool shouldInvalidate = false;
if (impl.readRequest->isInvalidated()) {
// While this particular request may be invalidated, there are still
// other branches we can push the data to. Let's do so.
auto entry = kj::refcounted<ByteQueue::Entry>(view.detach(js));
impl.controller->impl.enqueue(js, kj::mv(entry), impl.controller.addRef());
} else {
JSG_REQUIRE(view.size() > 0,
TypeError,
"The view byte length must be more than zero while the stream is open.");
if (impl.readRequest->respondWithNewView(js, kj::mv(view))) {
// The read request was fulfilled, we need to invalidate.
shouldInvalidate = true;
} else {
// The response did not fulfill the minimum requirements of the read.
// We do not want to invalidate the read request and we need to update the
// view so that on the next read the view will be properly adjusted.
impl.updateView(js);
}
}

KJ_DEFER(invalidate(js));
if (pull) {
impl.controller->pull(js);
if (shouldInvalidate) {
invalidate(js);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ class ReadableStreamBYOBRequest: public jsg::Object {
: readRequest(kj::mv(readRequest)),
controller(kj::mv(controller)),
view(js.v8Ref(this->readRequest->getView(js))) {}

void updateView(jsg::Lock& js);
};

kj::Maybe<Impl> maybeImpl;
Expand Down

0 comments on commit 562ee01

Please sign in to comment.