From 36728d67c27d03dd76a3029b8af9cf446d07a14d Mon Sep 17 00:00:00 2001 From: sgdxbc Date: Sat, 13 Apr 2024 10:28:51 +0000 Subject: [PATCH] Generalize Dispatch over address type --- crates/entropy/src/main.rs | 4 +- examples/bench-unreplicated.rs | 12 ++-- examples/stress-connect.rs | 2 +- src/net/dispatch.rs | 114 ++++++++++++++++++--------------- src/net/session/quic.rs | 14 ++-- src/net/session/tcp.rs | 10 +-- 6 files changed, 83 insertions(+), 73 deletions(-) diff --git a/crates/entropy/src/main.rs b/crates/entropy/src/main.rs index e81900db..053ef75f 100644 --- a/crates/entropy/src/main.rs +++ b/crates/entropy/src/main.rs @@ -296,7 +296,7 @@ async fn start_peer( // it done) kademlia::Peer::<_, _, _, BlackHole, _>::new( buckets, - Box::new(MessageNet::new(dispatch::Net(tcp_control_session.sender()))) + Box::new(MessageNet::new(dispatch::Net::from(tcp_control_session.sender()))) as Box + Send + Sync>, // MessageNet::new(DispatchNet(Sender::from(quic_control_session.sender()))), Sender::from(kademlia_control_session.sender()), @@ -308,7 +308,7 @@ async fn start_peer( ), )); let mut kademlia_control = Blanket(Buffered::from(Control::new( - Box::new(dispatch::Net(tcp_control_session.sender())) + Box::new(dispatch::Net::from(tcp_control_session.sender())) as Box + Send + Sync>, // DispatchNet(Sender::from(quic_control_session.sender())), Box::new(Sender::from(kademlia_session.sender())) diff --git a/examples/bench-unreplicated.rs b/examples/bench-unreplicated.rs index d4298ea3..198dbab1 100644 --- a/examples/bench-unreplicated.rs +++ b/examples/bench-unreplicated.rs @@ -117,7 +117,7 @@ async fn main() -> anyhow::Result<()> { erased::session::Sender::from(close_loop_session.sender()), )); let mut state_sender = state_session.sender(); - let mut tcp_control = Dispatch::<_, _, bytes::Bytes, _>::new( + let mut tcp_control = Dispatch::<_, _, _, bytes::Bytes, _>::new( Tcp::new(None)?, move |buf: &_| to_client_on_buf(buf, &mut state_sender), // effectively disable connection table clean up @@ -141,7 +141,7 @@ async fn main() -> anyhow::Result<()> { ); } else { let mut tcp_session = Session::new(); - let raw_net = Net(tcp_session.sender()); + let raw_net = Net::from(tcp_session.sender()); let mut state = Unify(Client::new( id, listener.local_addr()?, @@ -173,7 +173,7 @@ async fn main() -> anyhow::Result<()> { } } else if flag_quic { let mut quic_session = Session::new(); - let raw_net = Net(quic_session.sender()); + let raw_net = Net::from(quic_session.sender()); let quic = Quic::new(client_addr)?; let mut state = Unify(Client::new( id, @@ -317,7 +317,7 @@ async fn main() -> anyhow::Result<()> { let mut state = Unify(Replica::new(Null, ToClientMessageNet::new(simplex::Tcp))); let mut state_session = Session::new(); let mut state_sender = state_session.sender(); - let mut tcp_control = Dispatch::<_, _, bytes::Bytes, _>::new( + let mut tcp_control = Dispatch::<_, _, _, bytes::Bytes, _>::new( Tcp::new(None)?, move |buf: &_| to_replica_on_buf::(buf, &mut state_sender), BlackHole, @@ -330,7 +330,7 @@ async fn main() -> anyhow::Result<()> { } let mut tcp_session = Session::new(); - let raw_net = Net(tcp_session.sender()); + let raw_net = Net::from(tcp_session.sender()); let mut state = Unify(Replica::new(Null, ToClientMessageNet::new(raw_net))); let mut state_session = Session::new(); let mut state_sender = state_session.sender(); @@ -359,7 +359,7 @@ async fn main() -> anyhow::Result<()> { if flag_quic { let mut quic_session = Session::new(); - let raw_net = Net(quic_session.sender()); + let raw_net = Net::from(quic_session.sender()); let mut state = Unify(Replica::new(Null, ToClientMessageNet::new(raw_net))); let mut state_session = Session::new(); let mut state_sender = state_session.sender(); diff --git a/examples/stress-connect.rs b/examples/stress-connect.rs index 25733ae6..23b0e9a5 100644 --- a/examples/stress-connect.rs +++ b/examples/stress-connect.rs @@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> { quic, control_session.sender(), )); - let mut net = Net(control_session.sender()); + let mut net = Net::from(control_session.sender()); sessions.spawn(async move { control_session.run(&mut control).await }); sessions.spawn(async move { for j in 0..multiplier { diff --git a/src/net/dispatch.rs b/src/net/dispatch.rs index 1d489a9a..70d6e8ba 100644 --- a/src/net/dispatch.rs +++ b/src/net/dispatch.rs @@ -27,10 +27,7 @@ // `Dispatch`. consider the fact that kernel already always maintains a // connection table (and yet another queuing layer), i generally don't satisfy // with this solution -use std::{ - collections::{hash_map::Entry, HashMap}, - net::SocketAddr, -}; +use std::collections::{hash_map::Entry, HashMap}; use derive_where::derive_where; @@ -38,25 +35,25 @@ use tracing::{debug, warn}; use crate::event::{erased, OnEvent, OnTimer, SendEvent, SendEventOnce, Timer}; -use super::{Buf, IterAddr, SendMessage}; +use super::{Addr, Buf, IterAddr, SendMessage}; -#[derive_where(Debug; E, P, P::Sender)] -pub struct Dispatch, B, F> { +#[derive_where(Debug; E, P, P::Sender, A)] +pub struct Dispatch, A, B, F> { protocol: P, - connections: HashMap>, + connections: HashMap>, seq: u32, close_sender: E, #[derive_where(skip)] on_buf: F, } -#[derive_where(Debug; P::Sender)] -struct Connection, B> { - sender: P::Sender, +#[derive(Debug)] +struct Connection { + sender: E, seq: u32, } -impl, B, F> Dispatch { +impl, A, B, F> Dispatch { pub fn new(protocol: P, on_buf: F, close_sender: E) -> anyhow::Result { Ok(Self { protocol, @@ -68,19 +65,22 @@ impl, B, F> Dispatch { } } +// go with typed event for this state machine because it takes a sender that +// sends to itself and must be `Clone` at the same time +// i.e. "the horror" of type erasure #[derive(derive_more::From)] -pub enum Event, B> { +pub enum Event, A, B> { Incoming(Incoming), - Outgoing(Outgoing), - Closed(Closed), + Outgoing(Outgoing), + Closed(Closed), } -pub struct Closed(SocketAddr, u32); +pub struct Closed(A, u32); -pub struct CloseGuard(E, Option, u32); +pub struct CloseGuard(E, Option, u32); -impl> CloseGuard { - pub fn close(self, addr: SocketAddr) -> anyhow::Result<()> { +impl>, A: Addr> CloseGuard { + pub fn close(self, addr: A) -> anyhow::Result<()> { if let Some(also_addr) = self.1 { anyhow::ensure!(addr == also_addr) } @@ -88,38 +88,45 @@ impl> CloseGuard { } } -pub trait Protocol { +pub trait Protocol { type Sender: SendEvent; - fn connect + Send + 'static>( + fn connect> + Send + 'static>( &self, - remote: SocketAddr, + remote: A, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - close_guard: CloseGuard, + close_guard: CloseGuard, ) -> Self::Sender; type Incoming; - fn accept + Send + 'static>( + fn accept> + Send + 'static>( connection: Self::Incoming, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - close_guard: CloseGuard, - ) -> Option<(SocketAddr, Self::Sender)>; + close_guard: CloseGuard, + ) -> Option<(A, Self::Sender)>; } -pub struct Outgoing(SocketAddr, B); +pub struct Outgoing(A, B); #[derive(Clone)] -pub struct Net(pub E); +pub struct Net(pub E, std::marker::PhantomData); +// mark address type so the following implementations not conflict -impl>, B> SendMessage for Net { - fn send(&mut self, dest: SocketAddr, message: B) -> anyhow::Result<()> { +impl From for Net { + fn from(value: E) -> Self { + Self(value, Default::default()) + } +} + +impl>, A, B> SendMessage for Net { + fn send(&mut self, dest: A, message: B) -> anyhow::Result<()> { self.0.send(Outgoing(dest, message)) } } -impl>, B: Buf> SendMessage, B> for Net { - fn send(&mut self, dest: IterAddr<'_, SocketAddr>, message: B) -> anyhow::Result<()> { +impl>, A, B: Buf> SendMessage, B> for Net { + fn send(&mut self, dest: IterAddr<'_, A>, message: B) -> anyhow::Result<()> { for addr in dest.0 { SendMessage::send(self, addr, message.clone())? } @@ -128,13 +135,14 @@ impl>, B: Buf> SendMessage, B> } impl< - E: SendEventOnce + Clone + Send + 'static, - P: Protocol, + E: SendEventOnce> + Clone + Send + 'static, + P: Protocol, + A: Addr, B: Buf, F: FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - > OnEvent for Dispatch + > OnEvent for Dispatch { - type Event = Event; + type Event = Event; fn on_event(&mut self, event: Self::Event, timer: &mut impl Timer) -> anyhow::Result<()> { match event { @@ -146,22 +154,23 @@ impl< } impl< - E: SendEventOnce + Clone + Send + 'static, - P: Protocol, + E: SendEventOnce> + Clone + Send + 'static, + P: Protocol, + A: Addr, B: Buf, F: FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - > erased::OnEvent> for Dispatch + > erased::OnEvent> for Dispatch { fn on_event( &mut self, - Outgoing(remote, buf): Outgoing, + Outgoing(remote, buf): Outgoing, _: &mut impl Timer, ) -> anyhow::Result<()> { if let Some(connection) = self.connections.get_mut(&remote) { match connection.sender.send(buf.clone()) { Ok(()) => return Ok(()), Err(err) => { - warn!(">=> {remote} connection discontinued: {err}"); + warn!(">=> {remote:?} connection discontinued: {err}"); self.connections.remove(&remote); // in an ideal world the SendError will return the buf back to us, and we can // directly reuse that in below, saving a `clone` above especially for fast path @@ -173,12 +182,12 @@ impl< } } self.seq += 1; - let close_guard = CloseGuard(self.close_sender.clone(), Some(remote), self.seq); + let close_guard = CloseGuard(self.close_sender.clone(), Some(remote.clone()), self.seq); let mut sender = self .protocol - .connect(remote, self.on_buf.clone(), close_guard); + .connect(remote.clone(), self.on_buf.clone(), close_guard); if sender.send(buf).is_err() { - warn!(">=> {remote} new connection immediately fail") + warn!(">=> {remote:?} new connection immediately fail") // we don't try again in such case since the remote is probably never reachable anymore // not sure whether this should be considered as a fatal error. if this is happening, // will it happen for every following outgoing connection? @@ -198,11 +207,12 @@ impl< pub struct Incoming(pub T); impl< - E: SendEventOnce + Clone + Send + 'static, - P: Protocol, + E: SendEventOnce> + Clone + Send + 'static, + P: Protocol, + A: Addr, B: Buf, F: FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - > erased::OnEvent> for Dispatch + > erased::OnEvent> for Dispatch { fn on_event( &mut self, @@ -217,20 +227,20 @@ impl< // always prefer to keep the connection created locally // the connection in `self.connections` may not be created locally, but the incoming // connection is definitely created remotely - if let Entry::Vacant(entry) = self.connections.entry(remote) { + if let Entry::Vacant(entry) = self.connections.entry(remote.clone()) { entry.insert(Connection { sender, seq: self.seq, }); } else { - warn!("<<< {remote} incoming connection from connected address") + warn!("<<< {remote:?} incoming connection from connected address") } Ok(()) } } -impl, B, F> erased::OnEvent for Dispatch { - fn on_event(&mut self, Closed(addr, seq): Closed, _: &mut impl Timer) -> anyhow::Result<()> { +impl, A: Addr, B, F> erased::OnEvent> for Dispatch { + fn on_event(&mut self, Closed(addr, seq): Closed, _: &mut impl Timer) -> anyhow::Result<()> { if let Some(connection) = self.connections.get(&addr) { if connection.seq == seq { debug!(">>> {addr:?} outgoing connection closed"); @@ -241,7 +251,7 @@ impl, B, F> erased::OnEvent for Dispatch { } } -impl, B, F> OnTimer for Dispatch { +impl, A, B, F> OnTimer for Dispatch { fn on_timer(&mut self, _: crate::event::TimerId, _: &mut impl Timer) -> anyhow::Result<()> { unreachable!() } diff --git a/src/net/session/quic.rs b/src/net/session/quic.rs index 3db76ed6..02d88f74 100644 --- a/src/net/session/quic.rs +++ b/src/net/session/quic.rs @@ -86,10 +86,10 @@ impl Quic { } } - async fn write_task>( + async fn write_task>>( connection: quinn::Connection, mut receiver: UnboundedReceiver, - close_guard: CloseGuard, + close_guard: CloseGuard, ) { loop { enum Select { @@ -128,14 +128,14 @@ impl Quic { } } -impl Protocol for Quic { +impl Protocol for Quic { type Sender = UnboundedSender; - fn connect + Send + 'static>( + fn connect> + Send + 'static>( &self, remote: SocketAddr, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - close_guard: CloseGuard, + close_guard: CloseGuard, ) -> Self::Sender { let endpoint = self.0.clone(); // tracing::debug!("{:?} connect {remote}", endpoint.local_addr()); @@ -169,10 +169,10 @@ impl Protocol for Quic { type Incoming = quinn::Connection; - fn accept + Send + 'static>( + fn accept> + Send + 'static>( connection: Self::Incoming, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - close_guard: CloseGuard, + close_guard: CloseGuard, ) -> Option<(SocketAddr, Self::Sender)> { let remote = connection.remote_address(); tokio::spawn(Self::read_task(connection.clone(), on_buf)); diff --git a/src/net/session/tcp.rs b/src/net/session/tcp.rs index 0d493be9..a8736e1c 100644 --- a/src/net/session/tcp.rs +++ b/src/net/session/tcp.rs @@ -87,14 +87,14 @@ impl Tcp { } } -impl Protocol for Tcp { +impl Protocol for Tcp { type Sender = UnboundedSender; - fn connect>( + fn connect>>( &self, remote: SocketAddr, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Send + 'static, - close_guard: CloseGuard, + close_guard: CloseGuard, ) -> Self::Sender { let preamble = self.0.clone(); let (sender, receiver) = unbounded_channel(); @@ -124,10 +124,10 @@ impl Protocol for Tcp { type Incoming = (TcpPreamble, TcpStream); - fn accept>( + fn accept>>( (preamble, stream): Self::Incoming, on_buf: impl FnMut(&[u8]) -> anyhow::Result<()> + Send + 'static, - close_guard: CloseGuard, + close_guard: CloseGuard, ) -> Option<(SocketAddr, Self::Sender)> { let (read, write) = stream.into_split(); tokio::spawn(Tcp::read_task(read, on_buf, preamble));