Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(*): log me #1561

Merged
merged 4 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, trace, warn};
use tracing::{debug, error_span, trace, warn, Instrument};

use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, PeerData, Scope, TopicId};
Expand Down Expand Up @@ -85,6 +85,8 @@ impl Gossip {
let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP);
let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
let (on_endpoints_tx, on_endpoints_rx) = watch::channel(Default::default());

let me = endpoint.peer_id().fmt_short();
let actor = Actor {
endpoint,
state,
Expand All @@ -100,14 +102,18 @@ impl Gossip {
subscribers_all: None,
subscribers_topic: Default::default(),
};
let actor_handle = tokio::spawn(async move {
if let Err(err) = actor.run().await {
warn!("gossip actor closed with error: {err:?}");
Err(err)
} else {
Ok(())

let actor_handle = tokio::spawn(
async move {
if let Err(err) = actor.run().await {
warn!("gossip actor closed with error: {err:?}");
Err(err)
} else {
Ok(())
}
}
});
.instrument(error_span!("gossip", %me)),
);
Self {
to_actor_tx,
on_endpoints_tx: Arc::new(on_endpoints_tx),
Expand Down Expand Up @@ -333,15 +339,14 @@ struct Actor {

impl Actor {
pub async fn run(mut self) -> anyhow::Result<()> {
let me = *self.state.me();
loop {
tokio::select! {
biased;
msg = self.to_actor_rx.recv() => {
match msg {
Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?,
None => {
debug!(?me, "all gossip handles dropped, stop gossip actor");
debug!("all gossip handles dropped, stop gossip actor");
break;
}
}
Expand All @@ -354,11 +359,11 @@ impl Actor {
(peer_id, res) = self.dialer.next_conn() => {
match res {
Ok(conn) => {
debug!(?me, peer = ?peer_id, "dial successfull");
debug!(peer = ?peer_id, "dial successfull");
self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?;
}
Err(err) => {
warn!(?me, peer = ?peer_id, "dial failed: {err}");
warn!(peer = ?peer_id, "dial failed: {err}");
}
}
}
Expand All @@ -383,8 +388,7 @@ impl Actor {
}

async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
let me = *self.state.me();
trace!(?me, "handle to_actor {msg:?}");
trace!("handle to_actor {msg:?}");
match msg {
ToActor::ConnIncoming(peer_id, origin, conn) => {
self.conns.insert(peer_id, conn.clone());
Expand All @@ -395,13 +399,13 @@ impl Actor {
// Spawn a task for this connection
let in_event_tx = self.in_event_tx.clone();
tokio::spawn(async move {
debug!(?me, peer = ?peer_id, "connection established");
debug!(peer = ?peer_id, "connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
Ok(()) => {
debug!(?me, peer = ?peer_id, "connection closed without error")
debug!(peer = ?peer_id, "connection closed without error")
}
Err(err) => {
debug!(?me, peer = ?peer_id, "connection closed with error {err:?}")
debug!(peer = ?peer_id, "connection closed with error {err:?}")
}
}
in_event_tx
Expand Down Expand Up @@ -458,21 +462,20 @@ impl Actor {
}

async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> {
let me = *self.state.me();
if matches!(event, InEvent::TimerExpired(_)) {
trace!(?me, "handle in_event {event:?}");
trace!("handle in_event {event:?}");
} else {
debug!(?me, "handle in_event {event:?}");
debug!("handle in_event {event:?}");
};
if let InEvent::PeerDisconnected(peer) = &event {
self.conn_send_tx.remove(peer);
}
let out = self.state.handle(event, now);
for event in out {
if matches!(event, OutEvent::ScheduleTimer(_, _)) {
trace!(?me, "handle out_event {event:?}");
trace!("handle out_event {event:?}");
} else {
debug!(?me, "handle out_event {event:?}");
debug!("handle out_event {event:?}");
};
match event {
OutEvent::SendMessage(peer_id, message) => {
Expand All @@ -482,7 +485,7 @@ impl Actor {
self.conn_send_tx.remove(&peer_id);
}
} else {
debug!(?me, peer = ?peer_id, "dial");
debug!(peer = ?peer_id, "dial");
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
// TODO: Enforce max length
self.pending_sends.entry(peer_id).or_default().push(message);
Expand Down Expand Up @@ -516,13 +519,13 @@ impl Actor {
OutEvent::PeerData(peer, data) => match decode_peer_data(&data) {
Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"),
Ok(info) => {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
debug!(peer = ?peer, "add known addrs: {info:?}");
let peer_addr = PeerAddr {
peer_id: peer,
info,
};
if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}");
debug!(peer = ?peer, "add known failed: {err:?}");
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/src/proto/hyparview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ where
io.push(OutEvent::EmitEvent(Event::NeighborDown(peer)));
let data = self.peer_data.remove(&peer);
self.add_passive(peer, data, io);
debug!(peer = ?self.me, other = ?peer, "removed from active view, reason: {reason:?}");
debug!(other = ?peer, "removed from active view, reason: {reason:?}");
Some(peer)
} else {
None
Expand Down Expand Up @@ -701,7 +701,7 @@ where
fn add_active_unchecked(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
self.passive_view.remove(&peer);
self.active_view.insert(peer);
debug!(peer = ?self.me, other = ?peer, "add to active view");
debug!(other = ?peer, "add to active view");

let message = Message::Neighbor(Neighbor {
priority,
Expand Down
8 changes: 8 additions & 0 deletions iroh-net/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl PublicKey {
pub fn verify(&self, message: &[u8], signature: &Signature) -> Result<(), SignatureError> {
self.public.verify_strict(message, signature)
}

/// Convert to a base32 string limited to the first 10 bytes for a friendly string
/// representation of the key.
pub fn fmt_short(&self) -> String {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]);
text.make_ascii_lowercase();
text
}
}

impl TryFrom<&[u8]> for PublicKey {
Expand Down
48 changes: 23 additions & 25 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::{
sync::{self, mpsc, Mutex},
time,
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument};

use crate::{
config::{self, DERP_MAGIC_IP},
Expand Down Expand Up @@ -204,7 +204,8 @@ struct Inner {
actor_sender: mpsc::Sender<ActorMessage>,
/// Sends network messages.
network_sender: mpsc::Sender<Vec<quinn_udp::Transmit>>,
name: String,
/// String representation of the peer_id of this node.
me: String,
#[allow(clippy::type_complexity)]
#[debug("on_endpoints: Option<Box<..>>")]
on_endpoints: Option<Box<dyn Fn(&[config::Endpoint]) + Send + Sync + 'static>>,
Expand Down Expand Up @@ -309,16 +310,13 @@ impl MagicSock {
///
/// [`Callbacks::on_endpoint`]: crate::magicsock::conn::Callbacks::on_endpoints
pub async fn new(opts: Options) -> Result<Self> {
let name = format!(
"magic-{}",
hex::encode(&opts.secret_key.public().as_bytes()[..8])
);
let me = opts.secret_key.public().fmt_short();
if crate::util::derp_only_mode() {
warn!("creating a MagicSock that will only send packets over a DERP relay connection.");
}

Self::with_name(name.clone(), opts)
.instrument(info_span!("magicsock", %name))
Self::with_name(me.clone(), opts)
.instrument(error_span!("magicsock", %me))
.await
}

Expand All @@ -327,7 +325,7 @@ impl MagicSock {
self.inner.has_derp_region(region).await
}

async fn with_name(name: String, opts: Options) -> Result<Self> {
async fn with_name(me: String, opts: Options) -> Result<Self> {
let port_mapper = portmapper::Client::default().await;

let Options {
Expand Down Expand Up @@ -375,7 +373,7 @@ impl MagicSock {
let (network_sender, network_receiver) = mpsc::channel(128);

let inner = Arc::new(Inner {
name,
me,
on_endpoints,
on_derp_active,
on_net_info,
Expand Down Expand Up @@ -523,7 +521,7 @@ impl MagicSock {
}

/// Triggers an address discovery. The provided why string is for debug logging only.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn re_stun(&self, why: &'static str) {
self.inner
.actor_sender
Expand Down Expand Up @@ -552,7 +550,7 @@ impl MagicSock {

// TODO
// /// Handles a "ping" CLI query.
// #[instrument(skip_all, fields(self.name = %self.name))]
// #[instrument(skip_all, fields(me = %self.inner.me))]
// pub async fn ping<F>(&self, peer: config::Node, mut res: config::PingResult, cb: F)
// where
// F: Fn(config::PingResult) -> BoxFuture<'static, ()> + Send + Sync + 'static,
Expand Down Expand Up @@ -586,7 +584,7 @@ impl MagicSock {
// }

/// Sets the connection's preferred local port.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn set_preferred_port(&self, port: u16) {
let (s, r) = sync::oneshot::channel();
self.inner
Expand All @@ -609,7 +607,7 @@ impl MagicSock {
}
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
/// Add addresses for a node to the magic socket's addresbook.
pub async fn add_peer_addr(&self, addr: PeerAddr) -> Result<()> {
let (s, r) = sync::oneshot::channel();
Expand All @@ -624,7 +622,7 @@ impl MagicSock {
/// Closes the connection.
///
/// Only the first close does anything. Any later closes return nil.
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn close(&self) -> Result<()> {
if self.inner.is_closed() {
return Ok(());
Expand All @@ -647,7 +645,7 @@ impl MagicSock {

/// Closes and re-binds the UDP sockets and resets the DERP connection.
/// It should be followed by a call to ReSTUN.
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn rebind_all(&self) {
let (s, r) = sync::oneshot::channel();
self.inner
Expand Down Expand Up @@ -708,7 +706,7 @@ fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool
}

impl AsyncUdpSocket for MagicSock {
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn poll_send(
&self,
_udp_state: &quinn_udp::UdpState,
Expand Down Expand Up @@ -763,7 +761,7 @@ impl AsyncUdpSocket for MagicSock {
Poll::Pending
}

#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn poll_recv(
&self,
cx: &mut Context,
Expand Down Expand Up @@ -828,7 +826,7 @@ impl AsyncUdpSocket for MagicSock {
"[QUINN] <- {} ({}b) ({}) ({:?}, {:?})",
meta_out.addr,
meta_out.len,
self.inner.name,
self.inner.me,
meta_out.dst_ip,
source
);
Expand Down Expand Up @@ -1839,7 +1837,7 @@ impl Actor {
}

/// Records the new endpoints, reporting whether they're changed.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn set_endpoints(&mut self, endpoints: &[config::Endpoint]) -> bool {
self.last_endpoints_time = Some(Instant::now());
for (_de, f) in self.on_endpoint_refreshed.drain() {
Expand All @@ -1857,7 +1855,7 @@ impl Actor {
true
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) {
let endpoint = self.peer_map.by_id(&endpoint_id);
if endpoint.is_none() {
Expand Down Expand Up @@ -1921,7 +1919,7 @@ impl Actor {
}
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn rebind_all(&mut self) {
inc!(MagicsockMetrics, rebind_calls);
if let Err(err) = self.rebind(CurrentPortFate::Keep).await {
Expand All @@ -1936,7 +1934,7 @@ impl Actor {

/// Resets the preferred address for all peers.
/// This is called when connectivity changes enough that we no longer trust the old routes.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn reset_endpoint_states(&mut self) {
for (_, ep) in self.peer_map.endpoints_mut() {
ep.note_connectivity_change();
Expand All @@ -1945,7 +1943,7 @@ impl Actor {

/// Closes and re-binds the UDP sockets.
/// We consider it successful if we manage to bind the IPv4 socket.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn rebind(&mut self, cur_port_fate: CurrentPortFate) -> Result<()> {
let mut ipv6_addr = None;

Expand Down Expand Up @@ -1985,7 +1983,7 @@ impl Actor {
Ok(())
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn set_preferred_port(&mut self, port: u16) {
let existing_port = self.inner.port.swap(port, Ordering::Relaxed);
if existing_port == port {
Expand Down
Loading