From 7e1965f4b7a46a9cff755f93dafd95d4e4e80cdb Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 18 Dec 2024 15:26:26 +0100 Subject: [PATCH 1/9] refactor(iroh): ActiveRelayActor terminates itself This moves the logic of terminating an ActiveRelayActor from the RelayActor to the ActiveRelayActor itself. This is nice for two reasons: - The interactions between ActiveRelayActor and RelayActor are even simpler. RelayActor logic is now easy to reason about. - It will allow ActiveRelayActor to manage reconnections with exponential backoff better. Currently this behaviour is not changed but the RelayActor getting involved meant the backoff behaviour was influcenced in two places. Now that the ActiveRelayActor lifecyle is much more straight forward the RelayActor's terminating is tidied up as well. --- iroh/src/magicsock/relay_actor.rs | 216 ++++++++++++++++-------------- 1 file changed, 114 insertions(+), 102 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index f624d2804a..8b15b20ceb 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -12,7 +12,6 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::{Duration, Instant}, }; use anyhow::Context; @@ -26,7 +25,7 @@ use iroh_relay::{self as relay, client::ClientError, ReceivedMessage, MAX_PACKET use tokio::{ sync::{mpsc, oneshot}, task::JoinSet, - time, + time::{self, Duration, Instant}, }; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; @@ -41,9 +40,6 @@ use crate::{ /// How long a non-home relay connection needs to be idle (last written to) before we close it. const RELAY_INACTIVE_CLEANUP_TIME: Duration = Duration::from_secs(60); -/// How often `clean_stale_relay` runs when there are potentially-stale relay connections to close. -const RELAY_CLEAN_STALE_INTERVAL: Duration = Duration::from_secs(15); - /// Maximum size a datagram payload is allowed to be. const MAX_PAYLOAD_SIZE: usize = MAX_PACKET_SIZE - PublicKey::LENGTH; @@ -53,9 +49,6 @@ const MAX_PAYLOAD_SIZE: usize = MAX_PACKET_SIZE - PublicKey::LENGTH; /// communication with it. #[derive(Debug)] struct ActiveRelayActor { - /// The time of the last request for its write - /// channel (currently even if there was no write). - last_write: Instant, /// Queue to send received relay datagrams on. relay_datagrams_recv: Arc, /// Channel on which we receive packets to send to the relay. @@ -80,7 +73,6 @@ struct ActiveRelayActor { #[derive(Debug)] #[allow(clippy::large_enum_variant)] enum ActiveRelayMessage { - GetLastWrite(oneshot::Sender), /// Returns whether or not this relay can reach the NodeId. HasNodeRoute(NodeId, oneshot::Sender), /// Triggers a connection check to the relay server. @@ -131,7 +123,6 @@ impl ActiveRelayActor { Self::create_relay_client(url.clone(), connection_opts.clone()); ActiveRelayActor { - last_write: Instant::now(), relay_datagrams_recv, relay_datagrams_send, url, @@ -184,6 +175,11 @@ impl ActiveRelayActor { // the same time. let mut relay_send_fut = MaybeFuture::none(); + // If inactive for this long the actor should exit. Inactivity is only tracked on + // the last datagrams sent to the relay, received datagrams will trigger ACKs which + // is sufficient to keep active connections open. + let mut inactive_timeout = std::pin::pin!(tokio::time::sleep(RELAY_INACTIVE_CLEANUP_TIME)); + loop { // If a read error occurred on the connection it might have been lost. But we // need this connection to stay alive so we can receive more messages sent by @@ -213,7 +209,7 @@ impl ActiveRelayActor { relay_client.send(msg.node_id, msg.packet).await }; relay_send_fut = MaybeFuture::with_future(Box::pin(fut)); - self.last_write = Instant::now(); + inactive_timeout.as_mut().reset(Instant::now() + RELAY_INACTIVE_CLEANUP_TIME); } msg = self.relay_client_receiver.recv() => { @@ -225,6 +221,10 @@ impl ActiveRelayActor { } } } + _ = &mut inactive_timeout => { + debug!("Inactive for {RELAY_INACTIVE_CLEANUP_TIME:?}, exiting"); + break; + } } } debug!("exiting"); @@ -236,9 +236,6 @@ impl ActiveRelayActor { async fn handle_actor_msg(&mut self, msg: ActiveRelayMessage) -> bool { trace!("tick: inbox: {:?}", msg); match msg { - ActiveRelayMessage::GetLastWrite(r) => { - r.send(self.last_write).ok(); - } ActiveRelayMessage::SetHomeRelay(is_preferred) => { self.is_home_relay = is_preferred; self.relay_client.note_preferred(is_preferred).await; @@ -453,11 +450,6 @@ impl RelayActor { } pub(super) async fn run(mut self, mut receiver: mpsc::Receiver) { - let mut cleanup_timer = time::interval_at( - time::Instant::now() + RELAY_CLEAN_STALE_INTERVAL, - RELAY_CLEAN_STALE_INTERVAL, - ); - loop { tokio::select! { biased; @@ -473,7 +465,7 @@ impl RelayActor { if !err.is_cancelled() { error!("ActiveRelayActor failed: {err:?}"); } - self.clean_stale_relay().await; + self.clean_stopped_active_relays(); } msg = receiver.recv() => { let Some(msg) = msg else { @@ -483,16 +475,15 @@ impl RelayActor { let cancel_token = self.cancel_token.child_token(); cancel_token.run_until_cancelled(self.handle_msg(msg)).await; } - _ = cleanup_timer.tick() => { - trace!("tick: cleanup"); - let cancel_token = self.cancel_token.child_token(); - cancel_token.run_until_cancelled(self.clean_stale_relay()).await; - } } } // try shutdown - self.close_all_relay("conn-close").await; + if let Err(_) = + tokio::time::timeout(Duration::from_secs(3), self.close_all_relay("conn-close")).await + { + warn!("Failed to shut down all ActiveRelayActors"); + } } async fn handle_msg(&mut self, msg: RelayActorMessage) { @@ -518,21 +509,6 @@ impl RelayActor { } } - async fn set_home_relay(&mut self, home_url: &RelayUrl) { - futures_buffered::join_all(self.active_relays.iter().map(|(url, handle)| async move { - let is_preferred = url == home_url; - handle - .inbox_addr - .send(ActiveRelayMessage::SetHomeRelay(is_preferred)) - .await - .ok() - })) - .await; - - // Ensure we have an ActiveRelayActor for the current home relay. - self.active_relay_handle(home_url).await; - } - async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) { let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); trace!( @@ -560,6 +536,21 @@ impl RelayActor { } } + async fn set_home_relay(&mut self, home_url: &RelayUrl) { + futures_buffered::join_all(self.active_relays.iter().map(|(url, handle)| async move { + let is_preferred = url == home_url; + handle + .inbox_addr + .send(ActiveRelayMessage::SetHomeRelay(is_preferred)) + .await + .ok() + })) + .await; + + // Ensure we have an ActiveRelayActor for the current home relay. + self.active_relay_handle(home_url); + } + /// Returns the handle for the [`ActiveRelayActor`] to reach `remote_node`. /// /// The node is expected to be reachable on `url`, but if no [`ActiveRelayActor`] for @@ -600,19 +591,21 @@ impl RelayActor { } } let url = found_relay.as_ref().unwrap_or(url); - self.active_relay_handle(url).await + self.active_relay_handle(url) } /// Returns the handle of the [`ActiveRelayActor`]. - async fn active_relay_handle(&mut self, url: &RelayUrl) -> &ActiveRelayHandle { + fn active_relay_handle(&mut self, url: &RelayUrl) -> &ActiveRelayHandle { if !self.active_relays.contains_key(url) { let handle = self.start_active_relay(url.clone()); if Some(url) == self.msock.my_relay().as_ref() { - handle + if let Err(err) = handle .inbox_addr - .send(ActiveRelayMessage::SetHomeRelay(true)) - .await - .ok(); + .try_send(ActiveRelayMessage::SetHomeRelay(true)) + { + error!("Send to newly started active relay failed: {err:#}."); + warn!("Home relay not set"); + } } self.active_relays.insert(url.clone(), handle); } @@ -676,75 +669,41 @@ impl RelayActor { self.log_active_relay(); } - /// Cleans up stale [`ActiveRelayActor`]s. - /// - /// This not only checks if the relays have been used recently, but also makes sure that - /// all relay actors are running. In particular this is called whenever an - /// [`ActiveRelayActor`] task finishes. - async fn clean_stale_relay(&mut self) { - trace!("checking {} relays for staleness", self.active_relays.len()); - let now = Instant::now(); - - // Futures who return Some(RelayUrl) if the relay needs to be cleaned up. - let check_futs = self.active_relays.iter().map(|(url, handle)| async move { - let (tx, rx) = oneshot::channel(); - handle - .inbox_addr - .send(ActiveRelayMessage::GetLastWrite(tx)) - .await - .ok(); - match rx.await { - Ok(last_write) if last_write.duration_since(now) <= RELAY_INACTIVE_CLEANUP_TIME => { - None - } - _ => Some(url.clone()), + /// Cleans up [`ActiveRelayActor`]s which have stopped running. + fn clean_stopped_active_relays(&mut self) { + let mut stopped_actors = Vec::with_capacity(self.active_relays.len()); + for (url, handle) in self.active_relays.iter() { + if handle.inbox_addr.is_closed() { + stopped_actors.push(url.clone()); } - }); - let futures = FuturesUnorderedBounded::from_iter(check_futs); - let to_close: Vec<_> = futures.filter_map(|maybe_url| maybe_url).collect().await; - - let dirty = !to_close.is_empty(); - trace!( - "closing {} of {} relays", - to_close.len(), - self.active_relays.len() - ); - for i in to_close { - self.close_active_relay(&i, "idle").await; + } + for url in stopped_actors { + self.active_relays.remove(&url); } // Make sure home relay exists if let Some(ref url) = self.msock.my_relay() { - self.active_relay_handle(url).await; - } - - if dirty { - self.log_active_relay(); - } - } - - async fn close_all_relay(&mut self, why: &'static str) { - if self.active_relays.is_empty() { - return; - } - // Need to collect to avoid double borrow - let urls: Vec<_> = self.active_relays.keys().cloned().collect(); - for url in urls { - self.close_active_relay(&url, why).await; + self.active_relay_handle(url); } self.log_active_relay(); } - async fn close_active_relay(&mut self, url: &RelayUrl, why: &'static str) { - if let Some(handle) = self.active_relays.remove(url) { - debug!(%url, "closing connection: {}", why); - + /// Stops all [`ActiveRelayActor`]s and awaits for them to finish. + async fn close_all_relay(&mut self, why: &'static str) { + let send_futs = self.active_relays.iter().map(|(url, handle)| async move { + debug!(%url, why, "closing connection"); handle .inbox_addr .send(ActiveRelayMessage::Shutdown) .await .ok(); - } + }); + futures_buffered::join_all(send_futs).await; + + let tasks = std::mem::take(&mut self.active_relay_tasks); + tasks.join_all().await; + + self.log_active_relay(); } fn log_active_relay(&self) { @@ -1111,4 +1070,57 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_active_relay_inactive() -> TestResult { + let _guard = iroh_test::logging::setup(); + let (_relay_map, relay_url, _server) = test_utils::run_relay_server().await?; + + let secret_key = SecretKey::from_bytes(&[1u8; 32]); + let node_id = secret_key.public(); + let datagram_recv_queue = Arc::new(RelayDatagramsQueue::new()); + let (_send_datagram_tx, send_datagram_rx) = mpsc::channel(16); + let (inbox_tx, inbox_rx) = mpsc::channel(16); + let mut task = start_active_relay_actor( + secret_key, + relay_url, + inbox_rx, + send_datagram_rx, + datagram_recv_queue.clone(), + ); + + // Give the task some time to run. If it responds to HasNodeRoute it is running. + let (tx, rx) = oneshot::channel(); + inbox_tx + .send(ActiveRelayMessage::HasNodeRoute(node_id, tx)) + .await + .ok(); + rx.await?; + + // We now have an idling ActiveRelayActor. If we advance time just a little it + // should stay alive. + tokio::time::pause(); + tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME / 2).await; + tokio::time::resume(); + + assert!( + tokio::time::timeout(Duration::from_millis(100), &mut task) + .await + .is_err(), + "actor task terminated" + ); + + // If we advance time a lot it should finish. + tokio::time::pause(); + tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME).await; + tokio::time::resume(); + assert!( + tokio::time::timeout(Duration::from_millis(100), task) + .await + .is_ok(), + "actor task still running" + ); + + Ok(()) + } } From 2fe76eb8012290a78217620fe00076ec189c12a6 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 18 Dec 2024 15:35:27 +0100 Subject: [PATCH 2/9] rename method --- iroh/src/magicsock/relay_actor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 8b15b20ceb..1ae1d5e9b5 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -465,7 +465,7 @@ impl RelayActor { if !err.is_cancelled() { error!("ActiveRelayActor failed: {err:?}"); } - self.clean_stopped_active_relays(); + self.reap_active_relays(); } msg = receiver.recv() => { let Some(msg) = msg else { @@ -670,7 +670,7 @@ impl RelayActor { } /// Cleans up [`ActiveRelayActor`]s which have stopped running. - fn clean_stopped_active_relays(&mut self) { + fn reap_active_relays(&mut self) { let mut stopped_actors = Vec::with_capacity(self.active_relays.len()); for (url, handle) in self.active_relays.iter() { if handle.inbox_addr.is_closed() { From 8c17d9aa22f6431d5a3bcbe12b82a2dda1aa1459 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 18 Dec 2024 15:38:29 +0100 Subject: [PATCH 3/9] There is only one caller, why is not needed --- iroh/src/magicsock/relay_actor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 1ae1d5e9b5..dfe70a81de 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -480,7 +480,7 @@ impl RelayActor { // try shutdown if let Err(_) = - tokio::time::timeout(Duration::from_secs(3), self.close_all_relay("conn-close")).await + tokio::time::timeout(Duration::from_secs(3), self.close_all_active_relays()).await { warn!("Failed to shut down all ActiveRelayActors"); } @@ -689,9 +689,9 @@ impl RelayActor { } /// Stops all [`ActiveRelayActor`]s and awaits for them to finish. - async fn close_all_relay(&mut self, why: &'static str) { + async fn close_all_active_relays(&mut self) { let send_futs = self.active_relays.iter().map(|(url, handle)| async move { - debug!(%url, why, "closing connection"); + debug!(%url, "Shutting down ActiveRelayActor"); handle .inbox_addr .send(ActiveRelayMessage::Shutdown) From f74a046074e3a2eb6c47b28bd98a9d3356670597 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 18 Dec 2024 15:47:06 +0100 Subject: [PATCH 4/9] clippy --- iroh/src/magicsock/relay_actor.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index dfe70a81de..3460147cc0 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -479,8 +479,9 @@ impl RelayActor { } // try shutdown - if let Err(_) = - tokio::time::timeout(Duration::from_secs(3), self.close_all_active_relays()).await + if tokio::time::timeout(Duration::from_secs(3), self.close_all_active_relays()) + .await + .is_err() { warn!("Failed to shut down all ActiveRelayActors"); } From 8f91cb5f728e88d23ae5c6bc64f5be71c98cff49 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 18 Dec 2024 16:43:11 +0100 Subject: [PATCH 5/9] Consolidate log messages --- iroh/src/magicsock/relay_actor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 3460147cc0..5a9f914b85 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -604,8 +604,7 @@ impl RelayActor { .inbox_addr .try_send(ActiveRelayMessage::SetHomeRelay(true)) { - error!("Send to newly started active relay failed: {err:#}."); - warn!("Home relay not set"); + error!("Home relay not set, send to new actor failed: {err:#}."); } } self.active_relays.insert(url.clone(), handle); From 7a19e35f3f4bbf3b346b969c2a770fc9651a3135 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 19 Dec 2024 10:32:46 +0100 Subject: [PATCH 6/9] Use retain --- iroh/src/magicsock/relay_actor.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 54afe4fdc9..c3d8e1a115 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -671,15 +671,8 @@ impl RelayActor { /// Cleans up [`ActiveRelayActor`]s which have stopped running. fn reap_active_relays(&mut self) { - let mut stopped_actors = Vec::with_capacity(self.active_relays.len()); - for (url, handle) in self.active_relays.iter() { - if handle.inbox_addr.is_closed() { - stopped_actors.push(url.clone()); - } - } - for url in stopped_actors { - self.active_relays.remove(&url); - } + self.active_relays + .retain(|_url, handle| !handle.inbox_addr.is_closed()); // Make sure home relay exists if let Some(ref url) = self.msock.my_relay() { From 4968f81b8e19690574df2bf4da3969092e68efc5 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 19 Dec 2024 10:37:06 +0100 Subject: [PATCH 7/9] Use interval --- iroh/src/magicsock/relay_actor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index c3d8e1a115..3ab2b4efe8 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -175,10 +175,10 @@ impl ActiveRelayActor { // the same time. let mut relay_send_fut = std::pin::pin!(MaybeFuture::none()); - // If inactive for this long the actor should exit. Inactivity is only tracked on + // If inactive for one tick the actor should exit. Inactivity is only tracked on // the last datagrams sent to the relay, received datagrams will trigger ACKs which // is sufficient to keep active connections open. - let mut inactive_timeout = std::pin::pin!(tokio::time::sleep(RELAY_INACTIVE_CLEANUP_TIME)); + let mut inactive_timeout = tokio::time::interval(RELAY_INACTIVE_CLEANUP_TIME); loop { // If a read error occurred on the connection it might have been lost. But we @@ -209,7 +209,7 @@ impl ActiveRelayActor { relay_client.send(msg.node_id, msg.packet).await }; relay_send_fut.as_mut().set_future(fut); - inactive_timeout.as_mut().reset(Instant::now() + RELAY_INACTIVE_CLEANUP_TIME); + inactive_timeout.reset(); } msg = self.relay_client_receiver.recv() => { @@ -221,7 +221,7 @@ impl ActiveRelayActor { } } } - _ = &mut inactive_timeout => { + _ = inactive_timeout.tick() => { debug!("Inactive for {RELAY_INACTIVE_CLEANUP_TIME:?}, exiting"); break; } From edfe9cd0ff405f0dbf886e92bfc9371133cb2aa6 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 19 Dec 2024 11:04:48 +0100 Subject: [PATCH 8/9] Fix interval usage This kind of shows why I avoided it the first time: it is a lot more complex to use. --- iroh/src/magicsock/relay_actor.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 3ab2b4efe8..f0e1de47e2 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -179,6 +179,7 @@ impl ActiveRelayActor { // the last datagrams sent to the relay, received datagrams will trigger ACKs which // is sufficient to keep active connections open. let mut inactive_timeout = tokio::time::interval(RELAY_INACTIVE_CLEANUP_TIME); + inactive_timeout.reset(); // skip immediate tick loop { // If a read error occurred on the connection it might have been lost. But we @@ -1092,6 +1093,7 @@ mod tests { // We now have an idling ActiveRelayActor. If we advance time just a little it // should stay alive. + info!("Stepping time forwards by RELAY_INACTIVE_CLEANUP_TIME / 2"); tokio::time::pause(); tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME / 2).await; tokio::time::resume(); @@ -1104,6 +1106,7 @@ mod tests { ); // If we advance time a lot it should finish. + info!("Stepping time forwards by RELAY_INACTIVE_CLEANUP_TIME"); tokio::time::pause(); tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME).await; tokio::time::resume(); From 34a35073a65a3541ba918e1f516f59bfafc0f90d Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 19 Dec 2024 13:48:58 +0100 Subject: [PATCH 9/9] refactor(iroh): improve get active_relay path (#3064) --- iroh/src/magicsock/relay_actor.rs | 54 ++++++++++++++++++------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index f0e1de47e2..67152df1df 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -498,7 +498,7 @@ impl RelayActor { self.send_relay(&url, contents, remote_node).await; } RelayActorMessage::SetHome { url } => { - self.set_home_relay(&url).await; + self.set_home_relay(url).await; } RelayActorMessage::MaybeCloseRelaysOnRebind(ifs) => { self.maybe_close_relays_on_rebind(&ifs).await; @@ -538,9 +538,10 @@ impl RelayActor { } } - async fn set_home_relay(&mut self, home_url: &RelayUrl) { + async fn set_home_relay(&mut self, home_url: RelayUrl) { + let home_url_ref = &home_url; futures_buffered::join_all(self.active_relays.iter().map(|(url, handle)| async move { - let is_preferred = url == home_url; + let is_preferred = url == home_url_ref; handle .inbox_addr .send(ActiveRelayMessage::SetHomeRelay(is_preferred)) @@ -562,15 +563,19 @@ impl RelayActor { &mut self, url: &RelayUrl, remote_node: &NodeId, - ) -> &ActiveRelayHandle { + ) -> ActiveRelayHandle { + if let Some(handle) = self.active_relays.get(url) { + return handle.clone(); + } + let mut found_relay: Option = None; - if !self.active_relays.contains_key(url) { - // If we don't have an open connection to the remote node's home relay, see if - // we have an open connection to a relay node where we'd heard from that peer - // already. E.g. maybe they dialed our home relay recently. - // TODO: LRU cache the NodeId -> relay mapping so this is much faster for repeat - // senders. + // If we don't have an open connection to the remote node's home relay, see if + // we have an open connection to a relay node where we'd heard from that peer + // already. E.g. maybe they dialed our home relay recently. + // TODO: LRU cache the NodeId -> relay mapping so this is much faster for repeat + // senders. + { // Futures which return Some(RelayUrl) if the relay knows about the remote node. let check_futs = self.active_relays.iter().map(|(url, handle)| async move { let (tx, rx) = oneshot::channel(); @@ -592,25 +597,28 @@ impl RelayActor { } } } - let url = found_relay.as_ref().unwrap_or(url); + let url = found_relay.unwrap_or(url.clone()); self.active_relay_handle(url) } /// Returns the handle of the [`ActiveRelayActor`]. - fn active_relay_handle(&mut self, url: &RelayUrl) -> &ActiveRelayHandle { - if !self.active_relays.contains_key(url) { - let handle = self.start_active_relay(url.clone()); - if Some(url) == self.msock.my_relay().as_ref() { - if let Err(err) = handle - .inbox_addr - .try_send(ActiveRelayMessage::SetHomeRelay(true)) - { - error!("Home relay not set, send to new actor failed: {err:#}."); + fn active_relay_handle(&mut self, url: RelayUrl) -> ActiveRelayHandle { + match self.active_relays.get(&url) { + Some(e) => e.clone(), + None => { + let handle = self.start_active_relay(url.clone()); + if Some(&url) == self.msock.my_relay().as_ref() { + if let Err(err) = handle + .inbox_addr + .try_send(ActiveRelayMessage::SetHomeRelay(true)) + { + error!("Home relay not set, send to new actor failed: {err:#}."); + } } + self.active_relays.insert(url, handle.clone()); + handle } - self.active_relays.insert(url.clone(), handle); } - self.active_relays.get(url).expect("just inserted") } fn start_active_relay(&mut self, url: RelayUrl) -> ActiveRelayHandle { @@ -677,7 +685,7 @@ impl RelayActor { // Make sure home relay exists if let Some(ref url) = self.msock.my_relay() { - self.active_relay_handle(url); + self.active_relay_handle(url.clone()); } self.log_active_relay(); }