diff --git a/crates/entropy/src/main.rs b/crates/entropy/src/main.rs index 053ef75f..301a699c 100644 --- a/crates/entropy/src/main.rs +++ b/crates/entropy/src/main.rs @@ -18,6 +18,7 @@ use augustus::{ DigestHash as _, H256, }, event::{ + self, erased::{ session::{Buffered, Sender}, Blanket, Session, @@ -39,7 +40,6 @@ use axum::{ routing::{get, post}, Json, Router, }; - use entropy::{ BulkService, CodecWorker, Get, GetOk, MessageNet, Net, Peer, Put, PutOk, SendCodecEvent, SendFsEvent, @@ -296,8 +296,9 @@ async fn start_peer( // it done) kademlia::Peer::<_, _, _, BlackHole, _>::new( buckets, - Box::new(MessageNet::new(dispatch::Net::from(tcp_control_session.sender()))) - as Box + Send + Sync>, + Box::new(MessageNet::new(dispatch::Net::from( + tcp_control_session.sender(), + ))) as Box + Send + Sync>, // MessageNet::new(DispatchNet(Sender::from(quic_control_session.sender()))), Sender::from(kademlia_control_session.sender()), Box::new(kademlia::CryptoWorker::from(Worker::Inline( @@ -333,7 +334,7 @@ async fn start_peer( ))) as _, Box::new(fs_sender) as _, ))); - let mut tcp_control = Unify(Dispatch::new( + let mut tcp_control = Unify(event::Buffered::from(Dispatch::new( augustus::net::session::Tcp::new(addr)?, { // let mut quic_control = Blanket(Unify(Dispatch::new(quic.clone(), { @@ -349,7 +350,7 @@ async fn start_peer( } }, Once(tcp_control_session.sender()), - )?); + )?)); let socket_session = augustus::net::session::tcp::accept_session(listener, tcp_control_session.sender()); diff --git a/examples/bench-unreplicated.rs b/examples/bench-unreplicated.rs index 198dbab1..d55ffd41 100644 --- a/examples/bench-unreplicated.rs +++ b/examples/bench-unreplicated.rs @@ -28,7 +28,8 @@ use augustus::{ blocking, erased::{self, events::Init, Blanket}, ordered::Timer, - BlackHole, Inline, OnTimer, Once, SendEvent as _, Session, Unify, UnreachableTimer, + BlackHole, Buffered, Inline, OnTimer, Once, SendEvent as _, Session, Unify, + UnreachableTimer, }, net::{ dispatch::Net, @@ -153,11 +154,11 @@ async fn main() -> anyhow::Result<()> { erased::session::Sender::from(close_loop_session.sender()), )); let mut state_sender = state_session.sender(); - let mut tcp_control = Unify(Dispatch::new( + let mut tcp_control = Unify(Buffered::from(Dispatch::new( Tcp::new(listener.local_addr()?)?, move |buf: &_| to_client_on_buf(buf, &mut state_sender), Once(tcp_session.sender()), - )?); + )?)); let tcp_sender = tcp_session.sender(); sessions @@ -186,11 +187,11 @@ async fn main() -> anyhow::Result<()> { erased::session::Sender::from(close_loop_session.sender()), )); let mut state_sender = state_session.sender(); - let mut quic_control = Unify(Dispatch::new( + let mut quic_control = Unify(Buffered::from(Dispatch::new( quic.clone(), move |buf: &_| to_client_on_buf(buf, &mut state_sender), Once(quic_session.sender()), - )?); + )?)); let quic_sender = quic_session.sender(); sessions.spawn_on(quic::accept_session(quic, quic_sender), runtime.handle()); @@ -334,11 +335,11 @@ async fn main() -> anyhow::Result<()> { let mut state = Unify(Replica::new(Null, ToClientMessageNet::new(raw_net))); let mut state_session = Session::new(); let mut state_sender = state_session.sender(); - let mut tcp_control = Unify(Dispatch::new( + let mut tcp_control = Unify(Buffered::from(Dispatch::new( Tcp::new(listener.local_addr()?)?, move |buf: &_| to_replica_on_buf::(buf, &mut state_sender), Once(tcp_session.sender()), - )?); + )?)); let accept_session = tcp::accept_session(listener, tcp_session.sender()); let tcp_session = tcp_session.run(&mut tcp_control); @@ -364,11 +365,11 @@ async fn main() -> anyhow::Result<()> { let mut state_session = Session::new(); let mut state_sender = state_session.sender(); let quic = Quic::new(replica_addr)?; - let mut quic_control = Unify(Dispatch::new( + let mut quic_control = Unify(Buffered::from(Dispatch::new( quic.clone(), move |buf: &_| to_replica_on_buf::(buf, &mut state_sender), Once(quic_session.sender()), - )?); + )?)); let accept_session = quic::accept_session(quic, quic_session.sender()); let quic_session = quic_session.run(&mut quic_control); diff --git a/examples/stress-connect.rs b/examples/stress-connect.rs index 23b0e9a5..3b000236 100644 --- a/examples/stress-connect.rs +++ b/examples/stress-connect.rs @@ -9,7 +9,7 @@ use std::{ }; use augustus::{ - event::{Once, Session, Unify}, + event::{Buffered, Once, Session, Unify}, net::{dispatch::Net, Dispatch, SendMessage}, }; @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { let quic = augustus::net::session::Quic::new(SocketAddr::from(([0, 0, 0, 0], 3000 + i)))?; let mut control_session = Session::new(); - let mut control = Unify(Dispatch::new( + let mut control = Unify(Buffered::from(Dispatch::new( // augustus::net::session::Tcp::new(None)?, quic.clone(), { @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> { } }, Once(control_session.sender()), - )?); + )?)); // sessions.spawn(augustus::net::session::tcp_accept_session( // listener, diff --git a/src/event.rs b/src/event.rs index b540be91..9c3961c8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -131,9 +131,9 @@ pub trait OnTimer { pub struct Inline<'a, S, T>(pub &'a mut S, pub &'a mut T); -impl, T: Timer> SendEvent for Inline<'_, S, T> { - fn send(&mut self, event: S::Event) -> anyhow::Result<()> { - self.0.on_event(event, self.1) +impl, T: Timer, M: Into> SendEvent for Inline<'_, S, T> { + fn send(&mut self, event: M) -> anyhow::Result<()> { + self.0.on_event(event.into(), self.1) } } @@ -164,6 +164,15 @@ pub struct Buffered { attached: HashMap M + Send + Sync>>, } +impl From for Buffered { + fn from(value: S) -> Self { + Self { + inner: value, + attached: Default::default(), + } + } +} + struct BufferedTimer<'a, T, M> { inner: &'a mut T, attached: &'a mut HashMap M + Send + Sync>>, @@ -268,6 +277,7 @@ pub mod erased { pub struct Inline<'a, S, T>(pub &'a mut S, pub &'a mut T); + // we probably cannot have `impl SendEvent>` impl, M, T: Timer> SendEvent for Inline<'_, S, T> { fn send(&mut self, event: M) -> anyhow::Result<()> { self.0.on_event(event, self.1) diff --git a/src/net/dispatch.rs b/src/net/dispatch.rs index 70d6e8ba..61351963 100644 --- a/src/net/dispatch.rs +++ b/src/net/dispatch.rs @@ -33,7 +33,10 @@ use derive_where::derive_where; use tracing::{debug, warn}; -use crate::event::{erased, OnEvent, OnTimer, SendEvent, SendEventOnce, Timer}; +use crate::event::{ + erased, OnEventRichTimer as OnEvent, RichTimer as Timer, SendEvent, SendEventOnce, + UnreachableTimer, +}; use super::{Addr, Buf, IterAddr, SendMessage}; @@ -65,8 +68,8 @@ impl, A, B, F> Dispatch { } } -// go with typed event for this state machine because it takes a sender that -// sends to itself and must be `Clone` at the same time +// go with typed event for this state machine because it (1) owns a sender that +// (2) sends to itself and (3) must be `Clone` at the same time // i.e. "the horror" of type erasure #[derive(derive_more::From)] pub enum Event, A, B> { @@ -146,9 +149,9 @@ impl< fn on_event(&mut self, event: Self::Event, timer: &mut impl Timer) -> anyhow::Result<()> { match event { - Event::Outgoing(event) => erased::OnEvent::on_event(self, event, timer), - Event::Incoming(event) => erased::OnEvent::on_event(self, event, timer), - Event::Closed(event) => erased::OnEvent::on_event(self, event, timer), + Event::Outgoing(event) => self.handle_outgoing(event, timer), + Event::Incoming(event) => erased::OnEvent::on_event(self, event, &mut UnreachableTimer), + Event::Closed(event) => self.handle_closed(event, timer), } } } @@ -159,9 +162,9 @@ impl< A: Addr, B: Buf, F: FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static, - > erased::OnEvent> for Dispatch + > Dispatch { - fn on_event( + fn handle_outgoing( &mut self, Outgoing(remote, buf): Outgoing, _: &mut impl Timer, @@ -217,7 +220,7 @@ impl< fn on_event( &mut self, Incoming(event): Incoming, - _: &mut impl Timer, + _: &mut impl crate::event::Timer, ) -> anyhow::Result<()> { self.seq += 1; let close_guard = CloseGuard(self.close_sender.clone(), None, self.seq); @@ -239,8 +242,12 @@ impl< } } -impl, A: Addr, B, F> erased::OnEvent> for Dispatch { - fn on_event(&mut self, Closed(addr, seq): Closed, _: &mut impl Timer) -> anyhow::Result<()> { +impl, A: Addr, B, F> Dispatch { + fn handle_closed( + &mut self, + Closed(addr, seq): Closed, + _: &mut impl Timer, + ) -> anyhow::Result<()> { if let Some(connection) = self.connections.get(&addr) { if connection.seq == seq { debug!(">>> {addr:?} outgoing connection closed"); @@ -250,9 +257,3 @@ impl, A: Addr, B, F> erased::OnEvent> for Dispatc Ok(()) } } - -impl, A, B, F> OnTimer for Dispatch { - fn on_timer(&mut self, _: crate::event::TimerId, _: &mut impl Timer) -> anyhow::Result<()> { - unreachable!() - } -} diff --git a/src/net/session/quic.rs b/src/net/session/quic.rs index 02d88f74..b6f5648b 100644 --- a/src/net/session/quic.rs +++ b/src/net/session/quic.rs @@ -61,11 +61,7 @@ impl Quic { let mut stream = match connection.accept_uni().await { Ok(stream) => stream, // TODO - Err( - quinn::ConnectionError::ConnectionClosed(_) - | quinn::ConnectionError::LocallyClosed - | quinn::ConnectionError::TimedOut, - ) => break, + // Err(quinn::ConnectionError::ConnectionClosed(_)) => break, Err(err) => { warn!("<<< {remote_addr} {err}"); break; @@ -148,11 +144,8 @@ impl Protocol for Quic { endpoint.connect(remote, "neatworks.quic") }?; drop(span.exit()); - anyhow::Ok( - connecting - .instrument(tracing::debug_span!("connect", local = ?endpoint.local_addr(), remote = ?remote)) - .await?, - ) + let span = tracing::debug_span!("connect", local = ?endpoint.local_addr(), remote = ?remote); + anyhow::Ok(connecting.instrument(span).await?) }; let connection = match task.await { Ok(connection) => connection,