diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 115a61ca9e..6b5ec660af 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -12,7 +12,7 @@ use tokio::{ sync::{broadcast, mpsc, oneshot, watch}, task::JoinHandle, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, error_span, trace, warn, Instrument}; use self::util::{read_message, write_message, Dialer, Timers}; use crate::proto::{self, PeerData, Scope, TopicId}; @@ -85,6 +85,8 @@ impl Gossip { let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP); let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP); let (on_endpoints_tx, on_endpoints_rx) = watch::channel(Default::default()); + + let me = endpoint.peer_id().fmt_short(); let actor = Actor { endpoint, state, @@ -100,14 +102,18 @@ impl Gossip { subscribers_all: None, subscribers_topic: Default::default(), }; - let actor_handle = tokio::spawn(async move { - if let Err(err) = actor.run().await { - warn!("gossip actor closed with error: {err:?}"); - Err(err) - } else { - Ok(()) + + let actor_handle = tokio::spawn( + async move { + if let Err(err) = actor.run().await { + warn!("gossip actor closed with error: {err:?}"); + Err(err) + } else { + Ok(()) + } } - }); + .instrument(error_span!("gossip", %me)), + ); Self { to_actor_tx, on_endpoints_tx: Arc::new(on_endpoints_tx), @@ -333,7 +339,6 @@ struct Actor { impl Actor { pub async fn run(mut self) -> anyhow::Result<()> { - let me = *self.state.me(); loop { tokio::select! { biased; @@ -341,7 +346,7 @@ impl Actor { match msg { Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?, None => { - debug!(?me, "all gossip handles dropped, stop gossip actor"); + debug!("all gossip handles dropped, stop gossip actor"); break; } } @@ -354,11 +359,11 @@ impl Actor { (peer_id, res) = self.dialer.next_conn() => { match res { Ok(conn) => { - debug!(?me, peer = ?peer_id, "dial successfull"); + debug!(peer = ?peer_id, "dial successfull"); self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?; } Err(err) => { - warn!(?me, peer = ?peer_id, "dial failed: {err}"); + warn!(peer = ?peer_id, "dial failed: {err}"); } } } @@ -383,8 +388,7 @@ impl Actor { } async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> { - let me = *self.state.me(); - trace!(?me, "handle to_actor {msg:?}"); + trace!("handle to_actor {msg:?}"); match msg { ToActor::ConnIncoming(peer_id, origin, conn) => { self.conns.insert(peer_id, conn.clone()); @@ -395,13 +399,13 @@ impl Actor { // Spawn a task for this connection let in_event_tx = self.in_event_tx.clone(); tokio::spawn(async move { - debug!(?me, peer = ?peer_id, "connection established"); + debug!(peer = ?peer_id, "connection established"); match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await { Ok(()) => { - debug!(?me, peer = ?peer_id, "connection closed without error") + debug!(peer = ?peer_id, "connection closed without error") } Err(err) => { - debug!(?me, peer = ?peer_id, "connection closed with error {err:?}") + debug!(peer = ?peer_id, "connection closed with error {err:?}") } } in_event_tx @@ -458,11 +462,10 @@ impl Actor { } async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> { - let me = *self.state.me(); if matches!(event, InEvent::TimerExpired(_)) { - trace!(?me, "handle in_event {event:?}"); + trace!("handle in_event {event:?}"); } else { - debug!(?me, "handle in_event {event:?}"); + debug!("handle in_event {event:?}"); }; if let InEvent::PeerDisconnected(peer) = &event { self.conn_send_tx.remove(peer); @@ -470,9 +473,9 @@ impl Actor { let out = self.state.handle(event, now); for event in out { if matches!(event, OutEvent::ScheduleTimer(_, _)) { - trace!(?me, "handle out_event {event:?}"); + trace!("handle out_event {event:?}"); } else { - debug!(?me, "handle out_event {event:?}"); + debug!("handle out_event {event:?}"); }; match event { OutEvent::SendMessage(peer_id, message) => { @@ -482,7 +485,7 @@ impl Actor { self.conn_send_tx.remove(&peer_id); } } else { - debug!(?me, peer = ?peer_id, "dial"); + debug!(peer = ?peer_id, "dial"); self.dialer.queue_dial(peer_id, GOSSIP_ALPN); // TODO: Enforce max length self.pending_sends.entry(peer_id).or_default().push(message); @@ -516,13 +519,13 @@ impl Actor { OutEvent::PeerData(peer, data) => match decode_peer_data(&data) { Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"), Ok(info) => { - debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}"); + debug!(peer = ?peer, "add known addrs: {info:?}"); let peer_addr = PeerAddr { peer_id: peer, info, }; if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await { - debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}"); + debug!(peer = ?peer, "add known failed: {err:?}"); } } }, diff --git a/iroh-gossip/src/proto/hyparview.rs b/iroh-gossip/src/proto/hyparview.rs index 89b871145d..f6e74e8e1f 100644 --- a/iroh-gossip/src/proto/hyparview.rs +++ b/iroh-gossip/src/proto/hyparview.rs @@ -651,7 +651,7 @@ where io.push(OutEvent::EmitEvent(Event::NeighborDown(peer))); let data = self.peer_data.remove(&peer); self.add_passive(peer, data, io); - debug!(peer = ?self.me, other = ?peer, "removed from active view, reason: {reason:?}"); + debug!(other = ?peer, "removed from active view, reason: {reason:?}"); Some(peer) } else { None @@ -701,7 +701,7 @@ where fn add_active_unchecked(&mut self, peer: PI, priority: Priority, io: &mut impl IO) { self.passive_view.remove(&peer); self.active_view.insert(peer); - debug!(peer = ?self.me, other = ?peer, "add to active view"); + debug!(other = ?peer, "add to active view"); let message = Message::Neighbor(Neighbor { priority, diff --git a/iroh-net/src/key.rs b/iroh-net/src/key.rs index a2dbe158ee..e0917b0f40 100644 --- a/iroh-net/src/key.rs +++ b/iroh-net/src/key.rs @@ -82,6 +82,14 @@ impl PublicKey { pub fn verify(&self, message: &[u8], signature: &Signature) -> Result<(), SignatureError> { self.public.verify_strict(message, signature) } + + /// Convert to a base32 string limited to the first 10 bytes for a friendly string + /// representation of the key. + pub fn fmt_short(&self) -> String { + let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]); + text.make_ascii_lowercase(); + text + } } impl TryFrom<&[u8]> for PublicKey { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index d85fe2e5d2..69b86de5e6 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -42,7 +42,7 @@ use tokio::{ sync::{self, mpsc, Mutex}, time, }; -use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument}; use crate::{ config::{self, DERP_MAGIC_IP}, @@ -204,7 +204,8 @@ struct Inner { actor_sender: mpsc::Sender, /// Sends network messages. network_sender: mpsc::Sender>, - name: String, + /// String representation of the peer_id of this node. + me: String, #[allow(clippy::type_complexity)] #[debug("on_endpoints: Option>")] on_endpoints: Option>, @@ -309,16 +310,13 @@ impl MagicSock { /// /// [`Callbacks::on_endpoint`]: crate::magicsock::conn::Callbacks::on_endpoints pub async fn new(opts: Options) -> Result { - let name = format!( - "magic-{}", - hex::encode(&opts.secret_key.public().as_bytes()[..8]) - ); + let me = opts.secret_key.public().fmt_short(); if crate::util::derp_only_mode() { warn!("creating a MagicSock that will only send packets over a DERP relay connection."); } - Self::with_name(name.clone(), opts) - .instrument(info_span!("magicsock", %name)) + Self::with_name(me.clone(), opts) + .instrument(error_span!("magicsock", %me)) .await } @@ -327,7 +325,7 @@ impl MagicSock { self.inner.has_derp_region(region).await } - async fn with_name(name: String, opts: Options) -> Result { + async fn with_name(me: String, opts: Options) -> Result { let port_mapper = portmapper::Client::default().await; let Options { @@ -375,7 +373,7 @@ impl MagicSock { let (network_sender, network_receiver) = mpsc::channel(128); let inner = Arc::new(Inner { - name, + me, on_endpoints, on_derp_active, on_net_info, @@ -523,7 +521,7 @@ impl MagicSock { } /// Triggers an address discovery. The provided why string is for debug logging only. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn re_stun(&self, why: &'static str) { self.inner .actor_sender @@ -552,7 +550,7 @@ impl MagicSock { // TODO // /// Handles a "ping" CLI query. - // #[instrument(skip_all, fields(self.name = %self.name))] + // #[instrument(skip_all, fields(me = %self.inner.me))] // pub async fn ping(&self, peer: config::Node, mut res: config::PingResult, cb: F) // where // F: Fn(config::PingResult) -> BoxFuture<'static, ()> + Send + Sync + 'static, @@ -586,7 +584,7 @@ impl MagicSock { // } /// Sets the connection's preferred local port. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn set_preferred_port(&self, port: u16) { let (s, r) = sync::oneshot::channel(); self.inner @@ -609,7 +607,7 @@ impl MagicSock { } } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] /// Add addresses for a node to the magic socket's addresbook. pub async fn add_peer_addr(&self, addr: PeerAddr) -> Result<()> { let (s, r) = sync::oneshot::channel(); @@ -624,7 +622,7 @@ impl MagicSock { /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn close(&self) -> Result<()> { if self.inner.is_closed() { return Ok(()); @@ -647,7 +645,7 @@ impl MagicSock { /// Closes and re-binds the UDP sockets and resets the DERP connection. /// It should be followed by a call to ReSTUN. - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn rebind_all(&self) { let (s, r) = sync::oneshot::channel(); self.inner @@ -708,7 +706,7 @@ fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool } impl AsyncUdpSocket for MagicSock { - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn poll_send( &self, _udp_state: &quinn_udp::UdpState, @@ -763,7 +761,7 @@ impl AsyncUdpSocket for MagicSock { Poll::Pending } - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn poll_recv( &self, cx: &mut Context, @@ -828,7 +826,7 @@ impl AsyncUdpSocket for MagicSock { "[QUINN] <- {} ({}b) ({}) ({:?}, {:?})", meta_out.addr, meta_out.len, - self.inner.name, + self.inner.me, meta_out.dst_ip, source ); @@ -1839,7 +1837,7 @@ impl Actor { } /// Records the new endpoints, reporting whether they're changed. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn set_endpoints(&mut self, endpoints: &[config::Endpoint]) -> bool { self.last_endpoints_time = Some(Instant::now()); for (_de, f) in self.on_endpoint_refreshed.drain() { @@ -1857,7 +1855,7 @@ impl Actor { true } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) { let endpoint = self.peer_map.by_id(&endpoint_id); if endpoint.is_none() { @@ -1921,7 +1919,7 @@ impl Actor { } } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn rebind_all(&mut self) { inc!(MagicsockMetrics, rebind_calls); if let Err(err) = self.rebind(CurrentPortFate::Keep).await { @@ -1936,7 +1934,7 @@ impl Actor { /// Resets the preferred address for all peers. /// This is called when connectivity changes enough that we no longer trust the old routes. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn reset_endpoint_states(&mut self) { for (_, ep) in self.peer_map.endpoints_mut() { ep.note_connectivity_change(); @@ -1945,7 +1943,7 @@ impl Actor { /// Closes and re-binds the UDP sockets. /// We consider it successful if we manage to bind the IPv4 socket. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn rebind(&mut self, cur_port_fate: CurrentPortFate) -> Result<()> { let mut ipv6_addr = None; @@ -1985,7 +1983,7 @@ impl Actor { Ok(()) } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn set_preferred_port(&mut self, port: u16) { let existing_port = self.inner.port.swap(port, Ordering::Relaxed); if existing_port == port { diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 14927265e5..aaa679c1fc 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -43,7 +43,7 @@ use iroh_bytes::{ use iroh_net::{key::PublicKey, MagicEndpoint}; use tokio::sync::{mpsc, oneshot}; use tokio_util::{sync::CancellationToken, time::delay_queue}; -use tracing::{debug, trace}; +use tracing::{debug, error_span, trace, Instrument}; mod get; mod invariants; @@ -225,6 +225,7 @@ impl Downloader { S: Store, C: CollectionParser, { + let me = endpoint.peer_id().fmt_short(); let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); let dialer = iroh_gossip::net::util::Dialer::new(endpoint); @@ -237,7 +238,7 @@ impl Downloader { let service = Service::new(getter, dialer, concurrency_limits, msg_rx); - service.run() + service.run().instrument(error_span!("downloader", %me)) }; rt.local_pool().spawn_pinned(create_future); Self { next_id: 0, msg_tx } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index d70a55ca3c..d38b55be22 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -54,7 +54,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::dial::Ticket; use crate::downloader::Downloader; @@ -450,22 +450,26 @@ where inner: inner.clone(), collection_parser: self.collection_parser.clone(), }; - rt2.main().spawn(async move { - Self::run( - endpoint, - callbacks, - cb_receiver, - handler, - self.rpc_endpoint, - internal_rpc, - self.custom_get_handler, - self.auth_handler, - self.collection_parser, - rt3, - gossip, - ) - .await - }) + let me = endpoint.peer_id().fmt_short(); + rt2.main().spawn( + async move { + Self::run( + endpoint, + callbacks, + cb_receiver, + handler, + self.rpc_endpoint, + internal_rpc, + self.custom_get_handler, + self.auth_handler, + self.collection_parser, + rt3, + gossip, + ) + .await + } + .instrument(error_span!("node", %me)), + ) }; let node = Node { inner, diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index c40e06d919..2563566d9f 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -35,7 +35,7 @@ use tokio::{ task::JoinError, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, debug_span, error, warn, Instrument}; +use tracing::{debug, error, error_span, warn, Instrument}; pub use iroh_sync::ContentStatus; @@ -183,7 +183,7 @@ impl LiveSync { downloader: Downloader, ) -> Self { let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP); - let me = base32::fmt_short(endpoint.peer_id()); + let me = endpoint.peer_id().fmt_short(); let mut actor = Actor::new( endpoint, gossip, @@ -193,12 +193,14 @@ impl LiveSync { to_actor_rx, to_actor_tx.clone(), ); - let span = debug_span!("sync", %me); - let task = rt.main().spawn(async move { - if let Err(err) = actor.run().instrument(span).await { - error!("live sync failed: {err:?}"); + let task = rt.main().spawn( + async move { + if let Err(err) = actor.run().await { + error!("live sync failed: {err:?}"); + } } - }); + .instrument(error_span!("sync", %me)), + ); let handle = LiveSync { to_actor_tx, task: task.map_err(Arc::new).boxed().shared(), @@ -1016,16 +1018,3 @@ async fn notify_all(subs: &mut HashMap, event: LiveEve } } } - -/// Utilities for working with byte array identifiers -// TODO: copy-pasted from iroh-gossip/src/proto/util.rs -// Unify into iroh-common crate or similar -pub(super) mod base32 { - /// Convert to a base32 string limited to the first 10 bytes - pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String { - let len = bytes.as_ref().len().min(10); - let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]); - text.make_ascii_lowercase(); - text - } -}