From 448c3507652006dc818b28a8a26eef7cf63c5f69 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 4 Jan 2021 13:09:25 +0100 Subject: [PATCH] Reintroduce feature flags for tokio vs async-io. To avoid having an extra reactor thread running for tokio users and to make sure all TCP I/O uses the mio-based tokio reactor. Thereby run tests with both backends. --- Cargo.toml | 5 +- src/lib.rs | 20 +- transports/tcp/Cargo.toml | 16 +- transports/tcp/src/lib.rs | 595 +++++++++++++++++++++++--------------- 4 files changed, 384 insertions(+), 252 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4cc4eebb8b..6287bba2bba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default = [ "pnet", "request-response", "secp256k1", - "tcp", + "tcp-async-io", "uds", "wasm-ext", "websocket", @@ -44,7 +44,8 @@ ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] request-response = ["libp2p-request-response"] -tcp = ["libp2p-tcp"] +tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"] +tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] uds = ["libp2p-uds"] wasm-ext = ["libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"] diff --git a/src/lib.rs b/src/lib.rs index 2c019ddc849..36749890408 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -215,7 +215,7 @@ pub use libp2p_ping as ping; pub use libp2p_plaintext as plaintext; #[doc(inline)] pub use libp2p_swarm as swarm; -#[cfg(feature = "tcp")] +#[cfg(any(feature = "tcp-async-io", feature = "tcp-tokio"))] #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] @@ -268,8 +268,8 @@ pub use self::transport_ext::TransportExt; /// /// > **Note**: This `Transport` is not suitable for production usage, as its implementation /// > reserves the right to support additional protocols or remove deprecated protocols. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] pub fn build_development_transport(keypair: identity::Keypair) -> std::io::Result> { @@ -280,13 +280,16 @@ pub fn build_development_transport(keypair: identity::Keypair) /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) -> std::io::Result> { let transport = { + #[cfg(feature = "tcp-async-io")] let tcp = tcp::TcpConfig::new().nodelay(true); + #[cfg(feature = "tcp-tokio")] + let tcp = tcp::TokioTcpConfig::new().nodelay(true); let transport = dns::DnsConfig::new(tcp)?; let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) @@ -308,13 +311,16 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))] pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey) -> std::io::Result> { let transport = { + #[cfg(feature = "tcp-async-io")] let tcp = tcp::TcpConfig::new().nodelay(true); + #[cfg(feature = "tcp-tokio")] + let tcp = tcp::TokioTcpConfig::new().nodelay(true); let transport = dns::DnsConfig::new(tcp)?; let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 707504624eb..258c822a936 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,14 +10,24 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io = "1.2.0" +async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" -if-watch = "0.1.4" +futures-timer = "3.0" +if-watch = { version = "0.1.4", optional = true } +if-addrs = { version = "0.6.4", optional = true } +ipnet = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.26.0", path = "../../core" } log = "0.4.11" socket2 = { version = "0.3.17", features = ["reuseport"] } +tokio-crate = { package = "tokio", version = "0.3", default-features = false, features = ["net"], optional = true } + +[features] +default = ["async-io"] +tokio = ["tokio-crate", "if-addrs"] +async-io = ["async-io-crate", "if-watch"] [dev-dependencies] -async-std = { version = "1.7.0", features = ["attributes"] } +async-std = { version = "1.6.5", features = ["attributes"] } +tokio-crate = { package = "tokio", version = "0.3", default-features = false, features = ["net", "rt"] } env_logger = "0.8.2" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index c0bc447b44d..d5703c7ec7d 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -22,16 +22,32 @@ //! //! # Usage //! -//! This crate provides `TcpConfig` which implements the `Transport` trait -//! for use as a transport with `libp2p-core` or `libp2p-swarm`. +//! This crate provides a `TcpConfig` and `TokioTcpConfig`, depending on +//! the enabled features, which implement the `Transport` trait for use as a +//! transport with `libp2p-core` or `libp2p-swarm`. + +mod provider; + +#[cfg(feature = "async-io")] +pub use provider::async_io; + +/// The type of a [`GenTcpConfig`] using the `async-io` implementation. +#[cfg(feature = "async-io")] +pub type TcpConfig = GenTcpConfig; + +#[cfg(feature = "tokio")] +pub use provider::tokio; + +/// The type of a [`GenTcpConfig`] using the `tokio` implementation. +#[cfg(feature = "tokio")] +pub type TokioTcpConfig = GenTcpConfig; -use async_io::{Async, Timer}; use futures::{ future::{self, BoxFuture, Ready}, prelude::*, ready, }; -use if_watch::{IfWatcher, IfEvent}; +use futures_timer::Delay; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -41,19 +57,25 @@ use socket2::{Domain, Socket, Type}; use std::{ collections::HashSet, io, - net::{SocketAddr, IpAddr, TcpListener, TcpStream}, + net::{SocketAddr, IpAddr, TcpListener}, pin::Pin, sync::{Arc, RwLock}, task::{Context, Poll}, time::Duration, }; -/// Represents the configuration for a TCP/IP transport capability for libp2p. +use provider::{Provider, IfEvent}; + +/// The configuration for a TCP/IP transport capability for libp2p. /// -/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams -/// obtained by libp2p through the tokio reactor. +/// A [`GenTcpConfig`] implements the [`Transport`] interface and thus +/// is consumed on [`Transport::listen_on`] and [`Transport::dial`]. +/// However, the config can be cheaply cloned to perform multiple such +/// operations with the same config. #[derive(Clone, Debug)] -pub struct TcpConfig { +pub struct GenTcpConfig { + /// The type of the I/O provider. + _impl: std::marker::PhantomData, /// TTL to set for opened sockets, or `None` to keep default. ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. @@ -138,23 +160,27 @@ impl PortReuse { } } -impl TcpConfig { +impl GenTcpConfig +where + T: Provider + Send, +{ /// Creates a new configuration for a TCP/IP transport: /// /// * Nagle's algorithm, i.e. `TCP_NODELAY`, is _enabled_. - /// See [`TcpConfig::nodelay`]. + /// See [`GenTcpConfig::nodelay`]. /// * Reuse of listening ports is _disabled_. - /// See [`TcpConfig::port_reuse`]. + /// See [`GenTcpConfig::port_reuse`]. /// * No custom `IP_TTL` is set. The default of the OS TCP stack applies. - /// See [`TcpConfig::ttl`]. + /// See [`GenTcpConfig::ttl`]. /// * The size of the listen backlog for new listening sockets is `1024`. - /// See [`TcpConfig::listen_backlog`]. + /// See [`GenTcpConfig::listen_backlog`]. pub fn new() -> Self { Self { ttl: None, nodelay: None, backlog: 1024, port_reuse: PortReuse::Disabled, + _impl: std::marker::PhantomData, } } @@ -213,23 +239,24 @@ impl TcpConfig { /// > a single outgoing connection to a particular address and port /// > of a peer per local listening socket address. /// - /// If enabled, the returned `TcpConfig` and all of its `Clone`s + /// If enabled, the returned `GenTcpConfig` and all of its `Clone`s /// keep track of the listen socket addresses as they are reported - /// by polling [`TcpListenStream`]s obtained from [`TcpConfig::listen_on()`]. + /// by polling [`TcpListenStream`]s obtained from [`GenTcpConfig::listen_on()`]. /// - /// In contrast, two `TcpConfig`s constructed separately via [`TcpConfig::new()`] + /// In contrast, two `GenTcpConfig`s constructed separately via [`GenTcpConfig::new()`] /// maintain these addresses independently. It is thus possible to listen on /// multiple addresses, enabling port reuse for each, knowing exactly which - /// listen address is reused when dialing with a specific `TcpConfig`, as in + /// listen address is reused when dialing with a specific `GenTcpConfig`, as in /// the following example: /// /// ```no_run /// # use libp2p_core::transport::ListenerEvent; - /// # use libp2p_tcp::TcpConfig; /// # use libp2p_core::{Multiaddr, Transport}; /// # use futures::stream::StreamExt; - /// # #[async_std::main] - /// # async fn main() -> std::io::Result<()> { + /// #[cfg(feature = "async-io")] + /// #[async_std::main] + /// async fn main() -> std::io::Result<()> { + /// use libp2p_tcp::TcpConfig; /// /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap(); /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap(); @@ -255,17 +282,17 @@ impl TcpConfig { /// } /// _ => {} /// } - /// # Ok(()) - /// # } + /// Ok(()) + /// } /// ``` /// - /// If a single `TcpConfig` is used and cloned for the creation of multiple + /// If a single `GenTcpConfig` is used and cloned for the creation of multiple /// listening sockets or a wildcard listen socket address is used to listen /// on any interface, there can be multiple such addresses registered for /// port reuse. In this case, one is chosen whose IP protocol version and /// loopback status is the same as that of the remote address. Consequently, for /// maximum control of the local listening addresses and ports that are used - /// for outgoing connections, a new `TcpConfig` should be created for each + /// for outgoing connections, a new `GenTcpConfig` should be created for each /// listening socket, avoiding the use of wildcard addresses which bind a /// socket to all network interfaces. /// @@ -308,15 +335,14 @@ impl TcpConfig { Ok(socket) } - fn do_listen(self, socket_addr: SocketAddr) -> io::Result { + fn do_listen(self, socket_addr: SocketAddr) -> io::Result> { let socket = self.create_socket(&socket_addr)?; socket.bind(&socket_addr.into())?; socket.listen(self.backlog as _)?; - let listener = Async::new(socket.into_tcp_listener())?; - TcpListenStream::new(listener, self.port_reuse) + TcpListenStream::::new(socket.into_tcp_listener(), self.port_reuse) } - async fn do_dial(self, socket_addr: SocketAddr) -> Result, io::Error> { + async fn do_dial(self, socket_addr: SocketAddr) -> Result { let socket = self.create_socket(&socket_addr)?; if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) { @@ -332,17 +358,25 @@ impl TcpConfig { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} Err(err) => return Err(err), }; - let stream = Async::new(socket.into_tcp_stream())?; - stream.writable().await?; + + // let stream = Async::new(socket.into_tcp_stream())?; + let stream = T::new_stream(socket.into_tcp_stream()).await?; + // stream.writable().await?; Ok(stream) } } -impl Transport for TcpConfig { - type Output = Async; +impl Transport for GenTcpConfig +where + T: Provider + Send + 'static, + T::Listener: Unpin, + T::IfWatcher: Unpin, + T::Stream: Unpin, +{ + type Output = T::Stream; type Error = io::Error; type Dial = Pin> + Send>>; - type Listener = TcpListenStream; + type Listener = TcpListenStream; type ListenerUpgrade = Ready>; fn listen_on(self, addr: Multiaddr) -> Result> { @@ -394,15 +428,15 @@ impl Transport for TcpConfig { } } -type TcpListenerEvent = ListenerEvent, io::Error>>, io::Error>; +type TcpListenerEvent = ListenerEvent>, io::Error>; -enum IfWatch { - Pending(BoxFuture<'static, io::Result>), - Ready(IfWatcher), +enum IfWatch { + Pending(BoxFuture<'static, io::Result>), + Ready(TIfWatcher), } /// The listening addresses of a [`TcpListenStream`]. -enum InAddr { +enum InAddr { /// The stream accepts connections on a single interface. One { addr: IpAddr, @@ -411,24 +445,27 @@ enum InAddr { /// The stream accepts connections on all interfaces. Any { addrs: HashSet, - if_watch: IfWatch, + if_watch: IfWatch, } } /// A stream of incoming connections on one or more interfaces. -pub struct TcpListenStream { +pub struct TcpListenStream +where + T: Provider +{ /// The socket address that the listening socket is bound to, /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY` /// when listening on all interfaces for IPv4 respectively IPv6 connections. listen_addr: SocketAddr, /// The async listening socket for incoming connections. - listener: Async, + listener: T::Listener, /// The IP addresses of network interfaces on which the listening socket /// is accepting connections. /// /// If the listen socket listens on all interfaces, these may change over /// time as interfaces become available or unavailable. - in_addr: InAddr, + in_addr: InAddr, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -440,16 +477,17 @@ pub struct TcpListenStream { /// to accept a new connection. sleep_on_error: Duration, /// The current pause, if any. - pause: Option, + pause: Option, } -impl TcpListenStream { +impl TcpListenStream +where + T: Provider +{ /// Constructs a `TcpListenStream` for incoming connections around /// the given `TcpListener`. - fn new(listener: Async, port_reuse: PortReuse) - -> io::Result - { - let listen_addr = listener.get_ref().local_addr()?; + fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result { + let listen_addr = listener.local_addr()?; let in_addr = if match &listen_addr { SocketAddr::V4(a) => a.ip().is_unspecified(), @@ -459,7 +497,7 @@ impl TcpListenStream { // `TcpListenStream` is polled. InAddr::Any { addrs: HashSet::new(), - if_watch: IfWatch::Pending(IfWatcher::new().boxed()), + if_watch: IfWatch::Pending(T::if_watcher()), } } else { InAddr::One { @@ -468,6 +506,8 @@ impl TcpListenStream { } }; + let listener = T::new_listener(listener)?; + Ok(TcpListenStream { port_reuse, listener, @@ -498,14 +538,23 @@ impl TcpListenStream { } } -impl Drop for TcpListenStream { +impl Drop for TcpListenStream +where + T: Provider +{ fn drop(&mut self) { self.disable_port_reuse(); } } -impl Stream for TcpListenStream { - type Item = Result; +impl Stream for TcpListenStream +where + T: Provider, + T::Listener: Unpin, + T::Stream: Unpin, + T::IfWatcher: Unpin, +{ + type Item = Result, io::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); @@ -519,13 +568,18 @@ impl Stream for TcpListenStream { *if_watch = IfWatch::Ready(w); continue } - Err(e) => { - me.disable_port_reuse(); - return Poll::Ready(Some(Err(e))) + Err(err) => { + log::debug! { + "Failed to begin observing interfaces: {:?}. Scheduling retry.", + err + }; + *if_watch = IfWatch::Pending(T::if_watcher()); + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(err)))); } }, // Consume all events for up/down interface changes. - IfWatch::Ready(watch) => while let Some(ev) = watch.next().now_or_never() { + IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) { match ev { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); @@ -549,9 +603,13 @@ impl Stream for TcpListenStream { } } } - Err(e) => { - me.disable_port_reuse(); - return Poll::Ready(Some(Err(e))) + Err(err) => { + log::debug! { + "Failure polling interfaces: {:?}. Scheduling retry.", + err + }; + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(err)))); } } }, @@ -574,41 +632,25 @@ impl Stream for TcpListenStream { } } - // Check if the listener is ready to accept a new connection. - match me.listener.poll_readable(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(err)) => { - me.disable_port_reuse(); - return Poll::Ready(Some(Err(err))) - } - } - // Take the pending connection from the backlog. - let (stream, sock_addr) = match me.listener.accept().now_or_never() { - Some(Ok(res)) => res, - Some(Err(e)) => { + let incoming = match T::poll_accept(&mut me.listener, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(incoming)) => incoming, + Poll::Ready(Err(e)) => { // These errors are non-fatal for the listener stream. log::error!("error accepting incoming connection: {}", e); - me.pause = Some(Timer::after(me.sleep_on_error)); + me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(ListenerEvent::Error(e)))); } - None => unreachable!("poll_readable() signaled readiness"), }; - let local_addr = match stream.get_ref().local_addr() { - Ok(sock_addr) => ip_to_multiaddr(sock_addr.ip(), sock_addr.port()), - Err(err) => { - log::error!("Failed to get local address of incoming socket: {:?}", err); - continue; - } - }; - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); + let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); + let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); return Poll::Ready(Some(Ok(ListenerEvent::Upgrade { - upgrade: future::ok(stream), + upgrade: future::ok(incoming.stream), local_addr, remote_addr, }))); @@ -698,42 +740,62 @@ mod tests { fn communicating_between_dialer_and_listener() { env_logger::try_init().ok(); - fn test(addr: Multiaddr) { - let (mut ready_tx, mut ready_rx) = mpsc::channel(1); - - async_std::task::spawn(async move { - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(listen_addr) => { - ready_tx.send(listen_addr).await.unwrap(); - } - ListenerEvent::Upgrade { upgrade, .. } => { - let mut upgrade = upgrade.await.unwrap(); - let mut buf = [0u8; 3]; - upgrade.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - upgrade.write_all(&[4, 5, 6]).await.unwrap(); - } - _ => unreachable!(), + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.send(listen_addr).await.unwrap(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + return } + e => panic!("Unexpected listener event: {:?}", e), } - }); + } + } - async_std::task::block_on(async move { - let addr = ready_rx.next().await.unwrap(); - let tcp = TcpConfig::new(); + async fn dialer(mut ready_rx: mpsc::Receiver) { + let addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new(); - // Obtain a future socket through dialing - let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); - socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + // Obtain a future socket through dialing + let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); - let mut buf = [0u8; 3]; - socket.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [4, 5, 6]); - }); + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + } + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } + + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } } test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); @@ -744,39 +806,60 @@ mod tests { fn wildcard_expansion() { env_logger::try_init().ok(); - fn test(addr: Multiaddr) { - let (mut ready_tx, mut ready_rx) = mpsc::channel(1); + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); - async_std::task::spawn(async move { - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(a) => { - let mut iter = a.iter(); - match iter.next().expect("ip address") { - Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), - Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), - other => panic!("Unexpected protocol: {}", other), - } - if let Protocol::Tcp(port) = iter.next().expect("port") { - assert_ne!(0, port) - } else { - panic!("No TCP port in address: {}", a) - } - ready_tx.send(a).await.ok(); + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other), + } + if let Protocol::Tcp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No TCP port in address: {}", a) } - _ => {} + ready_tx.send(a).await.ok(); + return } + _ => {} } - }); + } + } - async_std::task::block_on(async move { - let dest_addr = ready_rx.next().await.unwrap(); - let tcp = TcpConfig::new(); - tcp.dial(dest_addr).unwrap().await.unwrap(); - }); + async fn dialer(mut ready_rx: mpsc::Receiver) { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new(); + tcp.dial(dest_addr).unwrap().await.unwrap(); + } + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } + + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } } test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); @@ -787,47 +870,67 @@ mod tests { fn port_reuse_dialing() { env_logger::try_init().ok(); - fn test(addr: Multiaddr) { - let (mut ready_tx, mut ready_rx) = mpsc::channel(1); - let addr2 = addr.clone(); - - async_std::task::spawn(async move { - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(listen_addr) => { - ready_tx.send(listen_addr).await.ok(); - } - ListenerEvent::Upgrade { upgrade, .. } => { - let mut upgrade = upgrade.await.unwrap(); - let mut buf = [0u8; 3]; - upgrade.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - upgrade.write_all(&[4, 5, 6]).await.unwrap(); - } - _ => unreachable!(), - } - } - }); - - async_std::task::block_on(async move { - let dest_addr = ready_rx.next().await.unwrap(); - let tcp = TcpConfig::new().port_reuse(true); - let mut listener = tcp.clone().listen_on(addr2).unwrap(); + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + loop { match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(_) => { - // Obtain a future socket through dialing - let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap(); - socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.send(listen_addr).await.ok(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); let mut buf = [0u8; 3]; - socket.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [4, 5, 6]); + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + return } - e => panic!("Unexpected listener event: {:?}", e) + e => panic!("Unexpected event: {:?}", e), } - }); + } + } + + async fn dialer(addr: Multiaddr, mut ready_rx: mpsc::Receiver) { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new().port_reuse(true); + let mut listener = tcp.clone().listen_on(addr).unwrap(); + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(_) => { + // Obtain a future socket through dialing + let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + // socket.flush().await; + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + } + e => panic!("Unexpected listener event: {:?}", e) + } + } + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(addr.clone(), ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } + + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(addr.clone(), ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } } test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); @@ -838,84 +941,96 @@ mod tests { fn port_reuse_listening() { env_logger::try_init().ok(); - fn test(addr: Multiaddr) { - let tcp = TcpConfig::new().port_reuse(true); - - async_std::task::spawn(async move { - let mut listener1 = tcp.clone().listen_on(addr).unwrap(); - match listener1.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(addr1) => { - // Listen on the same address a second time. - let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap(); - match listener2.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(addr2) => { - assert_eq!(addr1, addr2); - return - } - e => panic!("Unexpected listener event: {:?}", e), + async fn listen_twice(addr: Multiaddr) { + let tcp = GenTcpConfig::::new().port_reuse(true); + let mut listener1 = tcp.clone().listen_on(addr).unwrap(); + match listener1.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(addr1) => { + // Listen on the same address a second time. + let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap(); + match listener2.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(addr2) => { + assert_eq!(addr1, addr2); + return } + e => panic!("Unexpected listener event: {:?}", e), } - e => panic!("Unexpected listener event: {:?}", e), } - }); + e => panic!("Unexpected listener event: {:?}", e), + } } - test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); - } - - #[async_std::test] - async fn replace_port_0_in_returned_multiaddr_ipv4() { - env_logger::try_init().ok(); - - let tcp = TcpConfig::new(); - - let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); - assert!(addr.to_string().contains("tcp/0")); + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let listener = listen_twice::(addr.clone()); + async_std::task::block_on(listener); + } - let new_addr = tcp - .listen_on(addr) - .unwrap() - .next() - .await - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); + #[cfg(feature = "tokio")] + { + let listener = listen_twice::(addr.clone()); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + rt.block_on(listener); + } + } - assert!(!new_addr.to_string().contains("tcp/0")); + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); } - #[async_std::test] - async fn replace_port_0_in_returned_multiaddr_ipv6() { + #[test] + fn listen_port_0() { env_logger::try_init().ok(); - let tcp = TcpConfig::new(); + async fn listen(addr: Multiaddr) -> Multiaddr { + GenTcpConfig::::new() + .listen_on(addr) + .unwrap() + .next() + .await + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address") + } - let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); - assert!(addr.to_string().contains("tcp/0")); + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let new_addr = async_std::task::block_on(listen::(addr.clone())); + assert!(!new_addr.to_string().contains("tcp/0")); + } - let new_addr = tcp - .listen_on(addr) - .unwrap() - .next() - .await - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); + #[cfg(feature = "tokio")] + { + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let new_addr = rt.block_on(listen::(addr.clone())); + assert!(!new_addr.to_string().contains("tcp/0")); + } + } - assert!(!new_addr.to_string().contains("tcp/0")); + test("/ip6/::1/tcp/0".parse().unwrap()); + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); } - #[async_std::test] - async fn larger_addr_denied() { + #[test] + fn listen_invalid_addr() { env_logger::try_init().ok(); - let tcp = TcpConfig::new(); + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let tcp = TcpConfig::new(); + assert!(tcp.listen_on(addr.clone()).is_err()); + } + + #[cfg(feature = "tokio")] + { + let tcp = TokioTcpConfig::new(); + assert!(tcp.listen_on(addr.clone()).is_err()); + } + } - let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" - .parse::() - .unwrap(); - assert!(tcp.listen_on(addr).is_err()); + test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap()); } }