Skip to content

Commit

Permalink
Fixup ref cycle in streams PipeLocks (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Oct 17, 2022
1 parent 2fd13d8 commit 4ef7d59
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
13 changes: 6 additions & 7 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ kj::Maybe<jsg::Promise<void>> WritableStreamInternalController::tryPipeFrom(
KJ_ASSERT_NONNULL(source->getController().tryPipeLock(KJ_ASSERT_NONNULL(owner).addRef()));

// Let's also acquire the destination pipe lock.
writeState = PipeLocked{ kj::mv(source) };
writeState = PipeLocked{ *source };

// If the source has errored, the spec requires us to reject the pipe promise and, if preventAbort
// is false, error the destination (Propagate error forward). The errored source will be unlocked
Expand Down Expand Up @@ -1300,9 +1300,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// Note that preventClose (below) means "don't close the writable side", i.e. don't
// call end().
request.source.close();
auto preventClose = request.preventClose;
queue.pop_front();

if (!request.preventClose) {
if (!preventClose) {
// Note: unlike a real Close request, it's not possible for us to have been aborted.
return close(js, true);
} else {
Expand All @@ -1314,7 +1315,6 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto handle = reason.getHandle(js);
auto& request = check();
maybeRejectPromise<void>(request.promise, handle);
queue.pop_front();
// TODO(conform): Remember all those checks we performed in ReadableStream::pipeTo()?
// We're supposed to perform the same checks continually, e.g., errored writes should
// cancel the readable side unless preventCancel is truthy... This would require
Expand All @@ -1323,6 +1323,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// side, rather than close (cancel) it, which is what the spec would have us do.
// TODO(now): Warn on the console about this.
request.source.error(js, handle);
queue.pop_front();
if (!preventAbort) {
return abort(js, handle);
}
Expand All @@ -1335,6 +1336,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
return handlePromise(js, ioContext.awaitIo(
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(*promise))));
}

// The ReadableStream is JavaScript-backed. We can still pipe the data but it's going to be
// a bit slower because we will be relying on JavaScript promises when reading the data
// from the ReadableStream, then waiting on kj::Promises to write the data. We will keep
Expand All @@ -1345,7 +1347,6 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
KJ_CASE_ONEOF(request, Close) {
// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<Writable>();

auto check = makeChecker(request);

return ioContext.awaitIo(writable->end()).then(js,
Expand Down Expand Up @@ -1561,9 +1562,7 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
}
}
}
KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) {
visitor.visit(locked->ref);
} else KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
visitor.visit(*locked);
}
visitor.visit(maybePendingAbort);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ class WritableStreamInternalController: public WritableStreamController {
void finishError(jsg::Lock& js, v8::Local<v8::Value> reason);

struct PipeLocked {
jsg::Ref<ReadableStream> ref;
ReadableStream& ref;
};

kj::Maybe<WritableStream&> owner;
Expand Down

0 comments on commit 4ef7d59

Please sign in to comment.