From a1879d4afd99dfa6955caf3e62b1572004df2ab8 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Thu, 3 Aug 2023 15:29:07 -0700 Subject: [PATCH] Implement the `tcp` interface of wasi-sockets. Implement the `tcp`, `tcp-create-socket`, and `network` interfaces of wasi-sockets. --- Cargo.lock | 64 +- Cargo.toml | 4 +- crates/test-programs/Cargo.toml | 1 + crates/test-programs/build.rs | 10 + crates/test-programs/tests/wasi-sockets.rs | 93 +++ .../wasi-sockets-tests/Cargo.toml | 10 + .../wasi-sockets-tests/src/bin/tcp_v4.rs | 99 +++ .../wasi-sockets-tests/src/bin/tcp_v6.rs | 101 +++ .../wasi-sockets-tests/src/lib.rs | 1 + .../src/descriptors.rs | 4 +- crates/wasi/Cargo.toml | 6 +- crates/wasi/src/preview2/command.rs | 6 + crates/wasi/src/preview2/ctx.rs | 60 ++ crates/wasi/src/preview2/filesystem.rs | 4 +- .../src/preview2/host/instance_network.rs | 11 + crates/wasi/src/preview2/host/mod.rs | 4 + crates/wasi/src/preview2/host/network.rs | 185 ++++++ crates/wasi/src/preview2/host/tcp.rs | 575 ++++++++++++++++++ .../src/preview2/host/tcp_create_socket.rs | 18 + crates/wasi/src/preview2/mod.rs | 8 +- crates/wasi/src/preview2/network.rs | 32 + crates/wasi/src/preview2/stdio/unix.rs | 4 +- crates/wasi/src/preview2/tcp.rs | 290 +++++++++ crates/wasi/wit/test.wit | 13 + supply-chain/audits.toml | 19 + supply-chain/imports.lock | 14 + 26 files changed, 1607 insertions(+), 29 deletions(-) create mode 100644 crates/test-programs/tests/wasi-sockets.rs create mode 100644 crates/test-programs/wasi-sockets-tests/Cargo.toml create mode 100644 crates/test-programs/wasi-sockets-tests/src/bin/tcp_v4.rs create mode 100644 crates/test-programs/wasi-sockets-tests/src/bin/tcp_v6.rs create mode 100644 crates/test-programs/wasi-sockets-tests/src/lib.rs create mode 100644 crates/wasi/src/preview2/host/instance_network.rs create mode 100644 crates/wasi/src/preview2/host/network.rs create mode 100644 crates/wasi/src/preview2/host/tcp.rs create mode 100644 crates/wasi/src/preview2/host/tcp_create_socket.rs create mode 100644 crates/wasi/src/preview2/network.rs create mode 100644 crates/wasi/src/preview2/tcp.rs diff --git a/Cargo.lock b/Cargo.lock index f5bcb273c8a3..c7a41f13c200 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,18 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "cap-net-ext" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ffc30dee200c20b4dcb80572226f42658e1d9c4b668656d7cc59c33d50e396e" +dependencies = [ + "cap-primitives", + "cap-std", + "rustix 0.38.8", + "smallvec", +] + [[package]] name = "cap-primitives" version = "2.0.0" @@ -273,7 +285,7 @@ dependencies = [ "io-lifetimes 2.0.2", "ipnet", "maybe-owned", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys", "winx", ] @@ -297,7 +309,7 @@ dependencies = [ "cap-primitives", "io-extras", "io-lifetimes 2.0.2", - "rustix 0.38.4", + "rustix 0.38.8", ] [[package]] @@ -308,7 +320,7 @@ checksum = "7b9e3348a3510c4619b4c7a7bcdef09a71221da18f266bda3ed6b9aea2c509e2" dependencies = [ "cap-std", "rand", - "rustix 0.38.4", + "rustix 0.38.8", "uuid", ] @@ -320,7 +332,7 @@ checksum = "f8f52b3c8f4abfe3252fd0a071f3004aaa3b18936ec97bdbd8763ce03aff6247" dependencies = [ "cap-primitives", "once_cell", - "rustix 0.38.4", + "rustix 0.38.8", "winx", ] @@ -1120,7 +1132,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b0377f1edc77dbd1118507bc7a66e4ab64d2b90c66f90726dc801e73a8c68f9" dependencies = [ "cfg-if", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys", ] @@ -1184,7 +1196,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd738b84894214045e8414eaded76359b4a5773f0a0a56b16575110739cdcf39" dependencies = [ "io-lifetimes 2.0.2", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys", ] @@ -2287,9 +2299,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ "bitflags 2.3.3", "errno", @@ -2469,9 +2481,9 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "smallvec" -version = "1.8.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" dependencies = [ "serde", ] @@ -2579,7 +2591,7 @@ dependencies = [ "cap-std", "fd-lock", "io-lifetimes 2.0.2", - "rustix 0.38.4", + "rustix 0.38.8", "windows-sys", "winx", ] @@ -2996,7 +3008,7 @@ dependencies = [ "io-lifetimes 2.0.2", "is-terminal", "once_cell", - "rustix 0.38.4", + "rustix 0.38.8", "system-interface", "tempfile", "tracing", @@ -3014,7 +3026,7 @@ dependencies = [ "cap-std", "io-extras", "log", - "rustix 0.38.4", + "rustix 0.38.8", "thiserror", "tracing", "wasmtime", @@ -3041,6 +3053,14 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasi-sockets-tests" +version = "0.0.0" +dependencies = [ + "anyhow", + "wit-bindgen", +] + [[package]] name = "wasi-tests" version = "0.0.0" @@ -3059,7 +3079,7 @@ dependencies = [ "cap-tempfile", "io-extras", "io-lifetimes 2.0.2", - "rustix 0.38.4", + "rustix 0.38.8", "tempfile", "tokio", "wasi-cap-std-sync", @@ -3384,7 +3404,7 @@ dependencies = [ "log", "once_cell", "pretty_env_logger 0.5.0", - "rustix 0.38.4", + "rustix 0.38.8", "serde", "sha2", "tempfile", @@ -3414,7 +3434,7 @@ dependencies = [ "num_cpus", "once_cell", "rayon", - "rustix 0.38.4", + "rustix 0.38.8", "serde", "serde_json", "target-lexicon", @@ -3568,7 +3588,7 @@ dependencies = [ "backtrace", "cc", "cfg-if", - "rustix 0.38.4", + "rustix 0.38.8", "wasmtime-asm-macros", "wasmtime-versioned-export-macros", "windows-sys", @@ -3643,7 +3663,7 @@ dependencies = [ "log", "object", "rustc-demangle", - "rustix 0.38.4", + "rustix 0.38.8", "serde", "target-lexicon", "wasmtime-environ", @@ -3659,7 +3679,7 @@ version = "13.0.0" dependencies = [ "object", "once_cell", - "rustix 0.38.4", + "rustix 0.38.8", "wasmtime-versioned-export-macros", ] @@ -3689,7 +3709,7 @@ dependencies = [ "once_cell", "paste", "rand", - "rustix 0.38.4", + "rustix 0.38.8", "sptr", "wasm-encoder 0.31.1", "wasmtime-asm-macros", @@ -3728,16 +3748,18 @@ dependencies = [ "bitflags 2.3.3", "bytes", "cap-fs-ext", + "cap-net-ext", "cap-rand", "cap-std", "cap-time-ext", "fs-set-times", "futures", "io-extras", + "io-lifetimes 2.0.2", "is-terminal", "libc", "once_cell", - "rustix 0.38.4", + "rustix 0.38.8", "system-interface", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index d45779629fef..3e8639581d36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ members = [ "crates/jit-icache-coherence", "crates/test-programs/wasi-tests", "crates/test-programs/wasi-http-tests", + "crates/test-programs/wasi-sockets-tests", "crates/test-programs/command-tests", "crates/test-programs/reactor-tests", "crates/wasi-preview1-component-adapter", @@ -188,13 +189,14 @@ target-lexicon = { version = "0.12.3", default-features = false, features = ["st cap-std = "2.0.0" cap-rand = { version = "2.0.0", features = ["small_rng"] } cap-fs-ext = "2.0.0" +cap-net-ext = "2.0.0" cap-time-ext = "2.0.0" cap-tempfile = "2.0.0" fs-set-times = "0.20.0" system-interface = { version = "0.26.0", features = ["cap_std_impls"] } io-lifetimes = { version = "2.0.2", default-features = false } io-extras = "0.18.0" -rustix = "0.38.4" +rustix = "0.38.8" is-terminal = "0.4.0" # wit-bindgen: wit-bindgen = { version = "0.9.0", default-features = false } diff --git a/crates/test-programs/Cargo.toml b/crates/test-programs/Cargo.toml index bd9de15696cd..a9fcf5d70ddb 100644 --- a/crates/test-programs/Cargo.toml +++ b/crates/test-programs/Cargo.toml @@ -41,3 +41,4 @@ http-body-util = "0.1.0-rc.2" [features] test_programs = [] test_programs_http = [ "wasmtime/component-model" ] +test_programs_sockets = [ "wasmtime/component-model" ] diff --git a/crates/test-programs/build.rs b/crates/test-programs/build.rs index eccf8e0d6e00..a3b1ce2dc26c 100644 --- a/crates/test-programs/build.rs +++ b/crates/test-programs/build.rs @@ -30,6 +30,7 @@ fn build_and_generate_tests() { println!("cargo:rerun-if-changed=./wasi-tests"); println!("cargo:rerun-if-changed=./command-tests"); println!("cargo:rerun-if-changed=./reactor-tests"); + println!("cargo:rerun-if-changed=./wasi-sockets-tests"); if BUILD_WASI_HTTP_TESTS { println!("cargo:rerun-if-changed=./wasi-http-tests"); } else { @@ -43,6 +44,7 @@ fn build_and_generate_tests() { .arg("--package=wasi-tests") .arg("--package=command-tests") .arg("--package=reactor-tests") + .arg("--package=wasi-sockets-tests") .env("CARGO_TARGET_DIR", &out_dir) .env("CARGO_PROFILE_DEV_DEBUG", "1") .env_remove("CARGO_ENCODED_RUSTFLAGS"); @@ -67,6 +69,14 @@ fn build_and_generate_tests() { components_rs(&meta, "command-tests", "bin", &command_adapter, &out_dir); components_rs(&meta, "reactor-tests", "cdylib", &reactor_adapter, &out_dir); + + components_rs( + &meta, + "wasi-sockets-tests", + "bin", + &command_adapter, + &out_dir, + ); } // Creates an `${out_dir}/${package}_modules.rs` file that exposes a `get_module(&str) -> Module`, diff --git a/crates/test-programs/tests/wasi-sockets.rs b/crates/test-programs/tests/wasi-sockets.rs new file mode 100644 index 000000000000..ff119108fc0c --- /dev/null +++ b/crates/test-programs/tests/wasi-sockets.rs @@ -0,0 +1,93 @@ +#![cfg(all(feature = "test_programs", not(skip_wasi_sockets_tests)))] +use cap_std::ambient_authority; +use wasmtime::component::Linker; +use wasmtime::{Config, Engine, Store}; +use wasmtime_wasi::preview2::{self, command::Command, Table, WasiCtx, WasiCtxBuilder, WasiView}; + +lazy_static::lazy_static! { + static ref ENGINE: Engine = { + let mut config = Config::new(); + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.wasm_component_model(true); + config.async_support(true); + + let engine = Engine::new(&config).unwrap(); + engine + }; +} +// uses ENGINE, creates a fn get_component(&str) -> Component +include!(concat!( + env!("OUT_DIR"), + "/wasi_sockets_tests_components.rs" +)); + +struct SocketsCtx { + table: Table, + wasi: WasiCtx, +} + +impl WasiView for SocketsCtx { + fn table(&self) -> &Table { + &self.table + } + fn table_mut(&mut self) -> &mut Table { + &mut self.table + } + fn ctx(&self) -> &WasiCtx { + &self.wasi + } + fn ctx_mut(&mut self) -> &mut WasiCtx { + &mut self.wasi + } +} + +async fn run(name: &str) -> anyhow::Result<()> { + let component = get_component(name); + let mut linker = Linker::new(&ENGINE); + + preview2::bindings::io::streams::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::poll::poll::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::exit::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::stdin::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::stdout::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::stderr::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::terminal_input::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::terminal_output::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::terminal_stdin::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::terminal_stdout::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::terminal_stderr::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::cli::environment::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::filesystem::types::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::filesystem::preopens::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::sockets::tcp::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::sockets::tcp_create_socket::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::sockets::network::add_to_linker(&mut linker, |x| x)?; + preview2::bindings::sockets::instance_network::add_to_linker(&mut linker, |x| x)?; + + // Create our wasi context. + let mut table = Table::new(); + let wasi = WasiCtxBuilder::new() + .inherit_stdio() + .inherit_network(ambient_authority()) + .arg(name) + .build(&mut table)?; + + let mut store = Store::new(&ENGINE, SocketsCtx { table, wasi }); + + let (command, _instance) = Command::instantiate_async(&mut store, &component, &linker).await?; + command + .wasi_cli_run() + .call_run(&mut store) + .await? + .map_err(|()| anyhow::anyhow!("command returned with failing exit status")) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn tcp_v4() { + run("tcp_v4").await.unwrap(); +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn tcp_v6() { + run("tcp_v6").await.unwrap(); +} diff --git a/crates/test-programs/wasi-sockets-tests/Cargo.toml b/crates/test-programs/wasi-sockets-tests/Cargo.toml new file mode 100644 index 000000000000..3dcc85370173 --- /dev/null +++ b/crates/test-programs/wasi-sockets-tests/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "wasi-sockets-tests" +version = "0.0.0" +readme = "README.md" +edition = "2021" +publish = false + +[dependencies] +anyhow = { workspace = true } +wit-bindgen = { workspace = true, default-features = false, features = ["macros"] } diff --git a/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v4.rs b/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v4.rs new file mode 100644 index 000000000000..fff3d6a09093 --- /dev/null +++ b/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v4.rs @@ -0,0 +1,99 @@ +//! A simple TCP testcase, using IPv4. + +use wasi::io::streams; +use wasi::poll::poll; +use wasi::sockets::network::{IpAddressFamily, IpSocketAddress, Ipv4SocketAddress}; +use wasi::sockets::{instance_network, network, tcp, tcp_create_socket}; +use wasi_sockets_tests::*; + +fn wait(sub: poll::Pollable) { + loop { + let wait = poll::poll_oneoff(&[sub]); + if wait[0] { + break; + } + } +} + +fn main() { + let first_message = b"Hello, world!"; + let second_message = b"Greetings, planet!"; + + let net = instance_network::instance_network(); + + let sock = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap(); + + let addr = IpSocketAddress::Ipv4(Ipv4SocketAddress { + port: 0, // use any free port + address: (127, 0, 0, 1), // localhost + }); + + let sub = tcp::subscribe(sock); + + tcp::start_bind(sock, net, addr).unwrap(); + wait(sub); + tcp::finish_bind(sock).unwrap(); + + tcp::start_listen(sock, net).unwrap(); + wait(sub); + tcp::finish_listen(sock).unwrap(); + + let addr = tcp::local_address(sock).unwrap(); + + let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap(); + let client_sub = tcp::subscribe(client); + + tcp::start_connect(client, net, addr).unwrap(); + wait(client_sub); + let (client_input, client_output) = tcp::finish_connect(client).unwrap(); + + let (n, _status) = streams::write(client_output, first_message).unwrap(); + assert_eq!(n, first_message.len() as u64); // Not guaranteed to work but should work in practice. + + streams::drop_input_stream(client_input); + streams::drop_output_stream(client_output); + poll::drop_pollable(client_sub); + tcp::drop_tcp_socket(client); + + wait(sub); + let (accepted, input, output) = tcp::accept(sock).unwrap(); + let (data, _status) = streams::read(input, first_message.len() as u64).unwrap(); + + tcp::drop_tcp_socket(accepted); + streams::drop_input_stream(input); + streams::drop_output_stream(output); + + // Check that we sent and recieved our message! + assert_eq!(data, first_message); // Not guaranteed to work but should work in practice. + + // Another client + let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap(); + let client_sub = tcp::subscribe(client); + + tcp::start_connect(client, net, addr).unwrap(); + wait(client_sub); + let (client_input, client_output) = tcp::finish_connect(client).unwrap(); + + let (n, _status) = streams::write(client_output, second_message).unwrap(); + assert_eq!(n, second_message.len() as u64); // Not guaranteed to work but should work in practice. + + streams::drop_input_stream(client_input); + streams::drop_output_stream(client_output); + poll::drop_pollable(client_sub); + tcp::drop_tcp_socket(client); + + wait(sub); + let (accepted, input, output) = tcp::accept(sock).unwrap(); + let (data, _status) = streams::read(input, second_message.len() as u64).unwrap(); + + streams::drop_input_stream(input); + streams::drop_output_stream(output); + tcp::drop_tcp_socket(accepted); + + // Check that we sent and recieved our message! + assert_eq!(data, second_message); // Not guaranteed to work but should work in practice. + + poll::drop_pollable(sub); + tcp::drop_tcp_socket(sock); + network::drop_network(net); +} diff --git a/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v6.rs b/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v6.rs new file mode 100644 index 000000000000..64b89957cde4 --- /dev/null +++ b/crates/test-programs/wasi-sockets-tests/src/bin/tcp_v6.rs @@ -0,0 +1,101 @@ +//! Like v4.rs, but with IPv6. + +use wasi::io::streams; +use wasi::poll::poll; +use wasi::sockets::network::{IpAddressFamily, IpSocketAddress, Ipv6SocketAddress}; +use wasi::sockets::{instance_network, network, tcp, tcp_create_socket}; +use wasi_sockets_tests::*; + +fn wait(sub: poll::Pollable) { + loop { + let wait = poll::poll_oneoff(&[sub]); + if wait[0] { + break; + } + } +} + +fn main() { + let first_message = b"Hello, world!"; + let second_message = b"Greetings, planet!"; + + let net = instance_network::instance_network(); + + let sock = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv6).unwrap(); + + let addr = IpSocketAddress::Ipv6(Ipv6SocketAddress { + port: 0, // use any free port + address: (0, 0, 0, 0, 0, 0, 0, 1), // localhost + flow_info: 0, + scope_id: 0, + }); + + let sub = tcp::subscribe(sock); + + tcp::start_bind(sock, net, addr).unwrap(); + wait(sub); + tcp::finish_bind(sock).unwrap(); + + tcp::start_listen(sock, net).unwrap(); + wait(sub); + tcp::finish_listen(sock).unwrap(); + + let addr = tcp::local_address(sock).unwrap(); + + let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv6).unwrap(); + let client_sub = tcp::subscribe(client); + + tcp::start_connect(client, net, addr).unwrap(); + wait(client_sub); + let (client_input, client_output) = tcp::finish_connect(client).unwrap(); + + let (n, _status) = streams::write(client_output, first_message).unwrap(); + assert_eq!(n, first_message.len() as u64); // Not guaranteed to work but should work in practice. + + streams::drop_input_stream(client_input); + streams::drop_output_stream(client_output); + poll::drop_pollable(client_sub); + tcp::drop_tcp_socket(client); + + wait(sub); + let (accepted, input, output) = tcp::accept(sock).unwrap(); + let (data, _status) = streams::read(input, first_message.len() as u64).unwrap(); + + tcp::drop_tcp_socket(accepted); + streams::drop_input_stream(input); + streams::drop_output_stream(output); + + // Check that we sent and recieved our message! + assert_eq!(data, first_message); // Not guaranteed to work but should work in practice. + + // Another client + let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv6).unwrap(); + let client_sub = tcp::subscribe(client); + + tcp::start_connect(client, net, addr).unwrap(); + wait(client_sub); + let (client_input, client_output) = tcp::finish_connect(client).unwrap(); + + let (n, _status) = streams::write(client_output, second_message).unwrap(); + assert_eq!(n, second_message.len() as u64); // Not guaranteed to work but should work in practice. + + streams::drop_input_stream(client_input); + streams::drop_output_stream(client_output); + poll::drop_pollable(client_sub); + tcp::drop_tcp_socket(client); + + wait(sub); + let (accepted, input, output) = tcp::accept(sock).unwrap(); + let (data, _status) = streams::read(input, second_message.len() as u64).unwrap(); + + streams::drop_input_stream(input); + streams::drop_output_stream(output); + tcp::drop_tcp_socket(accepted); + + // Check that we sent and recieved our message! + assert_eq!(data, second_message); // Not guaranteed to work but should work in practice. + + poll::drop_pollable(sub); + tcp::drop_tcp_socket(sock); + network::drop_network(net); +} diff --git a/crates/test-programs/wasi-sockets-tests/src/lib.rs b/crates/test-programs/wasi-sockets-tests/src/lib.rs new file mode 100644 index 000000000000..cf3ecf02f82c --- /dev/null +++ b/crates/test-programs/wasi-sockets-tests/src/lib.rs @@ -0,0 +1 @@ +wit_bindgen::generate!("test-command-with-sockets" in "../../wasi/wit"); diff --git a/crates/wasi-preview1-component-adapter/src/descriptors.rs b/crates/wasi-preview1-component-adapter/src/descriptors.rs index 81d8f2810771..bcc1ba7cf71b 100644 --- a/crates/wasi-preview1-component-adapter/src/descriptors.rs +++ b/crates/wasi-preview1-component-adapter/src/descriptors.rs @@ -47,10 +47,10 @@ impl Drop for Descriptor { /// identifies what kind of stream they are and possibly supporting /// type-specific operations like seeking. pub struct Streams { - /// The output stream, if present. + /// The input stream, if present. pub input: Cell>, - /// The input stream, if present. + /// The output stream, if present. pub output: Cell>, /// Information about the source of the stream. diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 462dccbaeea8..5cd5602d3fda 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -29,7 +29,9 @@ tracing = { workspace = true, optional = true } cap-std = { workspace = true, optional = true } cap-rand = { workspace = true, optional = true } cap-fs-ext = { workspace = true, optional = true } +cap-net-ext = { workspace = true, optional = true } cap-time-ext = { workspace = true, optional = true } +io-lifetimes = { workspace = true, optional = true } fs-set-times = { workspace = true, optional = true } is-terminal = { workspace = true, optional = true } bitflags = { workspace = true, optional = true } @@ -41,7 +43,7 @@ futures = { workspace = true, optional = true } tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net", "macros"] } [target.'cfg(unix)'.dependencies] -rustix = { workspace = true, features = ["fs"], optional = true } +rustix = { workspace = true, features = ["fs", "net"], optional = true } [target.'cfg(unix)'.dev-dependencies] libc = { workspace = true } @@ -63,7 +65,9 @@ preview2 = [ 'dep:cap-std', 'dep:cap-rand', 'dep:cap-fs-ext', + 'dep:cap-net-ext', 'dep:cap-time-ext', + 'dep:io-lifetimes', 'dep:fs-set-times', 'dep:is-terminal', 'dep:bitflags', diff --git a/crates/wasi/src/preview2/command.rs b/crates/wasi/src/preview2/command.rs index d44b4026a4c4..4ebd5640c1ab 100644 --- a/crates/wasi/src/preview2/command.rs +++ b/crates/wasi/src/preview2/command.rs @@ -6,11 +6,13 @@ wasmtime::component::bindgen!({ async: true, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, + "wasi:sockets/tcp"::"error-code": Error, "wasi:io/streams"::"stream-error": Error, }, with: { "wasi:filesystem/types": crate::preview2::bindings::filesystem::types, "wasi:filesystem/preopens": crate::preview2::bindings::filesystem::preopens, + "wasi:sockets/tcp": crate::preview2::bindings::sockets::tcp, "wasi:clocks/monotonic_clock": crate::preview2::bindings::clocks::monotonic_clock, "wasi:poll/poll": crate::preview2::bindings::poll::poll, "wasi:io/streams": crate::preview2::bindings::io::streams, @@ -36,6 +38,7 @@ pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> any crate::preview2::bindings::clocks::timezone::add_to_linker(l, |t| t)?; crate::preview2::bindings::filesystem::types::add_to_linker(l, |t| t)?; crate::preview2::bindings::filesystem::preopens::add_to_linker(l, |t| t)?; + crate::preview2::bindings::sockets::tcp::add_to_linker(l, |t| t)?; crate::preview2::bindings::poll::poll::add_to_linker(l, |t| t)?; crate::preview2::bindings::io::streams::add_to_linker(l, |t| t)?; crate::preview2::bindings::random::random::add_to_linker(l, |t| t)?; @@ -61,11 +64,13 @@ pub mod sync { async: false, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, + "wasi:sockets/tcp"::"error-code": Error, "wasi:io/streams"::"stream-error": Error, }, with: { "wasi:filesystem/types": crate::preview2::bindings::sync_io::filesystem::types, "wasi:filesystem/preopens": crate::preview2::bindings::filesystem::preopens, + "wasi:sockets/tcp": crate::preview2::bindings::sockets::tcp, "wasi:clocks/monotonic_clock": crate::preview2::bindings::clocks::monotonic_clock, "wasi:poll/poll": crate::preview2::bindings::sync_io::poll::poll, "wasi:io/streams": crate::preview2::bindings::sync_io::io::streams, @@ -106,6 +111,7 @@ pub mod sync { crate::preview2::bindings::cli::terminal_stdin::add_to_linker(l, |t| t)?; crate::preview2::bindings::cli::terminal_stdout::add_to_linker(l, |t| t)?; crate::preview2::bindings::cli::terminal_stderr::add_to_linker(l, |t| t)?; + crate::preview2::bindings::sockets::tcp::add_to_linker(l, |t| t)?; Ok(()) } } diff --git a/crates/wasi/src/preview2/ctx.rs b/crates/wasi/src/preview2/ctx.rs index 252307c80faf..03b69da3b795 100644 --- a/crates/wasi/src/preview2/ctx.rs +++ b/crates/wasi/src/preview2/ctx.rs @@ -8,7 +8,11 @@ use crate::preview2::{ DirPerms, FilePerms, IsATTY, Table, }; use cap_rand::{Rng, RngCore, SeedableRng}; +use cap_std::ipnet::{self, IpNet}; +use cap_std::net::Pool; +use cap_std::{ambient_authority, AmbientAuthority}; use std::mem; +use std::net::{Ipv4Addr, Ipv6Addr}; pub struct WasiCtxBuilder { stdin: (Box, IsATTY), @@ -18,6 +22,7 @@ pub struct WasiCtxBuilder { args: Vec, preopens: Vec<(Dir, String)>, + pool: Pool, random: Box, insecure_random: Box, insecure_random_seed: u128, @@ -63,6 +68,7 @@ impl WasiCtxBuilder { env: Vec::new(), args: Vec::new(), preopens: Vec::new(), + pool: Pool::new(), random: random::thread_rng(), insecure_random, insecure_random_seed, @@ -200,6 +206,57 @@ impl WasiCtxBuilder { self } + /// Add all network addresses accessable to the host to the pool. + pub fn inherit_network(&mut self, ambient_authority: AmbientAuthority) -> &mut Self { + self.pool.insert_ip_net_port_any( + IpNet::new(Ipv4Addr::UNSPECIFIED.into(), 0).unwrap(), + ambient_authority, + ); + self.pool.insert_ip_net_port_any( + IpNet::new(Ipv6Addr::UNSPECIFIED.into(), 0).unwrap(), + ambient_authority, + ); + self + } + + /// Add network addresses to the pool. + pub fn insert_addr(&mut self, addrs: A) -> std::io::Result<()> { + self.pool.insert(addrs, ambient_authority()) + } + + /// Add a specific [`cap_std::net::SocketAddr`] to the pool. + pub fn insert_socket_addr(&mut self, addr: cap_std::net::SocketAddr) { + self.pool.insert_socket_addr(addr, ambient_authority()); + } + + /// Add a range of network addresses, accepting any port, to the pool. + /// + /// Unlike `insert_ip_net`, this function grants access to any requested port. + pub fn insert_ip_net_port_any(&mut self, ip_net: ipnet::IpNet) { + self.pool + .insert_ip_net_port_any(ip_net, ambient_authority()) + } + + /// Add a range of network addresses, accepting a range of ports, to + /// per-instance networks. + /// + /// This grants access to the port range starting at `ports_start` and, if + /// `ports_end` is provided, ending before `ports_end`. + pub fn insert_ip_net_port_range( + &mut self, + ip_net: ipnet::IpNet, + ports_start: u16, + ports_end: Option, + ) { + self.pool + .insert_ip_net_port_range(ip_net, ports_start, ports_end, ambient_authority()) + } + + /// Add a range of network addresses with a specific port to the pool. + pub fn insert_ip_net(&mut self, ip_net: ipnet::IpNet, port: u16) { + self.pool.insert_ip_net(ip_net, port, ambient_authority()) + } + /// Uses the configured context so far to construct the final `WasiCtx`. /// /// This will insert resources into the provided `table`. @@ -221,6 +278,7 @@ impl WasiCtxBuilder { env, args, preopens, + pool, random, insecure_random, insecure_random_seed, @@ -260,6 +318,7 @@ impl WasiCtxBuilder { env, args, preopens, + pool, random, insecure_random, insecure_random_seed, @@ -288,4 +347,5 @@ pub struct WasiCtx { pub(crate) stdin: StdioInput, pub(crate) stdout: StdioOutput, pub(crate) stderr: StdioOutput, + pub(crate) pool: Pool, } diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index d13ee68e2dae..465455cd7ec4 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -12,7 +12,9 @@ bitflags::bitflags! { pub(crate) struct File { /// Wrapped in an Arc because the same underlying file is used for - /// implementing the stream types. Also needed for [`block`]. + /// implementing the stream types. Also needed for [`spawn_blocking`]. + /// + /// [`spawn_blocking`]: Self::spawn_blocking pub file: Arc, pub perms: FilePerms, } diff --git a/crates/wasi/src/preview2/host/instance_network.rs b/crates/wasi/src/preview2/host/instance_network.rs new file mode 100644 index 000000000000..8c8b56974aa1 --- /dev/null +++ b/crates/wasi/src/preview2/host/instance_network.rs @@ -0,0 +1,11 @@ +use crate::preview2::bindings::sockets::instance_network::{self, Network}; +use crate::preview2::network::{HostNetwork, TableNetworkExt}; +use crate::preview2::WasiView; + +impl instance_network::Host for T { + fn instance_network(&mut self) -> Result { + let network = HostNetwork::new(self.ctx().pool.clone()); + let network = self.table_mut().push_network(network)?; + Ok(network) + } +} diff --git a/crates/wasi/src/preview2/host/mod.rs b/crates/wasi/src/preview2/host/mod.rs index 8bb111cc3a21..138166731565 100644 --- a/crates/wasi/src/preview2/host/mod.rs +++ b/crates/wasi/src/preview2/host/mod.rs @@ -2,5 +2,9 @@ mod clocks; mod env; mod exit; pub(crate) mod filesystem; +mod instance_network; mod io; +mod network; mod random; +mod tcp; +mod tcp_create_socket; diff --git a/crates/wasi/src/preview2/host/network.rs b/crates/wasi/src/preview2/host/network.rs new file mode 100644 index 000000000000..5f6b090d6802 --- /dev/null +++ b/crates/wasi/src/preview2/host/network.rs @@ -0,0 +1,185 @@ +use crate::preview2::bindings::sockets::network::{ + self, ErrorCode, IpAddressFamily, IpSocketAddress, Ipv4Address, Ipv4SocketAddress, Ipv6Address, + Ipv6SocketAddress, +}; +use crate::preview2::network::TableNetworkExt; +use crate::preview2::{TableError, WasiView}; +use std::io; + +impl network::Host for T { + fn drop_network(&mut self, this: network::Network) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + table.delete_network(this)?; + + Ok(()) + } +} + +impl From for network::Error { + fn from(error: TableError) -> Self { + Self::trap(error.into()) + } +} + +impl From for network::Error { + fn from(error: io::Error) -> Self { + match error.kind() { + // Errors that we can directly map. + io::ErrorKind::PermissionDenied => ErrorCode::AccessDenied, + io::ErrorKind::ConnectionRefused => ErrorCode::ConnectionRefused, + io::ErrorKind::ConnectionReset => ErrorCode::ConnectionReset, + io::ErrorKind::NotConnected => ErrorCode::NotConnected, + io::ErrorKind::AddrInUse => ErrorCode::AddressInUse, + io::ErrorKind::AddrNotAvailable => ErrorCode::AddressNotBindable, + io::ErrorKind::WouldBlock => ErrorCode::WouldBlock, + io::ErrorKind::TimedOut => ErrorCode::Timeout, + io::ErrorKind::Unsupported => ErrorCode::NotSupported, + io::ErrorKind::OutOfMemory => ErrorCode::OutOfMemory, + + // Errors we don't expect to see here. + io::ErrorKind::Interrupted | io::ErrorKind::ConnectionAborted => { + panic!("transient errors should be skipped") + } + + // Errors not expected from network APIs. + io::ErrorKind::WriteZero + | io::ErrorKind::InvalidInput + | io::ErrorKind::InvalidData + | io::ErrorKind::BrokenPipe + | io::ErrorKind::NotFound + | io::ErrorKind::UnexpectedEof + | io::ErrorKind::AlreadyExists => ErrorCode::Unknown, + + // Errors that don't correspond to a Rust `io::ErrorKind`. + io::ErrorKind::Other => match error.raw_os_error() { + None => ErrorCode::Unknown, + Some(libc::ENOBUFS) | Some(libc::ENOMEM) => ErrorCode::OutOfMemory, + Some(libc::EOPNOTSUPP) => ErrorCode::NotSupported, + Some(libc::ENETUNREACH) | Some(libc::EHOSTUNREACH) | Some(libc::ENETDOWN) => { + ErrorCode::RemoteUnreachable + } + Some(libc::ECONNRESET) => ErrorCode::ConnectionReset, + Some(libc::ECONNREFUSED) => ErrorCode::ConnectionRefused, + Some(libc::EADDRINUSE) => ErrorCode::AddressInUse, + Some(_) => panic!("unknown error {:?}", error), + }, + _ => panic!("unknown error {:?}", error), + } + .into() + } +} + +impl From for network::Error { + fn from(error: rustix::io::Errno) -> Self { + std::io::Error::from(error).into() + } +} + +impl From for std::net::SocketAddr { + fn from(addr: IpSocketAddress) -> Self { + match addr { + IpSocketAddress::Ipv4(ipv4) => Self::V4(ipv4.into()), + IpSocketAddress::Ipv6(ipv6) => Self::V6(ipv6.into()), + } + } +} + +impl From for IpSocketAddress { + fn from(addr: std::net::SocketAddr) -> Self { + match addr { + std::net::SocketAddr::V4(v4) => Self::Ipv4(v4.into()), + std::net::SocketAddr::V6(v6) => Self::Ipv6(v6.into()), + } + } +} + +impl From for std::net::SocketAddrV4 { + fn from(addr: Ipv4SocketAddress) -> Self { + Self::new(to_ipv4_addr(addr.address), addr.port) + } +} + +impl From for Ipv4SocketAddress { + fn from(addr: std::net::SocketAddrV4) -> Self { + Self { + address: from_ipv4_addr(*addr.ip()), + port: addr.port(), + } + } +} + +impl From for std::net::SocketAddrV6 { + fn from(addr: Ipv6SocketAddress) -> Self { + Self::new( + to_ipv6_addr(addr.address), + addr.port, + addr.flow_info, + addr.scope_id, + ) + } +} + +impl From for Ipv6SocketAddress { + fn from(addr: std::net::SocketAddrV6) -> Self { + Self { + address: from_ipv6_addr(*addr.ip()), + port: addr.port(), + flow_info: addr.flowinfo(), + scope_id: addr.scope_id(), + } + } +} + +fn to_ipv4_addr(addr: Ipv4Address) -> std::net::Ipv4Addr { + let (x0, x1, x2, x3) = addr; + std::net::Ipv4Addr::new(x0, x1, x2, x3) +} + +fn from_ipv4_addr(addr: std::net::Ipv4Addr) -> Ipv4Address { + let [x0, x1, x2, x3] = addr.octets(); + (x0, x1, x2, x3) +} + +fn to_ipv6_addr(addr: Ipv6Address) -> std::net::Ipv6Addr { + let (x0, x1, x2, x3, x4, x5, x6, x7) = addr; + std::net::Ipv6Addr::new(x0, x1, x2, x3, x4, x5, x6, x7) +} + +fn from_ipv6_addr(addr: std::net::Ipv6Addr) -> Ipv6Address { + let [x0, x1, x2, x3, x4, x5, x6, x7] = addr.segments(); + (x0, x1, x2, x3, x4, x5, x6, x7) +} + +impl std::net::ToSocketAddrs for IpSocketAddress { + type Iter = ::Iter; + + fn to_socket_addrs(&self) -> io::Result { + std::net::SocketAddr::from(*self).to_socket_addrs() + } +} + +impl std::net::ToSocketAddrs for Ipv4SocketAddress { + type Iter = ::Iter; + + fn to_socket_addrs(&self) -> io::Result { + std::net::SocketAddrV4::from(*self).to_socket_addrs() + } +} + +impl std::net::ToSocketAddrs for Ipv6SocketAddress { + type Iter = ::Iter; + + fn to_socket_addrs(&self) -> io::Result { + std::net::SocketAddrV6::from(*self).to_socket_addrs() + } +} + +impl From for cap_net_ext::AddressFamily { + fn from(family: IpAddressFamily) -> Self { + match family { + IpAddressFamily::Ipv4 => cap_net_ext::AddressFamily::Ipv4, + IpAddressFamily::Ipv6 => cap_net_ext::AddressFamily::Ipv6, + } + } +} diff --git a/crates/wasi/src/preview2/host/tcp.rs b/crates/wasi/src/preview2/host/tcp.rs new file mode 100644 index 000000000000..439072b1d6dd --- /dev/null +++ b/crates/wasi/src/preview2/host/tcp.rs @@ -0,0 +1,575 @@ +use crate::preview2::bindings::{ + io::streams::{InputStream, OutputStream}, + poll::poll::Pollable, + sockets::network::{self, ErrorCode, IpAddressFamily, IpSocketAddress, Network}, + sockets::tcp::{self, ShutdownType}, +}; +use crate::preview2::network::TableNetworkExt; +use crate::preview2::poll::TablePollableExt; +use crate::preview2::stream::TableStreamExt; +use crate::preview2::tcp::{HostTcpSocket, HostTcpSocketInner, HostTcpState, TableTcpSocketExt}; +use crate::preview2::{HostPollable, PollableFuture, WasiView}; +use cap_net_ext::{Blocking, PoolExt, TcpListenerExt}; +use io_lifetimes::AsSocketlike; +use rustix::net::sockopt; +use std::any::Any; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +#[cfg(unix)] +use tokio::task::spawn; +#[cfg(not(unix))] +use tokio::task::spawn_blocking; +use tokio::task::JoinHandle; + +impl tcp::Host for T { + fn start_bind( + &mut self, + this: tcp::TcpSocket, + network: Network, + local_address: IpSocketAddress, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &*tcp_state { + HostTcpState::Default => {} + _ => return Err(ErrorCode::NotInProgress.into()), + } + + let network = table.get_network(network)?; + let binder = network.0.tcp_binder(local_address)?; + + binder.bind_existing_tcp_listener(socket.tcp_socket())?; + + *tcp_state = HostTcpState::BindStarted; + socket.inner.sender.send(()).unwrap(); + + Ok(()) + } + + // TODO: Bind and listen aren't really blocking operations; figure this + // out at the spec level. + fn finish_bind(&mut self, this: tcp::TcpSocket) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &mut *tcp_state { + HostTcpState::BindStarted => { + *tcp_state = HostTcpState::Bound; + Ok(()) + } + _ => Err(ErrorCode::NotInProgress.into()), + } + } + + fn start_connect( + &mut self, + this: tcp::TcpSocket, + network: Network, + remote_address: IpSocketAddress, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &*tcp_state { + HostTcpState::Default => {} + HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()), + _ => return Err(ErrorCode::NotInProgress.into()), + } + + let network = table.get_network(network)?; + let connecter = network.0.tcp_connecter(remote_address)?; + + // Do a host `connect`. Our socket is non-blocking, so it'll either... + match connecter.connect_existing_tcp_listener(socket.tcp_socket()) { + // succeed immediately, + Ok(()) => { + *tcp_state = HostTcpState::ConnectReady(Ok(())); + return Ok(()); + } + // continue in progress, + Err(err) + if err.raw_os_error() == Some(rustix::io::Errno::INPROGRESS.raw_os_error()) => {} + // or fail immediately. + Err(err) => return Err(err.into()), + } + + // The connect is continuing in progres. Set up the join handle. + + let clone = socket.clone_inner(); + + #[cfg(unix)] + let join = spawn(async move { + let result = match clone.tcp_socket.writable().await { + Ok(mut writable) => { + writable.retain_ready(); + + // Check whether the connect succeeded. + match sockopt::get_socket_error(&clone.tcp_socket) { + Ok(Ok(())) => Ok(()), + Err(err) | Ok(Err(err)) => Err(err.into()), + } + } + Err(err) => Err(err), + }; + + *clone.tcp_state.write().unwrap() = HostTcpState::ConnectReady(result); + clone.sender.send(()).unwrap(); + }); + + #[cfg(not(unix))] + let join = spawn_blocking(move || { + let result = match rustix::event::poll( + &mut [rustix::event::PollFd::new( + &clone.tcp_socket, + rustix::event::PollFlags::OUT, + )], + -1, + ) { + Ok(_) => { + // Check whether the connect succeeded. + match sockopt::get_socket_error(&clone.tcp_socket) { + Ok(Ok(())) => Ok(()), + Err(err) | Ok(Err(err)) => Err(err.into()), + } + } + Err(err) => Err(err.into()), + }; + + *clone.tcp_state.write().unwrap() = HostTcpState::ConnectReady(result); + clone.sender.send(()).unwrap(); + }); + + *tcp_state = HostTcpState::Connecting(Pin::from(Box::new(join))); + + Ok(()) + } + + fn finish_connect( + &mut self, + this: tcp::TcpSocket, + ) -> Result<(InputStream, OutputStream), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &mut *tcp_state { + HostTcpState::ConnectReady(_) => {} + HostTcpState::Connecting(join) => match maybe_unwrap_future(join) { + Some(joined) => joined.unwrap(), + None => return Err(ErrorCode::WouldBlock.into()), + }, + _ => return Err(ErrorCode::NotInProgress.into()), + }; + + let old_state = mem::replace(&mut *tcp_state, HostTcpState::Connected); + + // Extract the connection result. + let result = match old_state { + HostTcpState::ConnectReady(result) => result, + _ => panic!(), + }; + + // Report errors, resetting the state if needed. + match result { + Ok(()) => {} + Err(err) => { + *tcp_state = HostTcpState::Default; + return Err(err.into()); + } + } + + drop(tcp_state); + + let input_clone = socket.clone_inner(); + let output_clone = socket.clone_inner(); + + let input_stream = self.table_mut().push_input_stream(Box::new(input_clone))?; + let output_stream = self + .table_mut() + .push_output_stream(Box::new(output_clone))?; + + Ok((input_stream, output_stream)) + } + + fn start_listen( + &mut self, + this: tcp::TcpSocket, + _network: Network, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &*tcp_state { + HostTcpState::Bound => {} + HostTcpState::ListenStarted => return Err(ErrorCode::AlreadyListening.into()), + HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()), + _ => return Err(ErrorCode::NotInProgress.into()), + } + + socket.tcp_socket().listen(None)?; + + *tcp_state = HostTcpState::ListenStarted; + socket.inner.sender.send(()).unwrap(); + + Ok(()) + } + + fn finish_listen(&mut self, this: tcp::TcpSocket) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + + match &mut *tcp_state { + HostTcpState::ListenStarted => {} + _ => return Err(ErrorCode::NotInProgress.into()), + } + + let new_join = spawn_task_to_wait_for_connections(socket.clone_inner()); + *tcp_state = HostTcpState::Listening(Pin::from(Box::new(new_join))); + drop(tcp_state); + + Ok(()) + } + + fn accept( + &mut self, + this: tcp::TcpSocket, + ) -> Result<(tcp::TcpSocket, InputStream, OutputStream), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let mut tcp_state = socket.inner.tcp_state.write().unwrap(); + match &mut *tcp_state { + HostTcpState::ListenReady(_) => {} + HostTcpState::Listening(join) => match maybe_unwrap_future(join) { + Some(joined) => joined.unwrap(), + None => return Err(ErrorCode::WouldBlock.into()), + }, + HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()), + _ => return Err(ErrorCode::NotInProgress.into()), + } + + let new_join = spawn_task_to_wait_for_connections(socket.clone_inner()); + *tcp_state = HostTcpState::Listening(Pin::from(Box::new(new_join))); + drop(tcp_state); + + // Do the host system call. + let (connection, _addr) = socket.tcp_socket().accept_with(Blocking::No)?; + let tcp_socket = HostTcpSocket::from_tcp_stream(connection)?; + + let input_clone = tcp_socket.clone_inner(); + let output_clone = tcp_socket.clone_inner(); + + let tcp_socket = self.table_mut().push_tcp_socket(tcp_socket)?; + let input_stream = self.table_mut().push_input_stream(Box::new(input_clone))?; + let output_stream = self + .table_mut() + .push_output_stream(Box::new(output_clone))?; + + Ok((tcp_socket, input_stream, output_stream)) + } + + fn local_address(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + let addr = socket + .inner + .tcp_socket + .as_socketlike_view::() + .local_addr()?; + Ok(addr.into()) + } + + fn remote_address(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + let addr = socket + .inner + .tcp_socket + .as_socketlike_view::() + .peer_addr()?; + Ok(addr.into()) + } + + fn address_family(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + // If `SO_DOMAIN` is available, use it. + // + // TODO: OpenBSD also supports this; upstream PRs are posted. + #[cfg(not(any(apple, windows, target_os = "netbsd", target_os = "openbsd")))] + { + use rustix::net::AddressFamily; + + let family = sockopt::get_socket_domain(socket.tcp_socket())?; + let family = match family { + AddressFamily::INET => IpAddressFamily::Ipv4, + AddressFamily::INET6 => IpAddressFamily::Ipv6, + _ => return Err(ErrorCode::NotSupported.into()), + }; + Ok(family) + } + + // When `SO_DOMAIN` is not available, emulate it. + #[cfg(any(apple, windows, target_os = "netbsd", target_os = "openbsd"))] + { + if let Ok(_) = sockopt::get_ipv6_unicast_hops(socket.tcp_socket()) { + return Ok(IpAddressFamily::Ipv6); + } + if let Ok(_) = sockopt::get_ip_ttl(socket.tcp_socket()) { + return Ok(IpAddressFamily::Ipv4); + } + Err(ErrorCode::NotSupported.into()) + } + } + + fn ipv6_only(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::get_ipv6_v6only(socket.tcp_socket())?) + } + + fn set_ipv6_only(&mut self, this: tcp::TcpSocket, value: bool) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::set_ipv6_v6only(socket.tcp_socket(), value)?) + } + + fn set_listen_backlog_size( + &mut self, + this: tcp::TcpSocket, + value: u64, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let tcp_state = socket.inner.tcp_state.read().unwrap(); + match &*tcp_state { + HostTcpState::Listening(_) => {} + _ => return Err(ErrorCode::NotInProgress.into()), + } + + let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; + Ok(rustix::net::listen(socket.tcp_socket(), value)?) + } + + fn keep_alive(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::get_socket_keepalive(socket.tcp_socket())?) + } + + fn set_keep_alive(&mut self, this: tcp::TcpSocket, value: bool) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::set_socket_keepalive(socket.tcp_socket(), value)?) + } + + fn no_delay(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::get_tcp_nodelay(socket.tcp_socket())?) + } + + fn set_no_delay(&mut self, this: tcp::TcpSocket, value: bool) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::set_tcp_nodelay(socket.tcp_socket(), value)?) + } + + fn unicast_hop_limit(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + // We don't track whether the socket is IPv4 or IPv6 so try one and + // fall back to the other. + match sockopt::get_ipv6_unicast_hops(socket.tcp_socket()) { + Ok(value) => Ok(value), + Err(rustix::io::Errno::NOPROTOOPT) => { + let value = sockopt::get_ip_ttl(socket.tcp_socket())?; + let value = value.try_into().unwrap(); + Ok(value) + } + Err(err) => Err(err.into()), + } + } + + fn set_unicast_hop_limit( + &mut self, + this: tcp::TcpSocket, + value: u8, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + // We don't track whether the socket is IPv4 or IPv6 so try one and + // fall back to the other. + match sockopt::set_ipv6_unicast_hops(socket.tcp_socket(), Some(value)) { + Ok(()) => Ok(()), + Err(rustix::io::Errno::NOPROTOOPT) => { + Ok(sockopt::set_ip_ttl(socket.tcp_socket(), value.into())?) + } + Err(err) => Err(err.into()), + } + } + + fn receive_buffer_size(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::get_socket_recv_buffer_size(socket.tcp_socket())? as u64) + } + + fn set_receive_buffer_size( + &mut self, + this: tcp::TcpSocket, + value: u64, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; + Ok(sockopt::set_socket_recv_buffer_size( + socket.tcp_socket(), + value, + )?) + } + + fn send_buffer_size(&mut self, this: tcp::TcpSocket) -> Result { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + Ok(sockopt::get_socket_send_buffer_size(socket.tcp_socket())? as u64) + } + + fn set_send_buffer_size( + &mut self, + this: tcp::TcpSocket, + value: u64, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; + Ok(sockopt::set_socket_send_buffer_size( + socket.tcp_socket(), + value, + )?) + } + + fn subscribe(&mut self, this: tcp::TcpSocket) -> anyhow::Result { + fn make_tcp_socket_future<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { + let socket = stream + .downcast_mut::() + .expect("downcast to HostTcpSocket failed"); + + Box::pin(async { + socket.receiver.changed().await.unwrap(); + Ok(()) + }) + } + + let pollable = HostPollable::TableEntry { + index: this, + make_future: make_tcp_socket_future, + }; + + Ok(self.table_mut().push_host_pollable(pollable)?) + } + + fn shutdown( + &mut self, + this: tcp::TcpSocket, + shutdown_type: ShutdownType, + ) -> Result<(), network::Error> { + let table = self.table(); + let socket = table.get_tcp_socket(this)?; + + let how = match shutdown_type { + ShutdownType::Receive => std::net::Shutdown::Read, + ShutdownType::Send => std::net::Shutdown::Write, + ShutdownType::Both => std::net::Shutdown::Both, + }; + + socket + .inner + .tcp_socket + .as_socketlike_view::() + .shutdown(how)?; + Ok(()) + } + + fn drop_tcp_socket(&mut self, this: tcp::TcpSocket) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + // As in the filesystem implementation, we assume closing a socket + // doesn't block. + let dropped = table.delete_tcp_socket(this)?; + + // On non-Unix platforms, do a `shutdown` to wake up `poll`. + #[cfg(not(unix))] + rustix::net::shutdown(&dropped.inner.tcp_socket, rustix::net::Shutdown::ReadWrite).unwrap(); + + drop(dropped); + + Ok(()) + } +} + +/// Spawn a task to monitor a socket for incoming connections that +/// can be `accept`ed. +fn spawn_task_to_wait_for_connections(socket: Arc) -> JoinHandle<()> { + #[cfg(unix)] + let new_join = spawn(async move { + socket.tcp_socket.readable().await.unwrap().retain_ready(); + *socket.tcp_state.write().unwrap() = HostTcpState::ListenReady(Ok(())); + socket.sender.send(()).unwrap(); + }); + + #[cfg(not(unix))] + let new_join = spawn_blocking(move || { + let result = match rustix::event::poll( + &mut [rustix::event::PollFd::new( + &socket.tcp_socket, + rustix::event::PollFlags::IN, + )], + -1, + ) { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + }; + *socket.tcp_state.write().unwrap() = HostTcpState::ListenReady(result); + socket.sender.send(()).unwrap(); + }); + + new_join +} + +/// Given a future, return the finished value if it's already ready, or +/// `None` if it's not. +fn maybe_unwrap_future( + future: &mut Pin>, +) -> Option { + use std::ptr; + use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + + unsafe fn clone(_ptr: *const ()) -> RawWaker { + const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); + RawWaker::new(std::ptr::null(), &VTABLE) + } + unsafe fn wake(_ptr: *const ()) {} + unsafe fn wake_by_ref(_ptr: *const ()) {} + unsafe fn drop(_ptr: *const ()) {} + + let waker = unsafe { Waker::from_raw(clone(ptr::null() as _)) }; + + let mut cx = Context::from_waker(&waker); + match future.as_mut().poll(&mut cx) { + Poll::Ready(val) => Some(val), + Poll::Pending => None, + } +} diff --git a/crates/wasi/src/preview2/host/tcp_create_socket.rs b/crates/wasi/src/preview2/host/tcp_create_socket.rs new file mode 100644 index 000000000000..d2559e7df7d5 --- /dev/null +++ b/crates/wasi/src/preview2/host/tcp_create_socket.rs @@ -0,0 +1,18 @@ +use crate::preview2::bindings::{ + sockets::network::{self, IpAddressFamily}, + sockets::tcp::TcpSocket, + sockets::tcp_create_socket, +}; +use crate::preview2::tcp::{HostTcpSocket, TableTcpSocketExt}; +use crate::preview2::WasiView; + +impl tcp_create_socket::Host for T { + fn create_tcp_socket( + &mut self, + address_family: IpAddressFamily, + ) -> Result { + let socket = HostTcpSocket::new(address_family.into())?; + let socket = self.table_mut().push_tcp_socket(socket)?; + Ok(socket) + } +} diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index f0144562eeb7..a205ae0721ca 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -21,6 +21,7 @@ mod ctx; mod error; mod filesystem; mod host; +mod network; pub mod pipe; mod poll; #[cfg(feature = "preview1-on-preview2")] @@ -29,6 +30,7 @@ mod random; mod stdio; mod stream; mod table; +mod tcp; pub use self::clocks::{HostMonotonicClock, HostWallClock}; pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView}; @@ -133,10 +135,14 @@ pub mod bindings { import wasi:cli/terminal-stdin import wasi:cli/terminal-stdout import wasi:cli/terminal-stderr + import wasi:sockets/tcp + import wasi:sockets/tcp-create-socket + import wasi:sockets/instance-network ", tracing: true, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, + "wasi:sockets/network"::"error-code": Error, "wasi:io/streams"::"stream-error": Error, }, with: { @@ -148,7 +154,7 @@ pub mod bindings { }); } - pub use self::_internal_rest::wasi::{cli, random}; + pub use self::_internal_rest::wasi::{cli, random, sockets}; pub mod filesystem { pub use super::_internal_io::wasi::filesystem::types; pub use super::_internal_rest::wasi::filesystem::preopens; diff --git a/crates/wasi/src/preview2/network.rs b/crates/wasi/src/preview2/network.rs new file mode 100644 index 000000000000..4d462fcbd275 --- /dev/null +++ b/crates/wasi/src/preview2/network.rs @@ -0,0 +1,32 @@ +use crate::preview2::{Table, TableError}; +use cap_std::net::Pool; + +pub(crate) struct HostNetwork(pub(crate) Pool); + +impl HostNetwork { + pub fn new(pool: Pool) -> Self { + Self(pool) + } +} + +pub(crate) trait TableNetworkExt { + fn push_network(&mut self, network: HostNetwork) -> Result; + fn delete_network(&mut self, fd: u32) -> Result; + fn is_network(&self, fd: u32) -> bool; + fn get_network(&self, fd: u32) -> Result<&HostNetwork, TableError>; +} + +impl TableNetworkExt for Table { + fn push_network(&mut self, network: HostNetwork) -> Result { + self.push(Box::new(network)) + } + fn delete_network(&mut self, fd: u32) -> Result { + self.delete(fd) + } + fn is_network(&self, fd: u32) -> bool { + self.is::(fd) + } + fn get_network(&self, fd: u32) -> Result<&HostNetwork, TableError> { + self.get(fd) + } +} diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index a27cf381db63..e43e64b8d6dc 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -9,7 +9,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex, OnceLock}; use std::task::{Context, Poll}; use tokio::io::unix::AsyncFd; -use tokio::io::{AsyncRead, ReadBuf}; +use tokio::io::{AsyncRead, Interest, ReadBuf}; // We need a single global instance of the AsyncFd because creating // this instance registers the process's stdin fd with epoll, which will @@ -128,7 +128,7 @@ impl InnerStdin { } Ok(Self { - inner: AsyncFd::new(stdin)?, + inner: AsyncFd::with_interest(stdin, Interest::READABLE)?, }) } } diff --git a/crates/wasi/src/preview2/tcp.rs b/crates/wasi/src/preview2/tcp.rs new file mode 100644 index 000000000000..75e0b3fbdc3f --- /dev/null +++ b/crates/wasi/src/preview2/tcp.rs @@ -0,0 +1,290 @@ +use crate::preview2::{HostInputStream, HostOutputStream, StreamState, Table, TableError}; +use bytes::{Bytes, BytesMut}; +use cap_net_ext::{AddressFamily, Blocking, TcpListenerExt}; +use cap_std::net::{TcpListener, TcpStream}; +use io_lifetimes::AsSocketlike; +use std::io; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use system_interface::io::IoExt; +use tokio::sync::watch::{channel, Receiver, Sender}; +use tokio::task::JoinHandle; + +/// The state of a TCP socket. +/// +/// This represents the various states a socket can be in during the +/// activities of binding, listening, accepting, and connecting. +pub(crate) enum HostTcpState { + /// The initial state for a newly-created socket. + Default, + + /// Binding started via `start_bind`. + BindStarted, + + /// Binding finished via `finish_bind`. The socket has an address but + /// is not yet listening for connections. + Bound, + + /// Listening started via `listen_start`. + ListenStarted, + + /// The socket is now listening and waiting for an incomming connection. + Listening(Pin>>), + + /// Listening heard an incomming connection arrive that is ready to be + /// accepted. + ListenReady(io::Result<()>), + + /// An outgoing connection is started via `start_connect`. + Connecting(Pin>>), + + /// An outgoing connection is ready to be established. + ConnectReady(io::Result<()>), + + /// An outgoing connection has been established. + Connected, +} + +/// A host TCP socket, plus associated bookkeeping. +// The inner state is wrapped in an Arc because the same underlying socket is +// used for implementing the stream types. Also needed for [`spawn_blocking`]. +// +// [`spawn_blocking`]: Self::spawn_blocking +pub(crate) struct HostTcpSocket { + /// The part of a `HostTcpSocket` which is reference-counted so that we + /// can pass it to async tasks. + pub(crate) inner: Arc, + + /// The recieving end of `inner`'s `sender`, used by `subscribe` + /// subscriptions to wait for I/O. + pub(crate) receiver: Receiver<()>, +} + +/// The inner reference-counted state of a `HostTcpSocket`. +pub(crate) struct HostTcpSocketInner { + /// On Unix-family platforms we can use `AsyncFd` for efficient polling. + #[cfg(unix)] + pub(crate) tcp_socket: tokio::io::unix::AsyncFd, + + /// On non-Unix, we can use plain `poll`. + #[cfg(not(unix))] + pub(crate) tcp_socket: cap_std::net::TcpListener, + + /// The current state in the bind/listen/accept/connect progression. + pub(crate) tcp_state: RwLock, + + /// A sender used to send messages when I/O events complete. + pub(crate) sender: Sender<()>, +} + +impl HostTcpSocket { + pub fn new(family: AddressFamily) -> io::Result { + let tcp_socket = TcpListener::new(family, Blocking::No)?; + + // On Unix, pack it up in an `AsyncFd` so we can efficiently poll it. + #[cfg(unix)] + let tcp_socket = tokio::io::unix::AsyncFd::new(tcp_socket)?; + + let (sender, receiver) = channel(()); + + Ok(Self { + inner: Arc::new(HostTcpSocketInner { + tcp_socket, + tcp_state: RwLock::new(HostTcpState::Default), + sender, + }), + receiver, + }) + } + + pub fn from_tcp_stream(tcp_socket: cap_std::net::TcpStream) -> io::Result { + let fd = rustix::fd::OwnedFd::from(tcp_socket); + let tcp_socket = TcpListener::from(fd); + + // On Unix, pack it up in an `AsyncFd` so we can efficiently poll it. + #[cfg(unix)] + let tcp_socket = tokio::io::unix::AsyncFd::new(tcp_socket)?; + + let (sender, receiver) = channel(()); + + Ok(Self { + inner: Arc::new(HostTcpSocketInner { + tcp_socket, + tcp_state: RwLock::new(HostTcpState::Default), + sender, + }), + receiver, + }) + } + + pub fn tcp_socket(&self) -> &cap_std::net::TcpListener { + self.inner.tcp_socket() + } + + pub fn clone_inner(&self) -> Arc { + Arc::clone(&self.inner) + } +} + +impl HostTcpSocketInner { + pub fn tcp_socket(&self) -> &cap_std::net::TcpListener { + let tcp_socket = &self.tcp_socket; + + // Unpack the `AsyncFd`. + #[cfg(unix)] + let tcp_socket = tcp_socket.get_ref(); + + tcp_socket + } + + /// Spawn a task on tokio's blocking thread for performing blocking + /// syscalls on the underlying [`cap_std::net::TcpListener`]. + #[cfg(not(unix))] + pub(crate) async fn spawn_blocking(self: &Arc, body: F) -> R + where + F: FnOnce(&cap_std::net::TcpListener) -> R + Send + 'static, + R: Send + 'static, + { + let s = Arc::clone(self); + tokio::task::spawn_blocking(move || body(s.tcp_socket())) + .await + .unwrap() + } +} + +#[async_trait::async_trait] +impl HostInputStream for Arc { + fn read(&mut self, size: usize) -> anyhow::Result<(Bytes, StreamState)> { + let mut buf = BytesMut::zeroed(size); + let r = self + .tcp_socket() + .as_socketlike_view::() + .read(&mut buf); + let (n, state) = read_result(r)?; + buf.truncate(n); + Ok((buf.freeze(), state)) + } + + async fn ready(&mut self) -> anyhow::Result<()> { + #[cfg(unix)] + { + self.tcp_socket.readable().await?.retain_ready(); + Ok(()) + } + + #[cfg(not(unix))] + { + self.spawn_blocking(move |tcp_socket| { + match rustix::event::poll( + &mut [rustix::event::PollFd::new( + tcp_socket, + rustix::event::PollFlags::IN, + )], + -1, + ) { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } + }) + .await + } + } +} + +#[async_trait::async_trait] +impl HostOutputStream for Arc { + fn write(&mut self, buf: Bytes) -> anyhow::Result<(usize, StreamState)> { + let r = self + .tcp_socket + .as_socketlike_view::() + .write(buf.as_ref()); + let (n, state) = write_result(r)?; + Ok((n, state)) + } + + async fn ready(&mut self) -> anyhow::Result<()> { + #[cfg(unix)] + { + self.tcp_socket.writable().await?.retain_ready(); + Ok(()) + } + + #[cfg(not(unix))] + { + self.spawn_blocking(move |tcp_socket| { + match rustix::event::poll( + &mut [rustix::event::PollFd::new( + tcp_socket, + rustix::event::PollFlags::OUT, + )], + -1, + ) { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } + }) + .await + } + } +} + +impl Drop for HostTcpSocketInner { + fn drop(&mut self) { + match &*self.tcp_state.read().unwrap() { + HostTcpState::Default + | HostTcpState::BindStarted + | HostTcpState::Bound + | HostTcpState::ListenStarted + | HostTcpState::ListenReady(_) + | HostTcpState::ConnectReady(_) + | HostTcpState::Connected => {} + HostTcpState::Listening(join) | HostTcpState::Connecting(join) => { + // Abort the tasks so that they don't detach. + join.abort(); + } + } + } +} + +pub(crate) trait TableTcpSocketExt { + fn push_tcp_socket(&mut self, tcp_socket: HostTcpSocket) -> Result; + fn delete_tcp_socket(&mut self, fd: u32) -> Result; + fn is_tcp_socket(&self, fd: u32) -> bool; + fn get_tcp_socket(&self, fd: u32) -> Result<&HostTcpSocket, TableError>; +} + +impl TableTcpSocketExt for Table { + fn push_tcp_socket(&mut self, tcp_socket: HostTcpSocket) -> Result { + self.push(Box::new(tcp_socket)) + } + fn delete_tcp_socket(&mut self, fd: u32) -> Result { + self.delete(fd) + } + fn is_tcp_socket(&self, fd: u32) -> bool { + self.is::(fd) + } + fn get_tcp_socket(&self, fd: u32) -> Result<&HostTcpSocket, TableError> { + self.get(fd) + } +} + +pub(crate) fn read_result( + r: Result, +) -> Result<(usize, StreamState), std::io::Error> { + match r { + Ok(0) => Ok((0, StreamState::Closed)), + Ok(n) => Ok((n, StreamState::Open)), + Err(e) if e.kind() == std::io::ErrorKind::Interrupted => Ok((0, StreamState::Open)), + Err(e) => Err(e), + } +} + +pub(crate) fn write_result( + r: Result, +) -> Result<(usize, StreamState), std::io::Error> { + match r { + Ok(0) => Ok((0, StreamState::Closed)), + Ok(n) => Ok((n, StreamState::Open)), + Err(e) => Err(e), + } +} diff --git a/crates/wasi/wit/test.wit b/crates/wasi/wit/test.wit index 447304cba3d8..4543cb194af1 100644 --- a/crates/wasi/wit/test.wit +++ b/crates/wasi/wit/test.wit @@ -26,3 +26,16 @@ world test-command { import wasi:cli/stdout import wasi:cli/stderr } + +world test-command-with-sockets { + import wasi:poll/poll + import wasi:io/streams + import wasi:cli/environment + import wasi:cli/stdin + import wasi:cli/stdout + import wasi:cli/stderr + import wasi:sockets/tcp + import wasi:sockets/tcp-create-socket + import wasi:sockets/network + import wasi:sockets/instance-network +} diff --git a/supply-chain/audits.toml b/supply-chain/audits.toml index 3532b776b806..79ceb44e75fd 100644 --- a/supply-chain/audits.toml +++ b/supply-chain/audits.toml @@ -1861,6 +1861,19 @@ few minor issues related to Stacked Borrows and running in MIRI. No fundamental change to any preexisting unsafe code is happening here. """ +[[audits.smallvec]] +who = "Dan Gohman " +criteria = "safe-to-deploy" +delta = "1.8.0 -> 1.11.0" +notes = """ +The main change is the switch to use `NonNull` internally instead of +`*mut T`. This seems reasonable, as `Vec` also never stores a null pointer, +and in particular the new `NonNull::new_unchecked`s look ok. + +Most of the rest of the changes are adding some new unstable features which +aren't enabled by default. +""" + [[audits.socket2]] who = "Alex Crichton " criteria = "safe-to-deploy" @@ -2877,6 +2890,12 @@ user-id = 6825 # Dan Gohman (sunfishcode) start = "2020-12-11" end = "2024-07-14" +[[trusted.cap-net-ext]] +criteria = "safe-to-deploy" +user-id = 6825 # Dan Gohman (sunfishcode) +start = "2020-12-11" +end = "2024-07-14" + [[trusted.cap-primitives]] criteria = "safe-to-deploy" user-id = 6825 # Dan Gohman (sunfishcode) diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index ca06104a07c9..f0b588d9ec15 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -288,6 +288,13 @@ user-id = 6825 user-login = "sunfishcode" user-name = "Dan Gohman" +[[publisher.cap-net-ext]] +version = "2.0.0" +when = "2023-06-30" +user-id = 6825 +user-login = "sunfishcode" +user-name = "Dan Gohman" + [[publisher.cap-primitives]] version = "2.0.0" when = "2023-06-30" @@ -621,6 +628,13 @@ user-id = 6825 user-login = "sunfishcode" user-name = "Dan Gohman" +[[publisher.rustix]] +version = "0.38.8" +when = "2023-08-10" +user-id = 6825 +user-login = "sunfishcode" +user-name = "Dan Gohman" + [[publisher.ryu]] version = "1.0.9" when = "2021-12-12"