diff --git a/sdk/lib/_internal/vm/bin/socket_patch.dart b/sdk/lib/_internal/vm/bin/socket_patch.dart index 6c762bf99528..18c3c86cd167 100644 --- a/sdk/lib/_internal/vm/bin/socket_patch.dart +++ b/sdk/lib/_internal/vm/bin/socket_patch.dart @@ -1623,21 +1623,29 @@ base class _NativeSocket extends _NativeSocketNativeWrapper void issue() { readEventIssued = false; if (isClosing) return; + // Note: it is by design that we don't deliver closedRead event + // unless read events are enabled. This also means we will not + // fully close (and dispose) of the socket unless it is drained + // of accumulated incomming data. if (!sendReadEvents) return; if (stopRead()) { if (isClosedRead && !closedReadEventSent) { if (isClosedWrite) close(); + var handler = closedEventHandler; if (handler == null) return; + closedReadEventSent = true; handler(); } return; } + var handler = readEventHandler; if (handler == null) return; - readEventIssued = true; handler(); + + readEventIssued = true; scheduleMicrotask(issue); } @@ -1846,6 +1854,9 @@ base class _NativeSocket extends _NativeSocketNativeWrapper sendToEventHandler(1 << shutdownReadCommand); } isClosedRead = true; + // Make sure to dispatch a closedRead event. Shutdown is only complete + // once the socket is drained of data and readClosed is dispatched. + issueReadEvent(); } } diff --git a/sdk/lib/io/socket.dart b/sdk/lib/io/socket.dart index 506117e7950e..d988b323c43e 100644 --- a/sdk/lib/io/socket.dart +++ b/sdk/lib/io/socket.dart @@ -619,7 +619,12 @@ final class ConnectionTask { /// ([RawSocketEvent.closed]). abstract interface class RawSocket implements Stream { /// Set or get, if the [RawSocket] should listen for [RawSocketEvent.read] - /// events. Default is `true`. + /// and [RawSocketEvent.readClosed] events. Default is `true`. + /// + /// Warning: setting [readEventsEnabled] to `false` might prevent socket + /// from fully closing when [SocketDirection.receive] and + /// [SocketDirection.send] directions are shutdown independently. See + /// [shutdown] for more details. abstract bool readEventsEnabled; /// Set or get, if the [RawSocket] should listen for [RawSocketEvent.write] @@ -636,8 +641,8 @@ abstract interface class RawSocket implements Stream { /// The [host] can either be a [String] or an [InternetAddress]. If [host] is a /// [String], [connect] will perform a [InternetAddress.lookup] and try /// all returned [InternetAddress]es, until connected. If IPv4 and IPv6 - /// addresses are both availble then connections over IPv4 are preferred. If - /// no connection can be establed then the error from the first failing + /// addresses are both available then connections over IPv4 are preferred. If + /// no connection can be established then the error from the first failing /// connection is returned. /// /// The argument [sourceAddress] can be used to specify the local @@ -788,6 +793,17 @@ abstract interface class RawSocket implements Stream { /// and calling it several times is supported. Calling /// shutdown with either [SocketDirection.both] or [SocketDirection.receive] /// can result in a [RawSocketEvent.readClosed] event. + /// + /// Warning: [SocketDirection.receive] direction is only considered to be + /// to be fully shutdown once all available data is drained and + /// [RawSocketEvent.readClosed] is dispatched. Shutting down + /// [SocketDirection.receive] and [SocketDirection.send] directions separately + /// without draining the data will lead to socket staying around until the + /// data is drained. This can happen if [readEventsEnabled] is set + /// to `false` or if received data is not [read] in response to these + /// events. This does not apply to shutting down both directions + /// simultaneously using [SocketDirection.both] which will discard all + /// received data instead. void shutdown(SocketDirection direction); /// Customize the [RawSocket]. diff --git a/tests/standalone/io/issue_22636_test.dart b/tests/standalone/io/issue_22636_test.dart index 85682d79314d..4cfcac479682 100644 --- a/tests/standalone/io/issue_22636_test.dart +++ b/tests/standalone/io/issue_22636_test.dart @@ -8,7 +8,9 @@ import "dart:async"; import "dart:io"; + import "package:expect/expect.dart"; +import "package:expect/async_helper.dart"; final Duration delay = new Duration(milliseconds: 100); final List data = new List.generate(100, (i) => i % 20 + 65); @@ -28,10 +30,11 @@ void serverListen(RawSocket serverSide) { serverSide.writeEventsEnabled = true; }); } else { - new Future.delayed(delay, () { + new Future.delayed(delay, () async { Expect.isTrue(serverReadClosedReceived); serverSide.shutdown(SocketDirection.send); - server.close(); + await server.close(); + asyncEnd(); }); } break; @@ -58,5 +61,6 @@ Future test() async { } void main() { + asyncStart(); test(); } diff --git a/tests/standalone/io/issue_27414_test.dart b/tests/standalone/io/issue_27414_test.dart new file mode 100644 index 000000000000..31c6558d2047 --- /dev/null +++ b/tests/standalone/io/issue_27414_test.dart @@ -0,0 +1,74 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// This test verifies that shuting down receive and send directions separately +// on a socket correctly shuts the socket down instead of leaking it. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import 'package:expect/expect.dart'; +import 'package:expect/async_helper.dart'; + +const messageContent = "hello, from the client!"; +late RawServerSocket server; +late StreamSubscription clientSubscription; + +void handleConnection(RawSocket serverSide) { + var readClosedReceived = false; + + void serveData(RawSocketEvent event) async { + switch (event) { + case RawSocketEvent.read: + final data = serverSide.read(); + Expect.equals(messageContent, utf8.decode(data!)); + + // There might be a read event in flight, wait for microtasks to drain + // and then shutdown read and write directions separately. This + // should cause [readClosed] to be dispatched. + Future.delayed(Duration(milliseconds: 0), () { + serverSide.shutdown(SocketDirection.receive); + serverSide.shutdown(SocketDirection.send); + }); + break; + + case RawSocketEvent.readClosed: + Expect.isFalse(readClosedReceived); + readClosedReceived = true; + break; + + case RawSocketEvent.closed: + Expect.isTrue(readClosedReceived); + await clientSubscription.cancel(); + await server.close(); + asyncEnd(); + break; + } + } + + serverSide.listen(serveData); +} + +Future test() async { + server = await RawServerSocket.bind(InternetAddress.loopbackIPv4, 0); + server.listen(handleConnection); + + final client = await RawSocket.connect( + InternetAddress.loopbackIPv4, + server.port, + ); + clientSubscription = client.listen((RawSocketEvent event) { + switch (event) { + case RawSocketEvent.write: + client.write(utf8.encode(messageContent)); + break; + } + }); +} + +void main() { + asyncStart(); + test(); +}