Skip to content

Commit

Permalink
EW-7618: Replace KJ_IF_MAYBE with KJ_IF_SOME in queue.{h,c++}
Browse files Browse the repository at this point in the history
Test: bazel test //...
  • Loading branch information
ohodson committed Sep 18, 2023
1 parent 2453f95 commit 2d8be47
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 73 deletions.
60 changes: 30 additions & 30 deletions src/workerd/api/streams/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ void ValueQueue::handleRead(
// resolved either as soon as there is data available or the consumer closes
// or errors.
state.readRequests.push_back(kj::mv(request));
KJ_IF_MAYBE(listener, consumer.stateListener) {
listener->onConsumerWantsData(js);
KJ_IF_SOME(listener, consumer.stateListener) {
listener.onConsumerWantsData(js);
}
}
}
Expand Down Expand Up @@ -238,8 +238,8 @@ void ValueQueue::visitForGc(jsg::GcVisitor& visitor) {}

namespace {
void maybeInvalidateByobRequest(kj::Maybe<ByteQueue::ByobRequest&>& req) {
KJ_IF_MAYBE(byobRequest, req) {
byobRequest->invalidate();
KJ_IF_SOME(byobRequest, req) {
byobRequest.invalidate();
// The call to byobRequest->invalidate() should have cleared the reference.
KJ_ASSERT(req == nullptr);
}
Expand Down Expand Up @@ -391,8 +391,8 @@ ByteQueue::ByobRequest::~ByobRequest() noexcept(false) {
}

void ByteQueue::ByobRequest::invalidate() {
KJ_IF_MAYBE(req, request) {
req->byobReadRequest = nullptr;
KJ_IF_SOME(req, request) {
req.byobReadRequest = nullptr;
request = nullptr;
}
}
Expand Down Expand Up @@ -498,17 +498,17 @@ bool ByteQueue::ByobRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSource
}

size_t ByteQueue::ByobRequest::getAtLeast() const {
KJ_IF_MAYBE(req, request) {
return req->pullInto.atLeast;
KJ_IF_SOME(req, request) {
return req.pullInto.atLeast;
}
return 0;
}

v8::Local<v8::Uint8Array> ByteQueue::ByobRequest::getView(jsg::Lock& js) {
KJ_IF_MAYBE(req, request) {
return req->pullInto.store.getTypedViewSlice<v8::Uint8Array>(
req->pullInto.filled,
req->pullInto.store.size()
KJ_IF_SOME(req, request) {
return req.pullInto.store.getTypedViewSlice<v8::Uint8Array>(
req.pullInto.filled,
req.pullInto.store.size()
).createHandle(js).As<v8::Uint8Array>();
}
return v8::Local<v8::Uint8Array>();
Expand All @@ -519,11 +519,11 @@ v8::Local<v8::Uint8Array> ByteQueue::ByobRequest::getView(jsg::Lock& js) {
ByteQueue::ByteQueue(size_t highWaterMark) : impl(highWaterMark) {}

void ByteQueue::close(jsg::Lock& js) {
KJ_IF_MAYBE(ready, impl.state.tryGet<ByteQueue::QueueImpl::Ready>()) {
while (!ready->pendingByobReadRequests.empty()) {
auto& req = ready->pendingByobReadRequests.front();
KJ_IF_SOME(ready, impl.state.tryGet<ByteQueue::QueueImpl::Ready>()) {
while (!ready.pendingByobReadRequests.empty()) {
auto& req = ready.pendingByobReadRequests.front();
req->invalidate();
ready->pendingByobReadRequests.pop_front();
ready.pendingByobReadRequests.pop_front();
}
}
impl.close(js);
Expand All @@ -536,14 +536,14 @@ void ByteQueue::error(jsg::Lock& js, jsg::Value reason) {
}

void ByteQueue::maybeUpdateBackpressure() {
KJ_IF_MAYBE(state, impl.getState()) {
KJ_IF_SOME(state, impl.getState()) {
// Invalidated byob read requests will accumulate if we do not take
// take of them from time to time since. Since maybeUpdateBackpressure
// is going to be called regularly while the queue is actively in use,
// this is as good a place to clean them out as any.
auto pivot KJ_UNUSED = std::remove_if(
state->pendingByobReadRequests.begin(),
state->pendingByobReadRequests.end(),
state.pendingByobReadRequests.begin(),
state.pendingByobReadRequests.end(),
[](auto& item) {
return item->isInvalidated();
});
Expand Down Expand Up @@ -721,13 +721,13 @@ void ByteQueue::handleRead(
// state.readRequests to create the associated ByobRequest.
// If the queue state is nullptr here, it means the queue has already
// been closed.
KJ_IF_MAYBE(queueState, queue.getState()) {
queueState->pendingByobReadRequests.push_back(
KJ_IF_SOME(queueState, queue.getState()) {
queueState.pendingByobReadRequests.push_back(
state.readRequests.back().makeByobReadRequest(consumer, queue));
}
}
KJ_IF_MAYBE(listener, consumer.stateListener) {
listener->onConsumerWantsData(js);
KJ_IF_SOME(listener, consumer.stateListener) {
listener.onConsumerWantsData(js);
}
};

Expand Down Expand Up @@ -1001,10 +1001,10 @@ bool ByteQueue::handleMaybeClose(
}

kj::Maybe<kj::Own<ByteQueue::ByobRequest>> ByteQueue::nextPendingByobReadRequest() {
KJ_IF_MAYBE(state, impl.getState()) {
while (!state->pendingByobReadRequests.empty()) {
auto request = kj::mv(state->pendingByobReadRequests.front());
state->pendingByobReadRequests.pop_front();
KJ_IF_SOME(state, impl.getState()) {
while (!state.pendingByobReadRequests.empty()) {
auto request = kj::mv(state.pendingByobReadRequests.front());
state.pendingByobReadRequests.pop_front();
if (!request->isInvalidated()) {
return kj::mv(request);
}
Expand All @@ -1014,9 +1014,9 @@ kj::Maybe<kj::Own<ByteQueue::ByobRequest>> ByteQueue::nextPendingByobReadRequest
}

bool ByteQueue::hasPartiallyFulfilledRead() {
KJ_IF_MAYBE(state, impl.getState()) {
if (!state->pendingByobReadRequests.empty()) {
auto& pending = state->pendingByobReadRequests.front();
KJ_IF_SOME(state, impl.getState()) {
if (!state.pendingByobReadRequests.empty()) {
auto& pending = state.pendingByobReadRequests.front();
if (pending->isPartiallyFulfilled()) {
return true;
}
Expand Down
86 changes: 43 additions & 43 deletions src/workerd/api/streams/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ class QueueImpl final {
// Closes the queue. The close is forwarded on to all consumers.
// If we are already closed or errored, do nothing here.
void close(jsg::Lock& js) {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
// We copy the list of consumers in case the consumers remove themselves
// from the queue during the close callback, invalidating the iterator.
auto consumers = ready->consumers;
auto consumers = ready.consumers;
for (auto consumer : consumers) {
consumer->close(js);
}
Expand All @@ -183,10 +183,10 @@ class QueueImpl final {
// all pending consume promises.
// If we are already closed or errored, do nothing here.
void error(jsg::Lock& js, jsg::Value reason) {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
// We copy the list of consumers in case the consumers remove themselves
// from the queue during the error callback, invalidating the iterator.
auto consumers = ready->consumers;
auto consumers = ready.consumers;
for (auto consumer : consumers) {
consumer->error(js, reason.addRef(js));
}
Expand All @@ -199,8 +199,8 @@ class QueueImpl final {
// If we are already closed or errored, set totalQueueSize to zero.
void maybeUpdateBackpressure() {
totalQueueSize = 0;
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
for (auto consumer : ready->consumers) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
for (auto consumer : ready.consumers) {
totalQueueSize = kj::max(totalQueueSize, consumer->size());
}
}
Expand All @@ -218,8 +218,8 @@ class QueueImpl final {
"The queue is closed or errored.");

for (auto consumer : ready.consumers) {
KJ_IF_MAYBE(skip, skipConsumer) {
if (&(*skip) == consumer) {
KJ_IF_SOME(skip, skipConsumer) {
if (&skip == consumer) {
continue;
}
}
Expand Down Expand Up @@ -257,10 +257,10 @@ class QueueImpl final {
// Specific queue implementations may provide additional state that is attached
// to the Ready struct.
kj::Maybe<State&> getState() KJ_LIFETIMEBOUND {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
return *ready;
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
return ready;
}
return nullptr;
return kj::none;
}

private:
Expand All @@ -276,14 +276,14 @@ class QueueImpl final {
kj::OneOf<Ready, Closed, Errored> state = Ready();

void addConsumer(ConsumerImpl* consumer) {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
ready->consumers.insert(consumer);
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
ready.consumers.insert(consumer);
}
}

void removeConsumer(ConsumerImpl* consumer) {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
ready->consumers.erase(consumer);
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
ready.consumers.erase(consumer);
maybeUpdateBackpressure();
}
}
Expand Down Expand Up @@ -320,7 +320,7 @@ class ConsumerImpl final {
using Entry = typename Self::Entry;
using QueueEntry = typename Self::QueueEntry;

ConsumerImpl(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr)
ConsumerImpl(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none)
: queue(queue), stateListener(stateListener) {
queue.addConsumer(this);
}
Expand Down Expand Up @@ -349,10 +349,10 @@ class ConsumerImpl final {

void close(jsg::Lock& js) {
// If we are already closed or errored, then we do nothing here.
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
// If we are not already closing, enqueue a Close sentinel.
if (!isClosing()) {
ready->buffer.push_back(Close {});
ready.buffer.push_back(Close {});
}

// Then check to see if we need to drain pending reads and
Expand All @@ -366,7 +366,8 @@ class ConsumerImpl final {
void error(jsg::Lock& js, jsg::Value reason) {
// If we are already closed or errored, then we do nothing here.
// The new error doesn't matter.
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
(void)ready; // Unused
maybeDrainAndSetState(js, kj::mv(reason));
}
}
Expand Down Expand Up @@ -402,10 +403,10 @@ class ConsumerImpl final {
}

void reset() {
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
UpdateBackpressureScope scope(queue);
ready->buffer.clear();
ready->queueTotalSize = 0;
ready.buffer.clear();
ready.queueTotalSize = 0;
}
}

Expand Down Expand Up @@ -524,28 +525,28 @@ class ConsumerImpl final {
KJ_UNREACHABLE;
}

void maybeDrainAndSetState(jsg::Lock& js, kj::Maybe<jsg::Value> maybeReason = nullptr) {
void maybeDrainAndSetState(jsg::Lock& js, kj::Maybe<jsg::Value> maybeReason = kj::none) {
// If the state is already errored or closed then there is nothing to drain.
KJ_IF_MAYBE(ready, state.template tryGet<Ready>()) {
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
UpdateBackpressureScope scope(queue);
KJ_IF_MAYBE(reason, maybeReason) {
KJ_IF_SOME(reason, maybeReason) {
// If maybeReason != nullptr, then we are draining because of an error.
// In that case, we want to reset/clear the buffer and reject any remaining
// pending read requests using the given reason.
for (auto& request : ready->readRequests) {
request.reject(js, *reason);
for (auto& request : ready.readRequests) {
request.reject(js, reason);
}
state = reason->addRef(js);
KJ_IF_MAYBE(listener, stateListener) {
listener->onConsumerError(js, kj::mv(*reason));
state = reason.addRef(js);
KJ_IF_SOME(listener, stateListener) {
listener.onConsumerError(js, kj::mv(reason));
// After this point, we should not assume that this consumer can
// be safely used at all. It's most likely the stateListener has
// released it.
}
} else {
// Otherwise, if isClosing() is true...
if (isClosing()) {
if (!empty() && !Self::handleMaybeClose(js, *ready, *this, queue)) {
if (!empty() && !Self::handleMaybeClose(js, ready, *this, queue)) {
// If the queue is not empty, we'll have the implementation see
// if it can drain the remaining data into pending reads. If handleMaybeClose
// returns false, then it could not and we can't yet close. If it returns true,
Expand All @@ -555,13 +556,13 @@ class ConsumerImpl final {
}

KJ_ASSERT(empty());
KJ_REQUIRE(ready->buffer.size() == 1); // The close should be the only item remaining.
for (auto& request : ready->readRequests) {
KJ_REQUIRE(ready.buffer.size() == 1); // The close should be the only item remaining.
for (auto& request : ready.readRequests) {
request.resolveAsDone(js);
}
state.template init<Closed>();
KJ_IF_MAYBE(listener, stateListener) {
listener->onConsumerClose(js);
KJ_IF_SOME(listener, stateListener) {
listener.onConsumerClose(js);
// After this point, we should not assume that this consumer can
// be safely used at all. It's most likely the stateListener has
// released it.
Expand Down Expand Up @@ -620,8 +621,8 @@ class ValueQueue final {

class Consumer final {
public:
Consumer(ValueQueue& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);
Consumer(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);
Consumer(ValueQueue& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
Consumer(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
Consumer(Consumer&&) = delete;
Consumer(Consumer&) = delete;
Consumer& operator=(Consumer&&) = delete;
Expand All @@ -644,7 +645,7 @@ class ValueQueue final {
size_t size();

kj::Own<Consumer> clone(jsg::Lock& js,
kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);
kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);

bool hasReadRequests();

Expand Down Expand Up @@ -766,7 +767,7 @@ class ByteQueue final {
// The term "invalidate" is adopted from the streams spec for handling BYOB requests.
void invalidate();

inline bool isInvalidated() const { return request == nullptr; }
inline bool isInvalidated() const { return request == kj::none; }

bool isPartiallyFulfilled();

Expand Down Expand Up @@ -811,8 +812,8 @@ class ByteQueue final {

class Consumer {
public:
Consumer(ByteQueue& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);
Consumer(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);
Consumer(ByteQueue& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
Consumer(QueueImpl& queue, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
Consumer(Consumer&&) = delete;
Consumer(Consumer&) = delete;
Consumer& operator=(Consumer&&) = delete;
Expand All @@ -835,8 +836,7 @@ class ByteQueue final {
size_t size() const;

kj::Own<Consumer> clone(jsg::Lock& js,
kj::Maybe<ConsumerImpl::StateListener&> stateListener = nullptr);

kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
bool hasReadRequests();

void visitForGc(jsg::GcVisitor& visitor);
Expand Down

0 comments on commit 2d8be47

Please sign in to comment.