Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Coroutines Conversion: rewriting AllReader into coroutines" #973

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 31 additions & 34 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ kj::Promise<void> pumpTo(ReadableStreamSource& input, WritableStreamSink& output
class AllReader {
// Modified from AllReader in kj/async-io.c++.

using PartList = kj::Array<kj::ArrayPtr<byte>>;

public:
explicit AllReader(ReadableStreamSource& input, uint64_t limit)
: input(input), limit(limit) {
Expand All @@ -59,58 +61,53 @@ public:
}

kj::Promise<kj::Array<byte>> readAllBytes() {
auto parts = co_await readParts();
auto out = kj::heapArray<byte>(runningTotal);
copyInto(out, kj::mv(parts));
co_return out;
return loop().then([this](PartList&& partPtrs) {
auto out = kj::heapArray<byte>(runningTotal);
copyInto(out, kj::mv(partPtrs));
return kj::mv(out);
});
}

kj::Promise<kj::String> readAllText() {
auto parts = co_await readParts();
auto out = kj::heapArray<char>(runningTotal + 1);
copyInto(out.slice(0, out.size() - 1).asBytes(), kj::mv(parts));
out.back() = '\0';
co_return kj::String(kj::mv(out));
return loop().then([this](PartList&& partPtrs) {
auto out = kj::heapArray<char>(runningTotal + 1);
copyInto(out.slice(0, out.size() - 1).asBytes(), kj::mv(partPtrs));
out.back() = '\0';
return kj::String(kj::mv(out));
});
}

private:
ReadableStreamSource& input;
uint64_t limit;
kj::Vector<kj::Array<kj::byte>> parts;
uint64_t runningTotal = 0;

struct Part {
kj::Array<byte> buffer;
size_t amount;
};
using PartList = kj::Vector<Part>;

kj::Promise<PartList> readParts() {
static constexpr size_t bufferSize = 4096;
PartList parts;
kj::Promise<PartList> loop() {
auto bytes = kj::heapArray<kj::byte>(4096);

while (true) {
auto buffer = kj::heapArray<kj::byte>(bufferSize);
auto amount = co_await input.tryRead(buffer.begin(), bufferSize, bufferSize);
runningTotal += amount;
JSG_REQUIRE(runningTotal < limit, TypeError, "Memory limit exceeded before EOF.");

if (amount > 0) {
Part part = { .buffer = kj::mv(buffer), .amount = amount };
parts.add(kj::mv(part));
return input.tryRead(bytes.begin(), 1, bytes.size())
.then([this, bytes = kj::mv(bytes)](size_t amount) mutable
-> kj::Promise<PartList> {
if (amount == 0) {
return KJ_MAP(p, parts) { return p.asPtr(); };
}

if (amount < bufferSize) {
co_return kj::mv(parts);
runningTotal += amount;
if (runningTotal >= limit) {
return JSG_KJ_EXCEPTION(FAILED, TypeError, "Memory limit exceeded before EOF.");
}
}
parts.add(bytes.slice(0, amount).attach(kj::mv(bytes)));
return loop();
});
}

void copyInto(kj::ArrayPtr<byte> out, PartList&& in) {
void copyInto(kj::ArrayPtr<byte> out, PartList in) {
size_t pos = 0;
for (auto& part: in) {
KJ_ASSERT(part.amount <= out.size() - pos);
memcpy(out.begin() + pos, part.buffer.begin(), part.amount);
pos += part.amount;
KJ_ASSERT(part.size() <= out.size() - pos);
memcpy(out.begin() + pos, part.begin(), part.size());
pos += part.size();
}
}
};
Expand Down