From db175c251c1f8b2d0d666347a682f71c2dd38524 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 11:07:42 +0200 Subject: [PATCH 01/18] feat: allow to send gossip to neighbors only --- iroh-gossip/examples/chat.rs | 4 +- iroh-gossip/src/net.rs | 46 ++++++--- iroh-gossip/src/proto.rs | 19 +++- iroh-gossip/src/proto/plumtree.rs | 158 ++++++++++++++++++++---------- iroh-gossip/src/proto/tests.rs | 11 ++- iroh-gossip/src/proto/topic.rs | 15 +-- iroh/src/sync_engine/live.rs | 8 +- 7 files changed, 174 insertions(+), 87 deletions(-) diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs index a1536a9ab4..fcda9be220 100644 --- a/iroh-gossip/examples/chat.rs +++ b/iroh-gossip/examples/chat.rs @@ -203,8 +203,8 @@ async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> { let mut stream = gossip.subscribe(topic).await?; loop { let event = stream.recv().await?; - if let Event::Received(data, _prev_peer) = event { - let (from, message) = SignedMessage::verify_and_decode(&data)?; + if let Event::Received(msg) = event { + let (from, message) = SignedMessage::verify_and_decode(&msg.content)?; match message { Message::AboutMe { name } => { names.insert(from, name.clone()); diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index f4e91f41c6..9ffbb64dc2 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -20,7 +20,7 @@ use tokio::{ use tracing::{debug, trace, warn}; use self::util::{read_message, write_message, Dialer, Timers}; -use crate::proto::{self, TopicId}; +use crate::proto::{self, Scope, TopicId}; pub mod util; @@ -154,13 +154,26 @@ impl Gossip { Ok(()) } - /// Broadcast a message on a topic. + /// Broadcast a message on a topic to all peers in the swarm. /// /// This does not join the topic automatically, so you have to call [Self::join] yourself /// for messages to be broadcast to peers. 
pub async fn broadcast(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); - self.send(ToActor::Broadcast(topic, message, tx)).await?; + self.send(ToActor::Broadcast(topic, message, Scope::Swarm, tx)) + .await?; + rx.await??; + Ok(()) + } + + /// Broadcast a message on a topic to the immediate neighbors. + /// + /// This does not join the topic automatically, so you have to call [Self::join] yourself + /// for messages to be broadcast to peers. + pub async fn broadcast_neighbors(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); + self.send(ToActor::Broadcast(topic, message, Scope::Neighbors, tx)) + .await?; rx.await??; Ok(()) } @@ -296,7 +309,7 @@ enum ToActor { /// Leave a topic, send disconnect messages and drop all state. Quit(TopicId), /// Broadcast a message on a topic. - Broadcast(TopicId, Bytes, oneshot::Sender>), + Broadcast(TopicId, Bytes, Scope, oneshot::Sender>), /// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a /// topic. 
Subscribe( @@ -316,8 +329,12 @@ impl fmt::Debug for ToActor { } ToActor::Join(topic, peers, _reply) => write!(f, "Join({topic:?}, {peers:?})"), ToActor::Quit(topic) => write!(f, "Quit({topic:?})"), - ToActor::Broadcast(topic, message, _reply) => { - write!(f, "Broadcast({topic:?}, bytes<{}>)", message.len()) + ToActor::Broadcast(topic, message, scope, _reply) => { + write!( + f, + "Broadcast({topic:?}, {scope:?}, bytes<{}>)", + message.len() + ) } ToActor::Subscribe(topic, _reply) => write!(f, "Subscribe({topic:?})"), ToActor::SubscribeAll(_reply) => write!(f, "SubscribeAll"), @@ -460,9 +477,12 @@ impl Actor { .await?; self.subscribers_topic.remove(&topic_id); } - ToActor::Broadcast(topic_id, message, reply) => { - self.handle_in_event(InEvent::Command(topic_id, Command::Broadcast(message)), now) - .await?; + ToActor::Broadcast(topic_id, message, scope, reply) => { + self.handle_in_event( + InEvent::Command(topic_id, Command::Broadcast(message, scope)), + now, + ) + .await?; reply.send(Ok(())).ok(); } ToActor::Subscribe(topic_id, reply) => { @@ -716,8 +736,8 @@ mod test { loop { let ev = stream2.recv().await.unwrap(); info!("go2 event: {ev:?}"); - if let Event::Received(msg, _prev_peer) = ev { - recv.push(msg); + if let Event::Received(msg) = ev { + recv.push(msg.content); } if recv.len() == len { return recv; @@ -731,8 +751,8 @@ mod test { loop { let ev = stream3.recv().await.unwrap(); info!("go3 event: {ev:?}"); - if let Event::Received(msg, _prev_peer) = ev { - recv.push(msg); + if let Event::Received(msg) = ev { + recv.push(msg.content); } if recv.len() == len { return recv; diff --git a/iroh-gossip/src/proto.rs b/iroh-gossip/src/proto.rs index 4ac21d6b80..53227a9e4b 100644 --- a/iroh-gossip/src/proto.rs +++ b/iroh-gossip/src/proto.rs @@ -58,6 +58,7 @@ pub mod util; #[cfg(test)] mod tests; +pub use plumtree::Scope; pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId}; pub use topic::{Command, Config, Event, IO}; @@ -106,7 +107,7 @@ mod test { 
assert_synchronous_active, report_round_distribution, sort, Network, Simulator, SimulatorConfig, }, - TopicId, + Scope, TopicId, }; #[test] @@ -215,10 +216,14 @@ mod test { assert!(assert_synchronous_active(&network)); // now broadcast a first message - network.command(1, t, Command::Broadcast(b"hi1".to_vec().into())); + network.command( + 1, + t, + Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm), + ); network.ticks(broadcast_ticks); let events = network.events(); - let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _)))); + let received = events.filter(|x| matches!(x, (_, _, Event::Received(_)))); // message should be received by two other nodes assert_eq!(received.count(), 2); assert!(assert_synchronous_active(&network)); @@ -230,10 +235,14 @@ mod test { report_round_distribution(&network); // now broadcast again - network.command(1, t, Command::Broadcast(b"hi2".to_vec().into())); + network.command( + 1, + t, + Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm), + ); network.ticks(broadcast_ticks); let events = network.events(); - let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _)))); + let received = events.filter(|x| matches!(x, (_, _, Event::Received(_)))); // message should be received by all 5 other nodes assert_eq!(received.count(), 5); assert!(assert_synchronous_active(&network)); diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 3e4ec086ed..44812cb24d 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -41,8 +41,8 @@ impl MessageId { pub enum InEvent { /// A [`Message`] was received from the peer. RecvMessage(PI, Message), - /// Broadcast the contained payload. - Broadcast(Bytes), + /// Broadcast the contained payload to the given scope. + Broadcast(Bytes, Scope), /// A timer has expired. TimerExpired(Timer), /// New member `PI` has joined the topic. 
@@ -81,13 +81,29 @@ pub enum Timer { #[derive(Clone, Debug, PartialEq, Eq)] pub enum Event { /// A new gossip message was received. - Received( - /// The content of the gossip message. - Bytes, - /// The peer that we received the gossip message from. Note that this is not the peer that - /// originally broadcasted the message, but the peer before us in the gossiping path. - PI, - ), + Received(GossipEvent), +} + +#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd)] +pub struct GossipEvent { + /// The content of the gossip message. + #[debug("<{}b>", content.len())] + pub content: Bytes, + /// The peer that we received the gossip message from. Note that this is not the peer that + /// originally broadcasted the message, but the peer before us in the gossiping path. + pub delivered_from: PI, + /// The broadcast scope of the message + pub scope: Scope, +} + +impl GossipEvent { + fn from_message(message: &Gossip, from: PI) -> Self { + Self { + content: message.content.clone(), + scope: message.scope, + delivered_from: from, + } + } } /// Number of delivery hops a message has taken. @@ -128,6 +144,17 @@ pub struct Gossip { /// Message contents. #[debug("<{}b>", content.len())] content: Bytes, + /// Scope to publish to + scope: Scope, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)] +/// The broadcast scope of a gossip message. +pub enum Scope { + /// The message is broadcast to all peers in the swarm. + Swarm, + /// The message is broadcast only to the immediate neighbors of a peer. 
+ Neighbors, } impl Gossip { @@ -137,6 +164,7 @@ impl Gossip { id: self.id, content: self.content, round: self.round.next(), + scope: self.scope, } } @@ -337,7 +365,7 @@ impl State { } match event { InEvent::RecvMessage(from, message) => self.handle_message(from, message, now, io), - InEvent::Broadcast(data) => self.do_broadcast(data, now, io), + InEvent::Broadcast(data, scope) => self.broadcast(data, scope, now, io), InEvent::NeighborUp(peer) => self.on_neighbor_up(peer), InEvent::NeighborDown(peer) => self.on_neighbor_down(peer), InEvent::TimerExpired(timer) => match timer { @@ -383,23 +411,28 @@ impl State { /// /// Will be pushed in full to eager peers. /// Pushing the message id to the lazy peers is delayed by a timer. - fn do_broadcast(&mut self, data: Bytes, now: Instant, io: &mut impl IO) { - let id = MessageId::from_content(&data); + fn broadcast(&mut self, content: Bytes, scope: Scope, now: Instant, io: &mut impl IO) { + let id = MessageId::from_content(&content); let message = Gossip { id, round: Round(0), - content: data, + content, + scope, }; - self.received_messages - .insert(id, (), now + self.config.message_id_retention); - self.cache.insert( - id, - message.clone(), - now + self.config.message_cache_retention, - ); let me = self.me; + // TODO: Do we want to lazy push neighbor messages as well? + if let Scope::Swarm = scope { + self.received_messages + .insert(id, (), now + self.config.message_id_retention); + self.cache.insert( + id, + message.clone(), + now + self.config.message_cache_retention, + ); + self.lazy_push(message.clone(), &me, io); + } + self.eager_push(message.clone(), &me, io); - self.lazy_push(message, &me, io); } /// Handle receiving a [`Message::Gossip`]. 
@@ -422,41 +455,41 @@ impl State { io.push(OutEvent::SendMessage(sender, Message::Prune)); // otherwise store the message, emit to application and forward to peers } else { - // insert the message in the list of received messages - self.received_messages - .insert(message.id, (), now + self.config.message_id_retention); - - // increase the round for forwarding the message, and add to cache - // to reply to Graft messages later - // TODO: use an LRU cache for self.cache - // TODO: add callback/event to application to get missing messages that were received before? - let message = message.next_round(); - self.cache.insert( - message.id, - message.clone(), - now + self.config.message_cache_retention, - ); - - // push the message to our peers - self.eager_push(message.clone(), &sender, io); - self.lazy_push(message.clone(), &sender, io); - - // cleanup places where we track missing messages - self.graft_timer_scheduled.remove(&message.id); - let previous_ihaves = self.missing_messages.remove(&message.id); - // do the optimization step from the paper - if let Some(previous_ihaves) = previous_ihaves { - self.optimize_tree(&sender, &message, previous_ihaves, io); + if let Scope::Swarm = message.scope { + // insert the message in the list of received messages + self.received_messages.insert( + message.id, + (), + now + self.config.message_id_retention, + ); + // increase the round for forwarding the message, and add to cache + // to reply to Graft messages later + // TODO: add callback/event to application to get missing messages that were received before? 
+ let message = message.clone().next_round(); + + self.cache.insert( + message.id, + message.clone(), + now + self.config.message_cache_retention, + ); + // push the message to our peers + self.eager_push(message.clone(), &sender, io); + self.lazy_push(message.clone(), &sender, io); + // cleanup places where we track missing messages + self.graft_timer_scheduled.remove(&message.id); + let previous_ihaves = self.missing_messages.remove(&message.id); + // do the optimization step from the paper + if let Some(previous_ihaves) = previous_ihaves { + self.optimize_tree(&sender, &message, previous_ihaves, io); + } + self.stats.max_last_delivery_hop = + self.stats.max_last_delivery_hop.max(message.round.0); } // emit event to application io.push(OutEvent::EmitEvent(Event::Received( - message.content, - sender, + GossipEvent::from_message(&message, sender), ))); - - self.stats.max_last_delivery_hop = - self.stats.max_last_delivery_hop.max(message.round.0); } } @@ -676,6 +709,7 @@ mod test { id, round: Round(6), content: content.clone(), + scope: Scope::Swarm, }), ); state.handle(event, now, &mut io); @@ -687,7 +721,11 @@ mod test { config.dispatch_timeout, Timer::DispatchLazyPush, )); - io.push(OutEvent::EmitEvent(Event::Received(content, 3))); + io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { + content, + delivered_from: 3, + scope: Scope::Swarm, + }))); io }; assert_eq!(io, expected); @@ -715,6 +753,7 @@ mod test { id, round: Round(9), content: content.clone(), + scope: Scope::Swarm, }), ); state.handle(event, now, &mut io); @@ -730,7 +769,11 @@ mod test { }), )); io.push(OutEvent::SendMessage(3, Message::Prune)); - io.push(OutEvent::EmitEvent(Event::Received(content, 3))); + io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { + content, + delivered_from: 3, + scope: Scope::Swarm, + }))); io }; assert_eq!(io, expected); @@ -748,6 +791,7 @@ mod test { content: content.clone(), round: Round(1), id: MessageId::from_content(&content), + scope: Scope::Swarm, 
}); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); @@ -761,7 +805,11 @@ mod test { config.dispatch_timeout, Timer::DispatchLazyPush, )); - io.push(OutEvent::EmitEvent(Event::Received(content, 2))); + io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { + content, + delivered_from: 2, + scope: Scope::Swarm, + }))); io }; assert_eq!(io, expected); @@ -772,6 +820,7 @@ mod test { content, round: Round(1), id: MessageId::from_content(b"foo"), + scope: Scope::Swarm, }); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); @@ -789,6 +838,7 @@ mod test { content: content.clone(), round: Round(1), id: MessageId::from_content(&content), + scope: Scope::Swarm, }); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); diff --git a/iroh-gossip/src/proto/tests.rs b/iroh-gossip/src/proto/tests.rs index b189a5dc63..f7607b06cf 100644 --- a/iroh-gossip/src/proto/tests.rs +++ b/iroh-gossip/src/proto/tests.rs @@ -10,6 +10,8 @@ use rand::Rng; use rand_core::SeedableRng; use tracing::{debug, warn}; +use crate::proto::Scope; + use super::{ util::TimerMap, Command, Config, Event, InEvent, OutEvent, PeerIdentity, State, Timer, TopicId, }; @@ -315,8 +317,11 @@ impl Simulator { .filter(|p| *p != from), ); let expected_len = expected.len() as u64; - self.network - .command(from, TOPIC, Command::Broadcast(message.clone())); + self.network.command( + from, + TOPIC, + Command::Broadcast(message.clone(), Scope::Swarm), + ); let mut tick = 0; loop { @@ -331,7 +336,7 @@ impl Simulator { let events = self.network.events(); let received: HashSet<_> = events .filter( - |(_peer, _topic, event)| matches!(event, Event::Received(recv, _) if recv == &message), + |(_peer, _topic, event)| matches!(event, Event::Received(recv) if recv.content == &message), ) .map(|(peer, _topic, _msg)| peer) .collect(); diff --git a/iroh-gossip/src/proto/topic.rs b/iroh-gossip/src/proto/topic.rs index 
dbce43a8c2..f37799b27c 100644 --- a/iroh-gossip/src/proto/topic.rs +++ b/iroh-gossip/src/proto/topic.rs @@ -11,7 +11,7 @@ use rand::Rng; use rand_core::SeedableRng; use serde::{Deserialize, Serialize}; -use super::plumtree::{self, InEvent as GossipIn}; +use super::plumtree::{self, GossipEvent, InEvent as GossipIn, Scope}; use super::{ hyparview::{self, InEvent as SwarmIn}, state::MessageKind, @@ -112,14 +112,14 @@ impl Message { } /// An event to be emitted to the application for a particular topic. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, derive_more::Debug)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] pub enum Event { /// We have a new, direct neighbor in the swarm membership layer for this topic NeighborUp(PI), /// We dropped direct neighbor in the swarm membership layer for this topic NeighborDown(PI), /// A gossip message was received for this topic - Received(#[debug("<{}b>", _0.len())] Bytes, PI), + Received(GossipEvent), } impl From> for Event { @@ -134,7 +134,7 @@ impl From> for Event { impl From> for Event { fn from(value: plumtree::Event) -> Self { match value { - plumtree::Event::Received(message, prev_peer) => Self::Received(message, prev_peer), + plumtree::Event::Received(event) => Self::Received(event), } } } @@ -160,7 +160,7 @@ pub enum Command { /// but only become operational after the first join request by another peer. Join(Vec), /// Broadcast a message for this topic. - Broadcast(#[debug("<{}b>", _0.len())] Bytes), + Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope), /// Leave this topic and drop all state. 
Quit, } @@ -232,7 +232,10 @@ impl State { self.swarm.handle(SwarmIn::RequestJoin(peer), now, io); } } - Command::Broadcast(data) => self.gossip.handle(GossipIn::Broadcast(data), now, io), + Command::Broadcast(data, scope) => { + self.gossip + .handle(GossipIn::Broadcast(data, scope), now, io) + } Command::Quit => self.swarm.handle(SwarmIn::Quit, now, io), }, InEvent::RecvMessage(from, message) => { diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index ad1fdb9700..ed39674de0 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -784,12 +784,12 @@ impl Actor { }; match event { // We received a gossip message. Try to insert it into our replica. - Event::Received(data, prev_peer) => { - let op: Op = postcard::from_bytes(&data)?; + Event::Received(msg) => { + let op: Op = postcard::from_bytes(&msg.content)?; match op { Op::Put(entry) => { - debug!(peer = ?prev_peer, topic = ?topic, "received entry via gossip"); - replica.insert_remote_entry(entry, *prev_peer.as_bytes())? + debug!(peer = ?msg.delivered_from, topic = ?topic, "received entry via gossip"); + replica.insert_remote_entry(entry, *msg.delivered_from.as_bytes())? } } } From 0a30d6d536fb9dac258864f474514759dfb0b9dc Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 12:43:32 +0200 Subject: [PATCH 02/18] feat: provider and candidate roles for downloader --- iroh/src/downloader.rs | 162 +++++++++++++++++++++++------ iroh/src/downloader/test.rs | 70 +++++++++++-- iroh/src/downloader/test/dialer.rs | 1 + iroh/src/sync_engine/live.rs | 7 +- 4 files changed, 196 insertions(+), 44 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index df139a2266..3de10ed827 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -29,7 +29,7 @@ //! requests to a single peer is also limited. 
use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, }; @@ -244,7 +244,7 @@ impl Downloader { } /// Queue a download. - pub async fn queue(&mut self, kind: DownloadKind, peers: Vec) -> DownloadHandle { + pub async fn queue(&mut self, kind: DownloadKind, peers: Vec) -> DownloadHandle { let id = self.next_id; self.next_id = self.next_id.wrapping_add(1); @@ -285,7 +285,7 @@ impl Downloader { } /// Declare that certains peers can be used to download a hash. - pub async fn peers_have(&mut self, hash: Hash, peers: Vec) { + pub async fn peers_have(&mut self, hash: Hash, peers: Vec) { let msg = Message::PeersHave { hash, peers }; if let Err(send_err) = self.msg_tx.send(msg).await { let msg = send_err.0; @@ -294,6 +294,51 @@ impl Downloader { } } +/// A peer and its role with regard to a hash. +#[derive(Debug, Clone, Copy)] +pub struct PeerInfo { + node_id: PublicKey, + role: PeerRole, +} + +impl PeerInfo { + /// Create a new [`PeerInfo`] from its parts. + pub fn new(node_id: PublicKey, role: PeerRole) -> Self { + Self { node_id, role } + } +} + +impl From<(PublicKey, PeerRole)> for PeerInfo { + fn from((node_id, role): (PublicKey, PeerRole)) -> Self { + Self { node_id, role } + } +} + +/// The role of a peer with regard to a download intent. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum PeerRole { + /// We have information that this peer has the requested blob. + Provider, + /// We do not have information if this peer has the requested blob. 
+ Candidate, +} + +impl PartialOrd for PeerRole { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for PeerRole { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match (self, other) { + (PeerRole::Provider, PeerRole::Provider) => std::cmp::Ordering::Equal, + (PeerRole::Candidate, PeerRole::Candidate) => std::cmp::Ordering::Equal, + (PeerRole::Provider, PeerRole::Candidate) => std::cmp::Ordering::Greater, + (PeerRole::Candidate, PeerRole::Provider) => std::cmp::Ordering::Less, + } + } +} + /// Messages the service can receive. #[derive(derive_more::Debug)] enum Message { @@ -303,13 +348,13 @@ enum Message { id: Id, #[debug(skip)] sender: oneshot::Sender, - peers: Vec, + peers: Vec, }, /// Cancel an intent. The associated request will be cancelled when the last intent is /// cancelled. Cancel { id: Id, kind: DownloadKind }, /// Declare that peers have certains hash and can be used for downloading. This feeds the [`ProviderMap`]. - PeersHave { hash: Hash, peers: Vec }, + PeersHave { hash: Hash, peers: Vec }, } /// Information about a request being processed. @@ -340,7 +385,7 @@ struct PendingRequestInfo { delay_key: delay_queue::Key, /// If this attempt was scheduled with a known potential peer, this is stored here to /// prevent another query to the [`ProviderMap`]. - next_peer: Option, + next_peer: Option, } /// State of the connection to this peer. @@ -507,7 +552,7 @@ impl, D: Dialer> Service { kind: DownloadKind, id: Id, sender: oneshot::Sender, - peers: Vec, + peers: Vec, ) { self.providers.add_peers(*kind.hash(), &peers); if let Some(info) = self.current_requests.get_mut(&kind) { @@ -534,9 +579,29 @@ impl, D: Dialer> Service { Some(info) => { info.intents.insert(id, sender); // pre-emptively get a peer if we don't already have one - if info.next_peer.is_none() { - info.next_peer = next_peer + let mut reset = false; + match (info.next_peer, next_peer) { + // We did not yet have next peer, but have a peer now. 
(None, Some(next_peer)) => { + reset = matches!(next_peer.role, PeerRole::Provider); + info.next_peer = Some(next_peer); + } + // We did have a next peer already, but the new peer has a better role + // (i.e. is a Provider, while the old next peer is only a Candidate) + (Some(old_next_peer), Some(next_peer)) + if old_next_peer.role < next_peer.role => + { + info.next_peer = Some(next_peer); + reset = true; + } + _ => {} } + // Reset the delay queue timer if we inserted a `Provider` peer. + if reset { + self.scheduled_request_queue + .reset(&info.delay_key, std::time::Duration::ZERO); + } + // increasing the retries by one accounts for multiple intents for the same request in // a conservative way info.remaining_retries += 1; @@ -544,7 +609,7 @@ impl, D: Dialer> Service { } None => { let intents = HashMap::from([(id, sender)]); - self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents) + self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents, false) } } } @@ -557,7 +622,7 @@ impl, D: Dialer> Service { /// /// If the selected candidate is not connected and we have capacity for another connection, a /// dial is queued. 
- fn get_best_candidate(&mut self, hash: &Hash) -> Option { + fn get_best_candidate(&mut self, hash: &Hash) -> Option { /// Model the state of peers found in the candidates #[derive(PartialEq, Eq, Clone, Copy)] enum ConnState { @@ -602,13 +667,13 @@ impl, D: Dialer> Service { .providers .get_candidates(hash) .filter_map(|peer| { - if let Some(info) = self.peers.get(peer) { + if let Some(info) = self.peers.get(&peer.node_id) { info.conn.as_ref()?; let req_count = info.active_requests(); // filter out peers at capacity let has_capacity = !self.concurrency_limits.peer_at_request_capacity(req_count); has_capacity.then_some((peer, ConnState::Connected(req_count))) - } else if self.dialer.is_pending(peer) { + } else if self.dialer.is_pending(&peer.node_id) { Some((peer, ConnState::Dialing)) } else { Some((peer, ConnState::NotConnected)) @@ -616,7 +681,10 @@ impl, D: Dialer> Service { }) .collect::>(); - candidates.sort_unstable_by_key(|peer_and_state| peer_and_state.1 /* state */); + // Sort candidates by: + // * Role (Providers > Candidates) + // * ConnState (Connected > Dialing > NotConnected) + candidates.sort_unstable_by_key(|(PeerInfo { role, .. }, state)| (*role, *state)); // this is our best peer, check if we need to dial it let (peer, state) = candidates.pop()?; @@ -624,11 +692,11 @@ impl, D: Dialer> Service { if let ConnState::NotConnected = state { if !self.at_connections_capacity() { // peer is not connected, not dialing and concurrency limits allow another connection - debug!(%peer, "dialing peer"); - self.dialer.queue_dial(*peer); + debug!(peer = %peer.node_id, "dialing peer"); + self.dialer.queue_dial(peer.node_id); Some(*peer) } else { - trace!(%peer, "required peer not dialed to maintain concurrency limits"); + trace!(peer = %peer.node_id, "required peer not dialed to maintain concurrency limits"); None } } else { @@ -663,7 +731,7 @@ impl, D: Dialer> Service { } /// Handle a [`Message::PeersHave`]. 
- fn handle_peers_have(&mut self, hash: Hash, peers: Vec) { + fn handle_peers_have(&mut self, hash: Hash, peers: Vec) { // check if this still needed if self.is_needed(hash) { self.providers.add_peers(hash, &peers); @@ -756,7 +824,7 @@ impl, D: Dialer> Service { debug!(%peer, ?kind, %reason, "download attempt failed"); remaining_retries -= 1; let next_peer = self.get_best_candidate(kind.hash()); - self.schedule_request(kind, remaining_retries, next_peer, intents); + self.schedule_request(kind, remaining_retries, next_peer, intents, true); } else { debug!(%peer, ?kind, %reason, "download failed"); for sender in intents.into_values() { @@ -785,10 +853,10 @@ impl, D: Dialer> Service { // first try with the peer that was initially assigned if let Some((peer, conn)) = next_peer.and_then(|peer| { - self.get_peer_connection_for_download(&peer) + self.get_peer_connection_for_download(&peer.node_id) .map(|conn| (peer, conn)) }) { - return self.start_download(kind, peer, conn, remaining_retries, intents); + return self.start_download(kind, peer.node_id, conn, remaining_retries, intents); } // we either didn't have a peer or the peer is busy or dialing. 
In any case try to get @@ -797,9 +865,15 @@ impl, D: Dialer> Service { None => None, Some(peer) => { // optimistically check if the peer could do the request right away - match self.get_peer_connection_for_download(&peer) { + match self.get_peer_connection_for_download(&peer.node_id) { Some(conn) => { - return self.start_download(kind, peer, conn, remaining_retries, intents) + return self.start_download( + kind, + peer.node_id, + conn, + remaining_retries, + intents, + ) } None => Some(peer), } @@ -810,7 +884,7 @@ impl, D: Dialer> Service { // is failed if remaining_retries > 0 { remaining_retries -= 1; - self.schedule_request(kind, remaining_retries, next_peer, intents); + self.schedule_request(kind, remaining_retries, next_peer, intents, false); } else { // request can't be retried for sender in intents.into_values() { @@ -864,13 +938,27 @@ impl, D: Dialer> Service { &mut self, kind: DownloadKind, remaining_retries: u8, - next_peer: Option, + next_peer: Option, intents: HashMap>, + is_retry: bool ) { // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal // number) is maxed at INITIAL_RETRY_COUNT - let delay = INITIAL_REQUEST_DELAY - * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1); + let delay = match (is_retry, next_peer) { + // The next peer has the Provider role, which means we assume it can provide the hash. + // Thus, start the download immediately. + (false, Some(PeerInfo { + role: PeerRole::Provider, + .. + })) => std::time::Duration::ZERO, + // We either have no next peer yet or the next peer only has the Candidate role, thus + // delay the request. + _ => { + INITIAL_REQUEST_DELAY + * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1) + } + }; + let delay_key = self.scheduled_request_queue.insert(kind.clone(), delay); let info = PendingRequestInfo { @@ -939,15 +1027,15 @@ impl, D: Dialer> Service { #[derive(Default, Debug)] pub struct ProviderMap { /// Candidates to download a hash. 
- candidates: HashMap>, + candidates: HashMap>, } struct ProviderIter<'a> { - inner: Option>, + inner: Option>, } impl<'a> Iterator for ProviderIter<'a> { - type Item = &'a PublicKey; + type Item = &'a PeerInfo; fn next(&mut self) -> Option { self.inner.as_mut().and_then(|iter| iter.next()) @@ -956,14 +1044,20 @@ impl<'a> Iterator for ProviderIter<'a> { impl ProviderMap { /// Get candidates to download this hash. - fn get_candidates(&self, hash: &Hash) -> impl Iterator { - let inner = self.candidates.get(hash).map(|peer_set| peer_set.iter()); + fn get_candidates(&self, hash: &Hash) -> impl Iterator { + let inner = self.candidates.get(hash).map(|peers| peers.values()); ProviderIter { inner } } /// Register peers for a hash. Should only be done for hashes we care to download. - fn add_peers(&mut self, hash: Hash, peers: &[PublicKey]) { - self.candidates.entry(hash).or_default().extend(peers) + fn add_peers(&mut self, hash: Hash, peers: &[PeerInfo]) { + let entry = self.candidates.entry(hash).or_default(); + for peer in peers { + entry + .entry(peer.node_id) + .and_modify(|existing| existing.role = (existing.role).max(peer.role)) + .or_insert(*peer); + } } /// Signal the registry that this hash is no longer of interest. 
diff --git a/iroh/src/downloader/test.rs b/iroh/src/downloader/test.rs index 80454226fc..98ab99981c 100644 --- a/iroh/src/downloader/test.rs +++ b/iroh/src/downloader/test.rs @@ -46,7 +46,9 @@ async fn smoke_test() { let kind = DownloadKind::Blob { hash: Hash::new([0u8; 32]), }; - let handle = downloader.queue(kind.clone(), vec![peer]).await; + let handle = downloader + .queue(kind.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; // wait for the download result to be reported handle.await.expect("should report success"); // verify that the peer was dialed @@ -73,7 +75,9 @@ async fn deduplication() { }; let mut handles = Vec::with_capacity(10); for _ in 0..10 { - let h = downloader.queue(kind.clone(), vec![peer]).await; + let h = downloader + .queue(kind.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; handles.push(h); } assert!( @@ -103,16 +107,24 @@ async fn cancellation() { let kind_1 = DownloadKind::Blob { hash: Hash::new([0u8; 32]), }; - let handle_a = downloader.queue(kind_1.clone(), vec![peer]).await; - let handle_b = downloader.queue(kind_1.clone(), vec![peer]).await; + let handle_a = downloader + .queue(kind_1.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; + let handle_b = downloader + .queue(kind_1.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; downloader.cancel(handle_a).await; // create a request with two intents and cancel them both let kind_2 = DownloadKind::Blob { hash: Hash::new([1u8; 32]), }; - let handle_c = downloader.queue(kind_2.clone(), vec![peer]).await; - let handle_d = downloader.queue(kind_2.clone(), vec![peer]).await; + let handle_c = downloader + .queue(kind_2.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; + let handle_d = downloader + .queue(kind_2.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; downloader.cancel(handle_c).await; downloader.cancel(handle_d).await; @@ -148,7 +160,9 @@ async fn max_concurrent_requests() { let kind = DownloadKind::Blob { hash: 
Hash::new([i; 32]), }; - let h = downloader.queue(kind.clone(), vec![peer]).await; + let h = downloader + .queue(kind.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; expected_history.push((kind, peer)); handles.push(h); } @@ -191,9 +205,49 @@ async fn max_concurrent_requests_per_peer() { let kind = DownloadKind::Blob { hash: Hash::new([i; 32]), }; - let h = downloader.queue(kind.clone(), vec![peer]).await; + let h = downloader + .queue(kind.clone(), vec![(peer, PeerRole::Candidate).into()]) + .await; handles.push(h); } futures::future::join_all(handles).await; } + +/// Tests that providers are preferred over candidates. +#[tokio::test] +async fn peer_role_provider() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + let peer_candidate1 = SecretKey::from_bytes(&[0u8; 32]).public(); + let peer_candidate2 = SecretKey::from_bytes(&[1u8; 32]).public(); + let peer_provider = SecretKey::from_bytes(&[2u8; 32]).public(); + let kind = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let handle = downloader + .queue( + kind.clone(), + vec![ + (peer_candidate1, PeerRole::Candidate).into(), + (peer_provider, PeerRole::Provider).into(), + (peer_candidate2, PeerRole::Candidate).into(), + ], + ) + .await; + let now = std::time::Instant::now(); + assert!(handle.await.is_ok(), "download succeeded"); + // this is, I think, currently the best way to test that no delay was performed. It should be + // safe enough to assume that test runtime is not longer than the delay of 500ms. 
+ assert!( + now.elapsed() < INITIAL_REQUEST_DELAY, + "now initial delay was added to fetching from a provider" + ); + getter.assert_history(&[(kind, peer_provider)]); + dialer.assert_history(&[peer_provider]); +} diff --git a/iroh/src/downloader/test/dialer.rs b/iroh/src/downloader/test/dialer.rs index 7f40dd4d01..ae6e1af379 100644 --- a/iroh/src/downloader/test/dialer.rs +++ b/iroh/src/downloader/test/dialer.rs @@ -1,6 +1,7 @@ //! Implementation of [`super::Dialer`] used for testing. use std::{ + collections::HashSet, sync::Arc, task::{Context, Poll}, time::Duration, diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index ed39674de0..a772904290 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -7,7 +7,7 @@ use std::{ time::SystemTime, }; -use crate::downloader::{DownloadKind, Downloader}; +use crate::downloader::{DownloadKind, Downloader, PeerRole}; use anyhow::{anyhow, bail, Result}; use flume::r#async::RecvStream; use futures::{ @@ -841,7 +841,10 @@ impl Actor { if matches!(entry_status, EntryStatus::NotFound) { let handle = self .downloader - .queue(DownloadKind::Blob { hash }, vec![from]) + .queue( + DownloadKind::Blob { hash }, + vec![(from, PeerRole::Candidate).into()], + ) .await; let fut = async move { // NOTE: this ignores the result for now, simply keeping the option From 40f38fa1c9ee7dfbe1637028178d5b9381ba138c Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 13:41:53 +0200 Subject: [PATCH 03/18] feat: transfer content status during set reconciliation --- iroh-sync/src/ranger.rs | 77 +++++++++++++++++++++++++----------- iroh-sync/src/sync.rs | 73 +++++++++++++++++++++++++++++----- iroh/src/sync_engine/live.rs | 63 +++++++++++++++-------------- 3 files changed, 151 insertions(+), 62 deletions(-) diff --git a/iroh-sync/src/ranger.rs b/iroh-sync/src/ranger.rs index be791629d8..9dc2b76416 100644 --- a/iroh-sync/src/ranger.rs +++ b/iroh-sync/src/ranger.rs @@ -8,6 +8,8 @@ 
use std::marker::PhantomData; use serde::{Deserialize, Serialize}; +use crate::ContentStatus; + /// Store entries that can be fingerprinted and put into ranges. pub trait RangeEntry: Debug + Clone + PartialOrd { /// The key for this entry, to be used in ranges. @@ -126,7 +128,7 @@ pub struct RangeItem { ))] pub range: Range, #[serde(bound(serialize = "E: Serialize", deserialize = "E: Deserialize<'de>"))] - pub values: Vec, + pub values: Vec<(E, ContentStatus)>, /// If false, requests to send local items in the range. /// Otherwise not. pub have_local: bool, @@ -155,7 +157,7 @@ impl MessagePart { matches!(self, MessagePart::RangeItem(_)) } - pub fn values(&self) -> Option<&[E]> { + pub fn values(&self) -> Option<&[(E, ContentStatus)]> { match self { MessagePart::RangeFingerprint(_) => None, MessagePart::RangeItem(RangeItem { values, .. }) => Some(values), @@ -269,13 +271,15 @@ where /// `validate_cb` is called before an entry received from the remote is inserted into the store. /// It must return true if the entry is valid and should be stored, and false otherwise /// (which means the entry will be dropped and not stored). - pub fn process_message( + pub fn process_message( &mut self, message: Message, validate_cb: F, + content_status_cb: F2, ) -> Result>, S::Error> where - F: Fn(&S, &E) -> bool, + F: Fn(&S, &E, ContentStatus) -> bool, + F2: Fn(&S, &E) -> ContentStatus, { let mut out = Vec::new(); @@ -308,7 +312,10 @@ where .get_range(range.clone())? 
.filter_map(|existing| match existing { Ok(existing) => { - if !values.iter().any(|entry| existing.key() == entry.key()) { + if !values + .iter() + .any(|(entry, _)| existing.key() == entry.key()) + { Some(Ok(existing)) } else { None @@ -316,13 +323,19 @@ where } Err(err) => Some(Err(err)), }) + .map(|entry| { + entry.map(|entry| { + let content_status = content_status_cb(&self.store, &entry); + (entry, content_status) + }) + }) .collect::>()?, ) }; // Store incoming values - for entry in values { - if validate_cb(&self.store, &entry) { + for (entry, content_status) in values { + if validate_cb(&self.store, &entry, content_status) { self.store.put(entry)?; } } @@ -355,9 +368,16 @@ where .get_range(range.clone())? .collect::>()?; if local_values.len() <= 1 || fingerprint == Fingerprint::empty() { + let values = local_values + .into_iter() + .map(|entry| { + let content_status = content_status_cb(&self.store, &entry); + (entry, content_status) + }) + .collect::>(); out.push(MessagePart::RangeItem(RangeItem { range, - values: local_values, + values, have_local: false, })); } else { @@ -443,7 +463,15 @@ where fingerprint, })); } else { - let values = chunk.into_iter().collect::>()?; + let values = chunk + .into_iter() + .map(|entry| { + entry.map(|entry| { + let content_status = content_status_cb(&self.store, &entry); + (entry, content_status) + }) + }) + .collect::>()?; out.push(MessagePart::RangeItem(RangeItem { range, values, @@ -843,14 +871,14 @@ mod tests { let validate_alice: ValidateCb<&str, i32> = Box::new({ let alice_validate_set = alice_validate_set.clone(); - move |_, e| { + move |_, e, _| { alice_validate_set.borrow_mut().push(*e); false } }); let validate_bob: ValidateCb<&str, i32> = Box::new({ let bob_validate_set = bob_validate_set.clone(); - move |_, e| { + move |_, e, _| { bob_validate_set.borrow_mut().push(*e); false } @@ -974,15 +1002,15 @@ mod tests { } } - type ValidateCb = Box, &(K, V)) -> bool>; + type ValidateCb = Box, &(K, V), ContentStatus) -> 
bool>; fn sync(alice_set: &[(K, V)], bob_set: &[(K, V)]) -> SyncResult where K: RangeKey + PartialEq + Clone + Default + Debug, V: Debug + Clone + PartialOrd, { - let alice_validate_cb: ValidateCb = Box::new(|_, _| true); - let bob_validate_cb: ValidateCb = Box::new(|_, _| true); + let alice_validate_cb: ValidateCb = Box::new(|_, _, _| true); + let bob_validate_cb: ValidateCb = Box::new(|_, _, _| true); sync_with_validate_cb_and_assert(alice_set, bob_set, &alice_validate_cb, &bob_validate_cb) } @@ -995,8 +1023,8 @@ mod tests { where K: RangeKey + PartialEq + Clone + Default + Debug, V: Debug + Clone + PartialOrd, - F1: Fn(&SimpleStore, &(K, V)) -> bool, - F2: Fn(&SimpleStore, &(K, V)) -> bool, + F1: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, + F2: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, { let mut expected_set_alice = BTreeMap::new(); let mut expected_set_bob = BTreeMap::new(); @@ -1038,7 +1066,7 @@ mod tests { for msg in &res.alice_to_bob { for part in &msg.parts { if let Some(values) = part.values() { - for e in values { + for (e, _) in values { assert!( alice_sent.insert(e.key(), e).is_none(), "alice: duplicate {:?}", @@ -1053,7 +1081,7 @@ mod tests { for msg in &res.bob_to_alice { for part in &msg.parts { if let Some(values) = part.values() { - for e in values { + for (e, _) in values { assert!( bob_sent.insert(e.key(), e).is_none(), "bob: duplicate {:?}", @@ -1077,8 +1105,8 @@ mod tests { where K: RangeKey + PartialEq + Clone + Default + Debug, V: Debug + Clone + PartialOrd, - F1: Fn(&SimpleStore, &(K, V)) -> bool, - F2: Fn(&SimpleStore, &(K, V)) -> bool, + F1: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, + F2: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, { let mut alice_to_bob = Vec::new(); let mut bob_to_alice = Vec::new(); @@ -1091,9 +1119,14 @@ mod tests { rounds += 1; alice_to_bob.push(msg.clone()); - if let Some(msg) = bob.process_message(msg, &bob_validate_cb).unwrap() { + if let Some(msg) = bob + .process_message(msg, 
&bob_validate_cb, |_, _| ContentStatus::Complete) + .unwrap() + { bob_to_alice.push(msg.clone()); - next_to_bob = alice.process_message(msg, &alice_validate_cb).unwrap(); + next_to_bob = alice + .process_message(msg, &alice_validate_cb, |_, _| ContentStatus::Complete) + .unwrap(); } } SyncResult { diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index ecde915bd2..b25e1c87a7 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -52,7 +52,23 @@ pub enum InsertOrigin { /// The entry was inserted locally. Local, /// The entry was received from the remote peer identified by [`PeerIdBytes`]. - Sync(PeerIdBytes), + Sync { + /// The peer from which we received this entry. + from: PeerIdBytes, + /// Whether the peer claims to have the content blob for this entry. + content_status: ContentStatus, + }, +} + +/// Whether the content status is available on a node. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] +pub enum ContentStatus { + /// The content is completely available. + Complete, + /// The content is partially available. + Incomplete, + /// The content is missing. + Missing, } /// Local representation of a mutable, synchronizable key-value store. @@ -61,6 +77,11 @@ pub struct Replica + PublicKeyStore> { inner: Arc>>, #[allow(clippy::type_complexity)] on_insert_sender: Arc>>>, + + #[allow(clippy::type_complexity)] + #[debug("ContentStatusCallback")] + content_status_cb: + Arc ContentStatus + Send + Sync + 'static>>>>, } #[derive(derive_more::Debug)] @@ -85,6 +106,7 @@ impl + PublicKeyStore + 'static> Replica { peer: Peer::from_store(store), })), on_insert_sender: Arc::new(RwLock::new(None)), + content_status_cb: Arc::new(RwLock::new(None)), } } @@ -114,6 +136,24 @@ impl + PublicKeyStore + 'static> Replica { self.on_insert_sender.write().take().is_some() } + /// Set the content status callback. + /// + /// Only one callback can be active at a time. If a previous callback was registered, this + /// will return `false`. 
+ pub fn set_content_status_callback( + &self, + cb: Box ContentStatus + Send + Sync + 'static>, + ) -> bool { + let mut content_status_cb = self.content_status_cb.write(); + match &*content_status_cb { + Some(_cb) => false, + None => { + *content_status_cb = Some(cb); + true + } + } + } + /// Insert a new record at the given key. /// /// The entry will by signed by the provided `author`. @@ -144,8 +184,12 @@ impl + PublicKeyStore + 'static> Replica { &self, entry: SignedEntry, received_from: PeerIdBytes, + content_status: ContentStatus, ) -> Result<(), InsertError> { - let origin = InsertOrigin::Sync(received_from); + let origin = InsertOrigin::Sync { + from: received_from, + content_status, + }; self.insert_entry(entry, origin) } @@ -177,7 +221,7 @@ impl + PublicKeyStore + 'static> Replica { inc!(Metrics, new_entries_local); inc_by!(Metrics, new_entries_local_size, len); } - InsertOrigin::Sync(_) => { + InsertOrigin::Sync { .. } => { inc!(Metrics, new_entries_remote); inc_by!(Metrics, new_entries_remote_size, len); } @@ -224,12 +268,13 @@ impl + PublicKeyStore + 'static> Replica { ) -> Result>, S::Error> { let expected_namespace = self.namespace(); let now = system_time_now(); - let reply = self - .inner - .write() - .peer - .process_message(message, |store, entry| { - let origin = InsertOrigin::Sync(from_peer); + let reply = self.inner.write().peer.process_message( + message, + |store, entry, content_status| { + let origin = InsertOrigin::Sync { + from: from_peer, + content_status, + }; if validate_entry(now, store, expected_namespace, entry, &origin).is_ok() { if let Some(sender) = self.on_insert_sender.read().as_ref() { sender.send((origin, entry.clone())).ok(); @@ -238,7 +283,15 @@ impl + PublicKeyStore + 'static> Replica { } else { false } - })?; + }, + |_store, entry| { + if let Some(cb) = self.content_status_cb.read().as_ref() { + cb(entry.content_hash()) + } else { + ContentStatus::Missing + } + }, + )?; Ok(reply) } diff --git 
a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index a772904290..cb945a610c 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -37,6 +37,8 @@ use tokio::{ }; use tracing::{debug, error, warn}; +pub use iroh_sync::sync::ContentStatus; + const CHANNEL_CAP: usize = 8; /// The address to connect to a peer @@ -201,28 +203,11 @@ pub enum LiveEvent { SyncFinished(SyncEvent), } -/// Availability status of an entry's content bytes -// TODO: Add IsDownloading -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum ContentStatus { - /// Fully available on the local node. - Complete, - /// Partially available on the local node. - Incomplete, - /// Not available on the local node. - /// - /// This currently means either that the content is about to be downloaded, failed to be - /// downloaded, or was never requested. - Missing, -} - -impl From for ContentStatus { - fn from(value: EntryStatus) -> Self { - match value { - EntryStatus::Complete => ContentStatus::Complete, - EntryStatus::Partial => ContentStatus::Incomplete, - EntryStatus::NotFound => ContentStatus::Missing, - } +fn entry_to_content_status(entry: EntryStatus) -> ContentStatus { + match entry { + EntryStatus::Complete => ContentStatus::Complete, + EntryStatus::Partial => ContentStatus::Incomplete, + EntryStatus::NotFound => ContentStatus::Missing, } } @@ -614,10 +599,19 @@ impl Actor { let Some(replica) = self.replica_store.open_replica(&namespace)? else { bail!("Replica not found"); }; + + // setup event subscription. 
let events = replica .subscribe() .ok_or_else(|| anyhow::anyhow!("trying to subscribe twice to the same replica"))?; self.replica_events.push(events.into_stream()); + + // setup content status callback + let bao_store = self.bao_store.clone(); + let content_status_cb = + Box::new(move |hash| entry_to_content_status(bao_store.contains(&hash))); + replica.set_content_status_callback(content_status_cb); + self.open_replicas.insert(namespace); } Ok(()) @@ -789,7 +783,12 @@ impl Actor { match op { Op::Put(entry) => { debug!(peer = ?msg.delivered_from, topic = ?topic, "received entry via gossip"); - replica.insert_remote_entry(entry, *msg.delivered_from.as_bytes())? + // At this point, we do not know if the peer has the content. + replica.insert_remote_entry( + entry, + *msg.delivered_from.as_bytes(), + ContentStatus::Missing, + )? } } } @@ -830,7 +829,10 @@ impl Actor { notify_all(subs, event).await; } } - InsertOrigin::Sync(peer_id) => { + InsertOrigin::Sync { + from: peer_id, + content_status, + } => { let from = PublicKey::from_bytes(&peer_id)?; let entry = signed_entry.entry(); let hash = entry.record().content_hash(); @@ -838,13 +840,14 @@ impl Actor { // A new entry was inserted from initial sync or gossip. Queue downloading the // content. 
let entry_status = self.bao_store.contains(&hash); - if matches!(entry_status, EntryStatus::NotFound) { + if matches!(entry_status, EntryStatus::NotFound | EntryStatus::Partial) { + let role = match content_status { + ContentStatus::Complete => PeerRole::Provider, + _ => PeerRole::Candidate, + }; let handle = self .downloader - .queue( - DownloadKind::Blob { hash }, - vec![(from, PeerRole::Candidate).into()], - ) + .queue(DownloadKind::Blob { hash }, vec![(from, role).into()]) .await; let fut = async move { // NOTE: this ignores the result for now, simply keeping the option @@ -860,7 +863,7 @@ impl Actor { let event = LiveEvent::InsertRemote { from, entry: entry.clone(), - content_status: entry_status.into(), + content_status: entry_to_content_status(entry_status), }; notify_all(subs, event).await; } From dccb4de829d11099980659b7cb8c4a72c5a5392d Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 13:46:53 +0200 Subject: [PATCH 04/18] feat: inform neighbors about finished content downloads --- iroh/src/sync_engine/live.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index cb945a610c..496e97503a 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -37,7 +37,7 @@ use tokio::{ }; use tracing::{debug, error, warn}; -pub use iroh_sync::sync::ContentStatus; +pub use iroh_sync::ContentStatus; const CHANNEL_CAP: usize = 8; @@ -105,6 +105,8 @@ impl FromStr for PeerSource { pub enum Op { /// A new entry was inserted into the document. Put(SignedEntry), + /// A peer now has content available for a hash. 
+ ContentReady(Hash), } #[derive(Debug)] @@ -475,7 +477,7 @@ impl Actor { // new gossip message Some(event) = self.gossip_events.next() => { let (topic, event) = event?; - if let Err(err) = self.on_gossip_event(topic, event) { + if let Err(err) = self.on_gossip_event(topic, event).await { error!("Failed to process gossip event: {err:?}"); } }, @@ -505,6 +507,11 @@ impl Actor { let event = LiveEvent::ContentReady { hash }; notify_all(subs, event).await; } + + // Inform our neighbors that we have new content ready. + let op = Op::ContentReady(hash); + let message = postcard::to_stdvec(&op)?.into(); + self.gossip.broadcast_neighbors(topic, message).await?; } } @@ -790,6 +797,11 @@ impl Actor { ContentStatus::Missing, )? } + Op::ContentReady(hash) => { + // Inform the downloader that we now know that this peer has the content + // for this hash. + self.downloader.peers_have(hash, vec![(msg.delivered_from, PeerRole::Provider).into()]).await; + } } } // A new neighbor appeared in the gossip swarm. Try to sync with it directly. From 0535ea2b6aee1a12d74d0861e53e6c39e00439a5 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 15:12:55 +0200 Subject: [PATCH 05/18] fix: better transfer of endpoints to gossip I do not know why this occurred now, but without this change the `sync_full_basic` test hangs for me. With this change, it passes. It also makes the architecture a bit cleaner. 
--- iroh/src/node.rs | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 633b8e71b0..4c7b18e692 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -44,12 +44,11 @@ use iroh_net::{ tls, MagicEndpoint, NodeAddr, }; use iroh_sync::store::Store as DocStore; -use once_cell::sync::OnceCell; use quic_rpc::server::RpcChannel; use quic_rpc::transport::flume::FlumeConnection; use quic_rpc::transport::misc::DummyServerEndpoint; use quic_rpc::{RpcClient, RpcServer, ServiceEndpoint}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; @@ -333,10 +332,6 @@ where .max_concurrent_bidi_streams(MAX_STREAMS.try_into()?) .max_concurrent_uni_streams(0u32.into()); - // init a cell that will hold our gossip handle to be used in endpoint callbacks - let gossip_cell: OnceCell = OnceCell::new(); - let gossip_cell2 = gossip_cell.clone(); - let endpoint = MagicEndpoint::builder() .secret_key(self.secret_key.clone()) .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) @@ -344,15 +339,8 @@ where .transport_config(transport_config) .concurrent_connections(MAX_CONNECTIONS) .on_endpoints(Box::new(move |eps| { - if eps.is_empty() { - return; - } - // send our updated endpoints to the gossip protocol to be sent as PeerData to peers - if let Some(gossip) = gossip_cell2.get() { - gossip.update_endpoints(eps).ok(); - } - if !endpoints_update_s.is_disconnected() { - endpoints_update_s.send(()).ok(); + if !eps.is_empty() { + endpoints_update_s.send(eps.to_vec()).ok(); } })); let endpoint = match self.derp_map { @@ -369,8 +357,6 @@ where // initialize the gossip protocol let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default()); - // insert into the gossip cell to be used in the endpoint callbacks above - 
gossip_cell.set(gossip.clone()).unwrap(); // spawn the sync engine let downloader = Downloader::new( @@ -401,10 +387,11 @@ where cancel_token, callbacks: callbacks.clone(), cb_sender, - rt, + rt: rt.clone(), sync, }); let task = { + let gossip = gossip.clone(); let handler = RpcHandler { inner: inner.clone(), collection_parser: self.collection_parser.clone(), @@ -431,13 +418,25 @@ where task: task.map_err(Arc::new).boxed().shared(), }; + // spawn a task that updates the gossip endpoints. + let (first_endpoint_update_tx, first_endpoint_update_rx) = oneshot::channel(); + let mut first_endpoint_update_tx = Some(first_endpoint_update_tx); + rt.main().spawn(async move { + while let Ok(eps) = endpoints_update_r.recv_async().await { + if let Err(err) = gossip.update_endpoints(&eps) { + warn!("Failed to update gossip endpoints: {err:?}"); + } + if let Some(tx) = first_endpoint_update_tx.take() { + tx.send(()).ok(); + } + } + }); + // Wait for a single endpoint update, to make sure // we found some endpoints - tokio::time::timeout(ENDPOINT_WAIT, async move { - endpoints_update_r.recv_async().await - }) - .await - .context("waiting for endpoint")??; + tokio::time::timeout(ENDPOINT_WAIT, first_endpoint_update_rx) + .await + .context("waiting for endpoint")??; Ok(node) } From e3c401f9744dec6178d233da8325fdb40be99a47 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 14 Sep 2023 15:14:03 +0200 Subject: [PATCH 06/18] chore: fmt & clippy --- iroh-gossip/src/proto/tests.rs | 2 +- iroh/src/downloader.rs | 13 ++++++++----- iroh/src/sync_engine/live.rs | 4 +++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/iroh-gossip/src/proto/tests.rs b/iroh-gossip/src/proto/tests.rs index f7607b06cf..126f6d0ff7 100644 --- a/iroh-gossip/src/proto/tests.rs +++ b/iroh-gossip/src/proto/tests.rs @@ -336,7 +336,7 @@ impl Simulator { let events = self.network.events(); let received: HashSet<_> = events .filter( - |(_peer, _topic, event)| matches!(event, 
Event::Received(recv) if recv.content == &message), + |(_peer, _topic, event)| matches!(event, Event::Received(recv) if recv.content == message), ) .map(|(peer, _topic, _msg)| peer) .collect(); diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 3de10ed827..93eef6e541 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -940,17 +940,20 @@ impl, D: Dialer> Service { remaining_retries: u8, next_peer: Option, intents: HashMap>, - is_retry: bool + is_retry: bool, ) { // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal // number) is maxed at INITIAL_RETRY_COUNT let delay = match (is_retry, next_peer) { // The next peer has the Provider role, which means we assume it can provide the hash. // Thus, start the download immediately. - (false, Some(PeerInfo { - role: PeerRole::Provider, - .. - })) => std::time::Duration::ZERO, + ( + false, + Some(PeerInfo { + role: PeerRole::Provider, + .. + }), + ) => std::time::Duration::ZERO, // We either have no next peer yet or the next peer only has the Candidate role, thus // delay the request. _ => { diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 496e97503a..d4720a633a 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -800,7 +800,9 @@ impl Actor { Op::ContentReady(hash) => { // Inform the downloader that we now know that this peer has the content // for this hash. 
- self.downloader.peers_have(hash, vec![(msg.delivered_from, PeerRole::Provider).into()]).await; + self.downloader + .peers_have(hash, vec![(msg.delivered_from, PeerRole::Provider).into()]) + .await; } } } From b0e0268f559643827771cbb7f27106719b5797b1 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 18 Sep 2023 10:36:02 +0200 Subject: [PATCH 07/18] feat(downloader): start hashes for PeerRole::Provider immediately --- iroh/src/downloader.rs | 142 +++++++++++++++++++++++------ iroh/src/downloader/test.rs | 3 +- iroh/src/downloader/test/dialer.rs | 5 + 3 files changed, 123 insertions(+), 27 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 93eef6e541..0d259f8d92 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -29,7 +29,7 @@ //! requests to a single peer is also limited. use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, VecDeque}, num::NonZeroUsize, }; @@ -302,7 +302,7 @@ pub struct PeerInfo { } impl PeerInfo { - /// Create a new [`Peer`] from its parts. + /// Create a new [`PeerInfo`] from its parts. pub fn new(node_id: PublicKey, role: PeerRole) -> Self { Self { node_id, role } } @@ -609,7 +609,7 @@ impl, D: Dialer> Service { } None => { let intents = HashMap::from([(id, sender)]); - self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents, false) + self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents) } } } @@ -748,6 +748,31 @@ impl, D: Dialer> Service { || self.scheduled_requests.contains_key(&as_collection) } + /// Check if this hash is currently being downloaded. + fn is_current_request(&self, hash: Hash) -> bool { + let as_blob = DownloadKind::Blob { hash }; + let as_collection = DownloadKind::Collection { hash }; + self.current_requests.contains_key(&as_blob) + || self.current_requests.contains_key(&as_collection) + } + + /// Remove a hash from the scheduled queue. 
+ fn unschedule(&mut self, hash: Hash) -> Option<(DownloadKind, PendingRequestInfo)> { + let as_blob = DownloadKind::Blob { hash }; + let as_collection = DownloadKind::Collection { hash }; + let info = match self.scheduled_requests.remove(&as_blob) { + Some(req) => Some(req), + None => self.scheduled_requests.remove(&as_collection), + }; + if let Some(info) = info { + let kind = self.scheduled_request_queue.remove(&info.delay_key); + let kind = kind.into_inner(); + Some((kind, info)) + } else { + None + } + } + /// Handle receiving a new connection. fn on_connection_ready(&mut self, peer: PublicKey, result: anyhow::Result) { match result { @@ -756,6 +781,7 @@ impl, D: Dialer> Service { let drop_key = self.goodbye_peer_queue.insert(peer, IDLE_PEER_TIMEOUT); self.peers .insert(peer, ConnectionInfo::new_idle(connection, drop_key)); + self.on_peer_ready(peer); } Err(err) => { debug!(%peer, %err, "connection to peer failed") @@ -763,6 +789,40 @@ impl, D: Dialer> Service { } } + /// Called after the connection to a peer is established, and after finishing a download. + /// + /// Starts the next provider hash download, if there is one. + fn on_peer_ready(&mut self, peer: PublicKey) { + // Get the next provider hash for this peer. + let Some(hash) = self.providers.get_next_provider_hash_for_peer(&peer) else { + return; + }; + + if self.is_current_request(hash) { + return; + } + + let Some(conn) = self.get_peer_connection_for_download(&peer) else { + return; + }; + + let Some((kind, info)) = self.unschedule(hash) else { + debug_assert!( + false, + "invalid state: expected {hash} to be scheduled, but it wasn't" + ); + return; + }; + + let PendingRequestInfo { + intents, + remaining_retries, + .. 
+ } = info; + + self.start_download(kind, peer, conn, remaining_retries, intents); + } + fn on_download_completed(&mut self, kind: DownloadKind, result: Result<(), FailureAction>) { // first remove the request let info = self @@ -798,18 +858,20 @@ impl, D: Dialer> Service { let hash = *kind.hash(); - match result { + let peer_ready = match result { Ok(()) => { debug!(%peer, ?kind, "download completed"); for sender in intents.into_values() { let _ = sender.send(Ok(())); } + true } Err(FailureAction::AbortRequest(reason)) => { debug!(%peer, ?kind, %reason, "aborting request"); for sender in intents.into_values() { let _ = sender.send(Err(anyhow::anyhow!("request aborted"))); } + true } Err(FailureAction::DropPeer(reason)) => { debug!(%peer, ?kind, %reason, "peer will be dropped"); @@ -817,6 +879,7 @@ impl, D: Dialer> Service { // TODO(@divma): this will fail open streams, do we want this? // connection.close(..) } + false } Err(FailureAction::RetryLater(reason)) => { // check if the download can be retried @@ -824,18 +887,24 @@ impl, D: Dialer> Service { debug!(%peer, ?kind, %reason, "download attempt failed"); remaining_retries -= 1; let next_peer = self.get_best_candidate(kind.hash()); - self.schedule_request(kind, remaining_retries, next_peer, intents, true); + self.schedule_request(kind, remaining_retries, next_peer, intents); } else { debug!(%peer, ?kind, %reason, "download failed"); for sender in intents.into_values() { let _ = sender.send(Err(anyhow::anyhow!("download ran out of attempts"))); } } + false } - } + }; if !self.is_needed(hash) { self.providers.remove(hash) + } else { + self.providers.move_hash_to_back(&peer, hash); + } + if peer_ready { + self.on_peer_ready(peer); } } @@ -884,7 +953,7 @@ impl, D: Dialer> Service { // is failed if remaining_retries > 0 { remaining_retries -= 1; - self.schedule_request(kind, remaining_retries, next_peer, intents, false); + self.schedule_request(kind, remaining_retries, next_peer, intents); } else { // request can't 
be retried for sender in intents.into_values() { @@ -940,27 +1009,11 @@ impl, D: Dialer> Service { remaining_retries: u8, next_peer: Option, intents: HashMap>, - is_retry: bool, ) { // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal // number) is maxed at INITIAL_RETRY_COUNT - let delay = match (is_retry, next_peer) { - // The next peer has the Provider role, which means we assume it can provide the hash. - // Thus, start the download immediately. - ( - false, - Some(PeerInfo { - role: PeerRole::Provider, - .. - }), - ) => std::time::Duration::ZERO, - // We either have no next peer yet or the next peer only has the Candidate role, thus - // delay the request. - _ => { - INITIAL_REQUEST_DELAY - * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1) - } - }; + let delay = INITIAL_REQUEST_DELAY + * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1); let delay_key = self.scheduled_request_queue.insert(kind.clone(), delay); @@ -1031,6 +1084,10 @@ impl, D: Dialer> Service { pub struct ProviderMap { /// Candidates to download a hash. candidates: HashMap>, + /// Ordered list of provider hashes per peer. + /// + /// I.e. blobs we assume the peer can provide. + provider_hashes_by_peer: HashMap>, } struct ProviderIter<'a> { @@ -1060,12 +1117,45 @@ impl ProviderMap { .entry(peer.node_id) .and_modify(|existing| existing.role = (existing.role).max(peer.role)) .or_insert(*peer); + if let PeerRole::Provider = peer.role { + self.provider_hashes_by_peer + .entry(peer.node_id) + .or_default() + .push_back(hash); + } } } + /// Get the next provider hash for a peer. + /// + /// I.e. get the next hash that was added with [`PeerRole::Provider`] for this peer. + fn get_next_provider_hash_for_peer(&mut self, peer: &PublicKey) -> Option { + self.provider_hashes_by_peer + .get(peer) + .and_then(|hashes| hashes.front()) + .copied() + } + /// Signal the registry that this hash is no longer of interest. 
fn remove(&mut self, hash: Hash) { - self.candidates.remove(&hash); + let peers = self.candidates.remove(&hash); + if let Some(peers) = peers { + for peer in peers.keys() { + if let Some(hashes) = self.provider_hashes_by_peer.get_mut(peer) { + hashes.retain(|h| *h != hash); + } + } + } + } + + /// Move a hash to the back of the provider queue for a peer. + fn move_hash_to_back(&mut self, peer: &PublicKey, hash: Hash) { + let hashes = self.provider_hashes_by_peer.get_mut(peer); + if let Some(hashes) = hashes { + let front = hashes.pop_front(); + debug_assert_eq!(front, Some(hash)); + hashes.push_back(hash); + } } } diff --git a/iroh/src/downloader/test.rs b/iroh/src/downloader/test.rs index 98ab99981c..94feb55572 100644 --- a/iroh/src/downloader/test.rs +++ b/iroh/src/downloader/test.rs @@ -218,6 +218,7 @@ async fn max_concurrent_requests_per_peer() { #[tokio::test] async fn peer_role_provider() { let dialer = dialer::TestingDialer::default(); + dialer.set_dial_duration(Duration::from_millis(100)); let getter = getter::TestingGetter::default(); let concurrency_limits = ConcurrencyLimits::default(); @@ -246,7 +247,7 @@ async fn peer_role_provider() { // safe enough to assume that test runtime is not longer than the delay of 500ms. 
assert!( now.elapsed() < INITIAL_REQUEST_DELAY, - "now initial delay was added to fetching from a provider" + "no initial delay was added to fetching from a provider" ); getter.assert_history(&[(kind, peer_provider)]); dialer.assert_history(&[peer_provider]); diff --git a/iroh/src/downloader/test/dialer.rs b/iroh/src/downloader/test/dialer.rs index ae6e1af379..1a6fc5c35f 100644 --- a/iroh/src/downloader/test/dialer.rs +++ b/iroh/src/downloader/test/dialer.rs @@ -86,4 +86,9 @@ impl TestingDialer { pub(super) fn assert_history(&self, history: &[PublicKey]) { assert_eq!(self.0.read().dial_history, history) } + + pub(super) fn set_dial_duration(&self, duration: Duration) { + let mut inner = self.0.write(); + inner.dial_duration = duration; + } } From 71763ae4ebd73d63561333be6a1e487f72b321b3 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 18 Sep 2023 10:36:59 +0200 Subject: [PATCH 08/18] feat(gossip): expose message distance, use for content status in sync --- iroh-gossip/src/proto/plumtree.rs | 6 ++++++ iroh/src/sync_engine/live.rs | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 44812cb24d..9d9aac242f 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -94,6 +94,8 @@ pub struct GossipEvent { pub delivered_from: PI, /// The broadcast scope of the message pub scope: Scope, + /// The distance in hops that the message travelled from the original author. 
+ pub distance: u16, } impl GossipEvent { @@ -102,6 +104,7 @@ impl GossipEvent { content: message.content.clone(), scope: message.scope, delivered_from: from, + distance: message.round.0, } } } @@ -725,6 +728,7 @@ mod test { content, delivered_from: 3, scope: Scope::Swarm, + distance: 6, }))); io }; @@ -773,6 +777,7 @@ mod test { content, delivered_from: 3, scope: Scope::Swarm, + distance: 9, }))); io }; @@ -809,6 +814,7 @@ mod test { content, delivered_from: 2, scope: Scope::Swarm, + distance: 1, }))); io }; diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index d4720a633a..78a92daee2 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -790,11 +790,17 @@ impl Actor { match op { Op::Put(entry) => { debug!(peer = ?msg.delivered_from, topic = ?topic, "received entry via gossip"); + // If the distance is 0, we received the message from its original author. + // In this case, assume that the peer can provide the content to us. + let content_status = match msg.distance { + 0 => ContentStatus::Complete, + _ => ContentStatus::Missing, + }; // At this point, we do not know if the peer has the content. replica.insert_remote_entry( entry, *msg.delivered_from.as_bytes(), - ContentStatus::Missing, + content_status, )? } Op::ContentReady(hash) => { From 7e2fe22c546535e8b030225700405e5f56fab04b Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 20 Sep 2023 10:20:59 +0200 Subject: [PATCH 09/18] fix: debug formatting, comments, typos --- iroh-gossip/src/net.rs | 66 +++++++++++++------------------ iroh-gossip/src/proto.rs | 15 ++++++- iroh-gossip/src/proto/plumtree.rs | 8 ++-- 3 files changed, 46 insertions(+), 43 deletions(-) diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 9ffbb64dc2..488764b7b9 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -1,8 +1,7 @@ //! 
Networking for the `iroh-gossip` protocol use std::{ - collections::HashMap, fmt, future::Future, net::SocketAddr, sync::Arc, task::Poll, - time::Instant, + collections::HashMap, future::Future, net::SocketAddr, sync::Arc, task::Poll, time::Instant, }; use anyhow::{anyhow, Context}; @@ -55,16 +54,16 @@ type ProtoMessage = proto::Message; /// Each topic is a separate broadcast tree with separate memberships. /// /// A topic has to be joined before you can publish or subscribe on the topic. -/// To join the swarm for a topic, you have to know the [PublicKey] of at least one peer that also joined the topic. +/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic. /// /// Messages published on the swarm will be delivered to all peers that joined the swarm for that /// topic. You will also be relaying (gossiping) messages published by other peers. /// /// With the default settings, the protocol will maintain up to 5 peer connections per topic. /// -/// Even though the [`Gossip`] is created from a [MagicEndpoint], it does not accept connections +/// Even though the [`Gossip`] is created from a [`MagicEndpoint`], it does not accept connections /// itself. You should run an accept loop on the MagicEndpoint yourself, check the ALPN protocol of incoming -/// connections, and if the ALPN protocol equals [GOSSIP_ALPN], forward the connection to the +/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the /// gossip actor through [Self::handle_connection]. /// /// The gossip actor will, however, initiate new connections to other peers by itself. @@ -156,7 +155,7 @@ impl Gossip { /// Broadcast a message on a topic to all peers in the swarm. /// - /// This does not join the topic automatically, so you have to call [Self::join] yourself + /// This does not join the topic automatically, so you have to call [`Self::join`] yourself /// for messages to be broadcast to peers. 
pub async fn broadcast(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); @@ -166,9 +165,9 @@ impl Gossip { Ok(()) } - /// Broadcast a message on a topic to the immediate neigborrs. + /// Broadcast a message on a topic to the immediate neighbors. /// - /// This does not join the topic automatically, so you have to call [Self::join] yourself + /// This does not join the topic automatically, so you have to call [`Self::join`] yourself /// for messages to be broadcast to peers. pub async fn broadcast_neighbors(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); @@ -180,7 +179,7 @@ impl Gossip { /// Subscribe to messages and event notifications for a topic. /// - /// Does not join the topic automatically, so you have to call [Self::join] yourself + /// Does not join the topic automatically, so you have to call [`Self::join`] yourself /// to actually receive messages. pub async fn subscribe(&self, topic: TopicId) -> anyhow::Result> { let (tx, rx) = oneshot::channel(); @@ -191,7 +190,7 @@ impl Gossip { /// Subscribe to all events published on topics that you joined. /// - /// Note that this method takes self by value. Usually you would clone the [Gossip] handle. + /// Note that this method takes self by value. Usually you would clone the [`Gossip`] handle. /// before. pub fn subscribe_all(self) -> impl Stream> { Gen::new(|co| async move { @@ -214,7 +213,7 @@ impl Gossip { } } - /// Pass an incoming [quinn::Connection] to the gossip actor. + /// Handle an incoming [`quinn::Connection`]. /// /// Make sure to check the ALPN protocol yourself before passing the connection. pub async fn handle_connection(&self, conn: quinn::Connection) -> anyhow::Result<()> { @@ -300,46 +299,37 @@ enum ConnOrigin { } /// Input messages for the gossip [`Actor`]. 
+#[derive(derive_more::Debug)] enum ToActor { /// Handle a new QUIC connection, either from accept (external to the actor) or from connect /// (happens internally in the actor). - ConnIncoming(PublicKey, ConnOrigin, quinn::Connection), + ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] quinn::Connection), /// Join a topic with a list of peers. Reply with oneshot once at least one peer joined. - Join(TopicId, Vec, oneshot::Sender>), + Join( + TopicId, + Vec, + #[debug(skip)] oneshot::Sender>, + ), /// Leave a topic, send disconnect messages and drop all state. Quit(TopicId), /// Broadcast a message on a topic. - Broadcast(TopicId, Bytes, Scope, oneshot::Sender>), + Broadcast( + TopicId, + #[debug("<{}b>", _1.len())] Bytes, + Scope, + #[debug(skip)] oneshot::Sender>, + ), /// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a /// topic. Subscribe( TopicId, - oneshot::Sender>>, + #[debug(skip)] oneshot::Sender>>, ), /// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a /// topic. 
- SubscribeAll(oneshot::Sender>>), -} - -impl fmt::Debug for ToActor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ToActor::ConnIncoming(peer_id, origin, _conn) => { - write!(f, "ConnIncoming({peer_id:?}, {origin:?})") - } - ToActor::Join(topic, peers, _reply) => write!(f, "Join({topic:?}, {peers:?})"), - ToActor::Quit(topic) => write!(f, "Quit({topic:?})"), - ToActor::Broadcast(topic, message, scope, _reply) => { - write!( - f, - "Broadcast({topic:?}, {scope:?}, bytes<{}>)", - message.len() - ) - } - ToActor::Subscribe(topic, _reply) => write!(f, "Subscribe({topic:?})"), - ToActor::SubscribeAll(_reply) => write!(f, "SubscribeAll"), - } - } + SubscribeAll( + #[debug(skip)] oneshot::Sender>>, + ), } /// Actor that sends and handles messages between the connection and main state loops @@ -388,7 +378,7 @@ impl Actor { }, _ = self.on_endpoints_rx.changed() => { let info = IrohInfo::from_endpoint(&self.endpoint).await?; - let peer_data = postcard::to_stdvec(&info)?; + let peer_data = Bytes::from(postcard::to_stdvec(&info)?); self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?; } (peer_id, res) = self.dialer.next_conn() => { diff --git a/iroh-gossip/src/proto.rs b/iroh-gossip/src/proto.rs index 53227a9e4b..877edd5eb6 100644 --- a/iroh-gossip/src/proto.rs +++ b/iroh-gossip/src/proto.rs @@ -80,7 +80,20 @@ impl PeerIdentity for T where T: Hash + Eq + Copy + fmt::Debug + Serialize + /// /// Implementations may use these bytes to supply addresses or other information needed to connect /// to a peer that is not included in the peer's [`PeerIdentity`]. 
-pub type PeerData = bytes::Bytes; +#[derive( + derive_more::Debug, + Serialize, + Deserialize, + Clone, + PartialEq, + Eq, + derive_more::From, + derive_more::Into, + derive_more::Deref, + Default, +)] +#[debug("PeerData({}b)", self.0.len())] +pub struct PeerData(bytes::Bytes); /// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 9d9aac242f..30e72bd5c9 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -41,7 +41,7 @@ impl MessageId { pub enum InEvent { /// A [`Message`] was received from the peer. RecvMessage(PI, Message), - /// Broadcast the contained payload to the full swarm. + /// Broadcast the contained payload to the given scope. Broadcast(Bytes, Scope), /// A timer has expired. TimerExpired(Timer), @@ -92,7 +92,7 @@ pub struct GossipEvent { /// The peer that we received the gossip message from. Note that this is not the peer that /// originally broadcasted the message, but the peer before us in the gossiping path. pub delivered_from: PI, - /// The broadcast scope of the message + /// The broadcast scope of the message. pub scope: Scope, /// The distance in hops that the message travelled from the original author. pub distance: u16, @@ -147,12 +147,12 @@ pub struct Gossip { /// Message contents. #[debug("<{}b>", content.len())] content: Bytes, - /// Scope to publish to + /// Scope to broadcast to. scope: Scope, } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)] /// The broadcast scope of a gossip message. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)] pub enum Scope { /// The message is broadcast to all peers in the swarm. 
Swarm, From 99ed70ca265cc9340846380c29a183f6369f4b54 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 20 Sep 2023 10:23:49 +0200 Subject: [PATCH 10/18] fix: remove obsolete comment --- iroh-gossip/src/proto/plumtree.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 30e72bd5c9..672feafaac 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -423,7 +423,6 @@ impl State { scope, }; let me = self.me; - // TODO: Do we want to lazy push neighbor messages as well? if let Scope::Swarm = scope { self.received_messages .insert(id, (), now + self.config.message_id_retention); From 37b3d93d53422b7de1a119509542c9056b591510 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 20 Sep 2023 15:42:29 +0200 Subject: [PATCH 11/18] fix: fixes after rebase --- iroh/src/sync_engine/live.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 78a92daee2..7e5102e8d5 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -511,7 +511,7 @@ impl Actor { // Inform our neighbors that we have new content ready. 
let op = Op::ContentReady(hash); let message = postcard::to_stdvec(&op)?.into(); - self.gossip.broadcast_neighbors(topic, message).await?; + self.gossip.broadcast_neighbors(namespace.into(), message).await?; } } @@ -595,13 +595,13 @@ impl Actor { } async fn start_sync(&mut self, namespace: NamespaceId, peers: Vec) -> Result<()> { - self.ensure_subscription(namespace)?; + self.ensure_open(namespace)?; self.syncing_replicas.insert(namespace); self.join_peers(namespace, peers).await?; Ok(()) } - fn ensure_subscription(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { + fn ensure_open(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { if !self.open_replicas.contains(&namespace) { let Some(replica) = self.replica_store.open_replica(&namespace)? else { bail!("Replica not found"); @@ -647,7 +647,7 @@ impl Actor { namespace: NamespaceId, cb: OnLiveEventCallback, ) -> anyhow::Result { - self.ensure_subscription(namespace)?; + self.ensure_open(namespace)?; let subs = self.event_subscriptions.entry(namespace).or_default(); let removal_id = self .event_removal_id @@ -778,7 +778,7 @@ impl Actor { } } - fn on_gossip_event(&mut self, topic: TopicId, event: Event) -> Result<()> { + async fn on_gossip_event(&mut self, topic: TopicId, event: Event) -> Result<()> { let namespace: NamespaceId = topic.as_bytes().into(); let Some(replica) = self.get_replica_if_syncing(&namespace) else { return Err(anyhow!("Doc {namespace:?} is not active")); From d6ac04642c197b99ac444b5295f649814e22e7c0 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 13:18:42 +0200 Subject: [PATCH 12/18] refactor: combine scope and round --- iroh-gossip/Cargo.toml | 2 +- iroh-gossip/src/proto/plumtree.rs | 101 ++++++++++++++++++------------ iroh/src/sync_engine/live.rs | 6 +- 3 files changed, 65 insertions(+), 44 deletions(-) diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index d24a23c776..bfbee6bda8 100644 --- a/iroh-gossip/Cargo.toml +++ 
b/iroh-gossip/Cargo.toml @@ -17,7 +17,7 @@ anyhow = { version = "1", features = ["backtrace"] } blake3 = { package = "iroh-blake3", version = "1.4.3"} bytes = { version = "1.4.0", features = ["serde"] } data-encoding = "2.4.0" -derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into"] } +derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into", "into"] } ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } indexmap = "2.0" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 672feafaac..05a5d0e1eb 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -93,9 +93,7 @@ pub struct GossipEvent { /// originally broadcasted the message, but the peer before us in the gossiping path. pub delivered_from: PI, /// The broadcast scope of the message. - pub scope: Scope, - /// The distance in hops that the message travelled from the original author. - pub distance: u16, + pub scope: DeliveryScope, } impl GossipEvent { @@ -104,7 +102,6 @@ impl GossipEvent { content: message.content.clone(), scope: message.scope, delivered_from: from, - distance: message.round.0, } } } @@ -142,13 +139,41 @@ pub enum Message { pub struct Gossip { /// Id of the message. id: MessageId, - /// Delivery round of the message. - round: Round, /// Message contents. #[debug("<{}b>", content.len())] content: Bytes, /// Scope to broadcast to. - scope: Scope, + scope: DeliveryScope, +} + +impl Gossip { + fn round(&self) -> Option { + match self.scope { + DeliveryScope::Swarm(round) => Some(round), + DeliveryScope::Neighbors => None, + } + } +} + +/// The scope to deliver the message to. 
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)] +pub enum DeliveryScope { + /// This message was received from the swarm, with a distance (in hops) travelled from the + /// original broadcaster. + Swarm(Round), + /// This message was received from a direct neighbor that broadcasted the message to neighbors + /// only. + Neighbors, +} + +impl DeliveryScope { + /// Whether this message was directly received from its publisher. + pub fn is_direct(&self) -> bool { + match self { + Self::Neighbors | Self::Swarm(Round(0)) => true, + _ => false, + } + } } /// The broadcast scope of a gossip message. @@ -162,12 +187,14 @@ pub enum Scope { impl Gossip { /// Get a clone of this `Gossip` message and increase the delivery round by 1. - pub fn next_round(self) -> Gossip { - Gossip { - id: self.id, - content: self.content, - round: self.round.next(), - scope: self.scope, + pub fn next_round(&self) -> Option { + match self.scope { + DeliveryScope::Neighbors => None, + DeliveryScope::Swarm(round) => Some(Gossip { + id: self.id, + content: self.content.clone(), + scope: DeliveryScope::Swarm(round.next()), + }), } } @@ -416,14 +443,13 @@ impl State { /// Pushing the message id to the lazy peers is delayed by a timer. 
fn broadcast(&mut self, content: Bytes, scope: Scope, now: Instant, io: &mut impl IO) { let id = MessageId::from_content(&content); - let message = Gossip { - id, - round: Round(0), - content, - scope, + let scope = match scope { + Scope::Neighbors => DeliveryScope::Neighbors, + Scope::Swarm => DeliveryScope::Swarm(Round(0)), }; + let message = Gossip { id, content, scope }; let me = self.me; - if let Scope::Swarm = scope { + if let DeliveryScope::Swarm(_) = scope { self.received_messages .insert(id, (), now + self.config.message_id_retention); self.cache.insert( @@ -457,7 +483,7 @@ impl State { io.push(OutEvent::SendMessage(sender, Message::Prune)); // otherwise store the message, emit to application and forward to peers } else { - if let Scope::Swarm = message.scope { + if let DeliveryScope::Swarm(prev_round) = message.scope { // insert the message in the list of received messages self.received_messages.insert( message.id, @@ -467,7 +493,7 @@ impl State { // increase the round for forwarding the message, and add to cache // to reply to Graft messages later // TODO: add callback/event to application to get missing messages that were received before? 
- let message = message.clone().next_round(); + let message = message.next_round().expect("just checked"); self.cache.insert( message.id, @@ -485,7 +511,7 @@ impl State { self.optimize_tree(&sender, &message, previous_ihaves, io); } self.stats.max_last_delivery_hop = - self.stats.max_last_delivery_hop.max(message.round.0); + self.stats.max_last_delivery_hop.max(prev_round.0); } // emit event to application @@ -507,7 +533,7 @@ impl State { previous_ihaves: VecDeque<(PI, Round)>, io: &mut impl IO, ) { - let round = message.round; + let round = message.round().expect("only called for swarm messages"); let best_ihave = previous_ihaves .iter() .min_by(|(_a_peer, a_round), (_b_peer, b_round)| a_round.cmp(b_round)) @@ -661,10 +687,13 @@ impl State { /// Queue lazy message announcements into the queue that will be sent out as batched /// [`Message::IHave`] messages once the [`Timer::DispatchLazyPush`] timer is triggered. fn lazy_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO) { + let Some(round) = gossip.round() else { + return; + }; for peer in self.lazy_push_peers.iter().filter(|x| *x != sender) { self.lazy_push_queue.entry(*peer).or_default().push(IHave { id: gossip.id, - round: gossip.round, + round, }); } if !self.dispatch_timer_scheduled { @@ -709,9 +738,8 @@ mod test { 3, Message::Gossip(Gossip { id, - round: Round(6), content: content.clone(), - scope: Scope::Swarm, + scope: DeliveryScope::Swarm(Round(6)), }), ); state.handle(event, now, &mut io); @@ -726,8 +754,7 @@ mod test { io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { content, delivered_from: 3, - scope: Scope::Swarm, - distance: 6, + scope: DeliveryScope::Swarm(Round(6)), }))); io }; @@ -754,9 +781,8 @@ mod test { 3, Message::Gossip(Gossip { id, - round: Round(9), content: content.clone(), - scope: Scope::Swarm, + scope: DeliveryScope::Swarm(Round(9)), }), ); state.handle(event, now, &mut io); @@ -775,8 +801,7 @@ mod test { io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { 
content, delivered_from: 3, - scope: Scope::Swarm, - distance: 9, + scope: DeliveryScope::Swarm(Round(9)), }))); io }; @@ -793,9 +818,8 @@ mod test { let content: Bytes = b"hello1".to_vec().into(); let message = Message::Gossip(Gossip { content: content.clone(), - round: Round(1), id: MessageId::from_content(&content), - scope: Scope::Swarm, + scope: DeliveryScope::Swarm(Round(1)), }); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); @@ -812,8 +836,7 @@ mod test { io.push(OutEvent::EmitEvent(Event::Received(GossipEvent { content, delivered_from: 2, - scope: Scope::Swarm, - distance: 1, + scope: DeliveryScope::Swarm(Round(1)), }))); io }; @@ -823,9 +846,8 @@ mod test { let content: Bytes = b"hello2".to_vec().into(); let message = Message::Gossip(Gossip { content, - round: Round(1), id: MessageId::from_content(b"foo"), - scope: Scope::Swarm, + scope: DeliveryScope::Swarm(Round(1)), }); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); @@ -841,9 +863,8 @@ mod test { let content: Bytes = b"hello1".to_vec().into(); let message = Message::Gossip(Gossip { content: content.clone(), - round: Round(1), id: MessageId::from_content(&content), - scope: Scope::Swarm, + scope: DeliveryScope::Swarm(Round(1)), }); let mut io = VecDeque::new(); state.handle(InEvent::RecvMessage(2, message), now, &mut io); diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 7e5102e8d5..3831de8a1d 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -792,9 +792,9 @@ impl Actor { debug!(peer = ?msg.delivered_from, topic = ?topic, "received entry via gossip"); // If the distance is 0, we received the message from its original author. // In this case, assume that the peer can provide the content to us. 
- let content_status = match msg.distance { - 0 => ContentStatus::Complete, - _ => ContentStatus::Missing, + let content_status = match msg.scope.is_direct() { + true => ContentStatus::Complete, + false => ContentStatus::Missing, }; // At this point, we do not know if the peer has the content. replica.insert_remote_entry( From aad3b6cd45f6d92abdeb6adfc46a99e1377c5d30 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 13:36:06 +0200 Subject: [PATCH 13/18] fix: review feedback --- iroh-sync/src/ranger.rs | 5 +++- iroh/src/downloader.rs | 59 +++++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/iroh-sync/src/ranger.rs b/iroh-sync/src/ranger.rs index 9dc2b76416..2180a8b1a7 100644 --- a/iroh-sync/src/ranger.rs +++ b/iroh-sync/src/ranger.rs @@ -268,9 +268,12 @@ where /// Processes an incoming message and produces a response. /// If terminated, returns `None` /// - /// `validate_cb` is called before an entry received from the remote is inserted into the store. + /// `validate_cb` is called for each incoming entry received from the remote. /// It must return true if the entry is valid and should be stored, and false otherwise /// (which means the entry will be dropped and not stored). + /// + /// `content_status_cb` is called for each outgoing entry about to be sent to the remote. + /// It must return a [`ContentStatus`], which will be sent to the remote with the entry. pub fn process_message( &mut self, message: Message, diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 0d259f8d92..e8f8958f10 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -297,20 +297,26 @@ impl Downloader { /// A peer and its role with regard to a hash. #[derive(Debug, Clone, Copy)] pub struct PeerInfo { - node_id: PublicKey, + peer_id: PublicKey, role: PeerRole, } impl PeerInfo { /// Create a new [`PeerInfo`] from its parts. 
- pub fn new(node_id: PublicKey, role: PeerRole) -> Self { - Self { node_id, role } + pub fn new(peer_id: PublicKey, role: PeerRole) -> Self { + Self { + peer_id, + role, + } } } impl From<(PublicKey, PeerRole)> for PeerInfo { - fn from((node_id, role): (PublicKey, PeerRole)) -> Self { - Self { node_id, role } + fn from((peer_id, role): (PublicKey, PeerRole)) -> Self { + Self { + peer_id, + role, + } } } @@ -666,14 +672,15 @@ impl, D: Dialer> Service { let mut candidates = self .providers .get_candidates(hash) - .filter_map(|peer| { - if let Some(info) = self.peers.get(&peer.node_id) { + .filter_map(|(peer_id, role)| { + let peer = PeerInfo::new(*peer_id, *role); + if let Some(info) = self.peers.get(&peer_id) { info.conn.as_ref()?; let req_count = info.active_requests(); // filter out peers at capacity let has_capacity = !self.concurrency_limits.peer_at_request_capacity(req_count); has_capacity.then_some((peer, ConnState::Connected(req_count))) - } else if self.dialer.is_pending(&peer.node_id) { + } else if self.dialer.is_pending(&peer_id) { Some((peer, ConnState::Dialing)) } else { Some((peer, ConnState::NotConnected)) @@ -692,15 +699,15 @@ impl, D: Dialer> Service { if let ConnState::NotConnected = state { if !self.at_connections_capacity() { // peer is not connected, not dialing and concurrency limits allow another connection - debug!(peer = %peer.node_id, "dialing peer"); - self.dialer.queue_dial(peer.node_id); - Some(*peer) + debug!(peer = %peer.peer_id, "dialing peer"); + self.dialer.queue_dial(peer.peer_id); + Some(peer) } else { - trace!(peer = %peer.node_id, "required peer not dialed to maintain concurrency limits"); + trace!(peer = %peer.peer_id, "required peer not dialed to maintain concurrency limits"); None } } else { - Some(*peer) + Some(peer) } } @@ -922,10 +929,10 @@ impl, D: Dialer> Service { // first try with the peer that was initially assigned if let Some((peer, conn)) = next_peer.and_then(|peer| { - 
self.get_peer_connection_for_download(&peer.node_id) + self.get_peer_connection_for_download(&peer.peer_id) .map(|conn| (peer, conn)) }) { - return self.start_download(kind, peer.node_id, conn, remaining_retries, intents); + return self.start_download(kind, peer.peer_id, conn, remaining_retries, intents); } // we either didn't have a peer or the peer is busy or dialing. In any case try to get @@ -934,11 +941,11 @@ impl, D: Dialer> Service { None => None, Some(peer) => { // optimistically check if the peer could do the request right away - match self.get_peer_connection_for_download(&peer.node_id) { + match self.get_peer_connection_for_download(&peer.peer_id) { Some(conn) => { return self.start_download( kind, - peer.node_id, + peer.peer_id, conn, remaining_retries, intents, @@ -1083,7 +1090,7 @@ impl, D: Dialer> Service { #[derive(Default, Debug)] pub struct ProviderMap { /// Candidates to download a hash. - candidates: HashMap>, + candidates: HashMap>, /// Ordered list of provider hashes per peer. /// /// I.e. blobs we assume the peer can provide. @@ -1091,11 +1098,11 @@ pub struct ProviderMap { } struct ProviderIter<'a> { - inner: Option>, + inner: Option>, } impl<'a> Iterator for ProviderIter<'a> { - type Item = &'a PeerInfo; + type Item = (&'a PublicKey, &'a PeerRole); fn next(&mut self) -> Option { self.inner.as_mut().and_then(|iter| iter.next()) @@ -1104,8 +1111,8 @@ impl<'a> Iterator for ProviderIter<'a> { impl ProviderMap { /// Get candidates to download this hash. 
- fn get_candidates(&self, hash: &Hash) -> impl Iterator { - let inner = self.candidates.get(hash).map(|peers| peers.values()); + fn get_candidates(&self, hash: &Hash) -> impl Iterator { + let inner = self.candidates.get(hash).map(|peers| peers.iter()); ProviderIter { inner } } @@ -1114,12 +1121,12 @@ impl ProviderMap { let entry = self.candidates.entry(hash).or_default(); for peer in peers { entry - .entry(peer.node_id) - .and_modify(|existing| existing.role = (existing.role).max(peer.role)) - .or_insert(*peer); + .entry(peer.peer_id) + .and_modify(|role| *role = (*role).max(peer.role)) + .or_insert(peer.role); if let PeerRole::Provider = peer.role { self.provider_hashes_by_peer - .entry(peer.node_id) + .entry(peer.peer_id) .or_default() .push_back(hash); } From 48a60052daa8437b67570e8bed0d229ca7c12d65 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 13:39:15 +0200 Subject: [PATCH 14/18] fix: use rotate_left --- iroh/src/downloader.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index e8f8958f10..f337ddba9b 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -304,19 +304,13 @@ pub struct PeerInfo { impl PeerInfo { /// Create a new [`PeerInfo`] from its parts. pub fn new(peer_id: PublicKey, role: PeerRole) -> Self { - Self { - peer_id, - role, - } + Self { peer_id, role } } } impl From<(PublicKey, PeerRole)> for PeerInfo { fn from((peer_id, role): (PublicKey, PeerRole)) -> Self { - Self { - peer_id, - role, - } + Self { peer_id, role } } } @@ -907,8 +901,6 @@ impl, D: Dialer> Service { if !self.is_needed(hash) { self.providers.remove(hash) - } else { - self.providers.move_hash_to_back(&peer, hash); } if peer_ready { self.on_peer_ready(peer); @@ -1137,10 +1129,15 @@ impl ProviderMap { /// /// I.e. get the next hash that was added with [`PeerRole::Provider`] for this peer. 
fn get_next_provider_hash_for_peer(&mut self, peer: &PublicKey) -> Option { - self.provider_hashes_by_peer + let hash = self + .provider_hashes_by_peer .get(peer) .and_then(|hashes| hashes.front()) - .copied() + .copied(); + if let Some(hash) = hash { + self.move_hash_to_back(peer, hash); + } + hash } /// Signal the registry that this hash is no longer of interest. @@ -1159,9 +1156,10 @@ impl ProviderMap { fn move_hash_to_back(&mut self, peer: &PublicKey, hash: Hash) { let hashes = self.provider_hashes_by_peer.get_mut(peer); if let Some(hashes) = hashes { - let front = hashes.pop_front(); - debug_assert_eq!(front, Some(hash)); - hashes.push_back(hash); + debug_assert_eq!(hashes.front(), Some(&hash)); + if !hashes.is_empty() { + hashes.rotate_left(1); + } } } } From ca29462a5963e251f93af3b616aa5fb8a9f46a0d Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 13:47:10 +0200 Subject: [PATCH 15/18] refactor: further simplifactions after review --- iroh/src/downloader.rs | 44 +++++++++++++++--------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index f337ddba9b..ce8924320a 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -385,7 +385,7 @@ struct PendingRequestInfo { delay_key: delay_queue::Key, /// If this attempt was scheduled with a known potential peer, this is stored here to /// prevent another query to the [`ProviderMap`]. - next_peer: Option, + next_peer: Option, } /// State of the connection to this peer. @@ -579,28 +579,16 @@ impl, D: Dialer> Service { Some(info) => { info.intents.insert(id, sender); // pre-emptively get a peer if we don't already have one - let mut reset = false; match (info.next_peer, next_peer) { // We did not yet have next peer, but have a peer now. 
(None, Some(next_peer)) => { - reset = matches!(next_peer.role, PeerRole::Provider); info.next_peer = Some(next_peer); } - // We did have a next peer already, but the new peer has a better role - // (i.e. is a Provider, while the old next peer is a only Candidate) - (Some(old_next_peer), Some(next_peer)) - if old_next_peer.role < next_peer.role => - { - info.next_peer = Some(next_peer); - reset = true; + (Some(_old_next_peer), Some(_next_peer)) => { + unreachable!("invariant: info.next_peer must be none because checked above with needs_peer") } _ => {} } - // Reset the delay queue timer if we inserted a `Provider` peer. - if reset { - self.scheduled_request_queue - .reset(&info.delay_key, std::time::Duration::ZERO); - } // increasing the retries by one accounts for multiple intents for the same request in // a conservative way @@ -622,7 +610,7 @@ impl, D: Dialer> Service { /// /// If the selected candidate is not connected and we have capacity for another connection, a /// dial is queued. 
- fn get_best_candidate(&mut self, hash: &Hash) -> Option { + fn get_best_candidate(&mut self, hash: &Hash) -> Option { /// Model the state of peers found in the candidates #[derive(PartialEq, Eq, Clone, Copy)] enum ConnState { @@ -695,13 +683,13 @@ impl, D: Dialer> Service { // peer is not connected, not dialing and concurrency limits allow another connection debug!(peer = %peer.peer_id, "dialing peer"); self.dialer.queue_dial(peer.peer_id); - Some(peer) + Some(peer.peer_id) } else { trace!(peer = %peer.peer_id, "required peer not dialed to maintain concurrency limits"); None } } else { - Some(peer) + Some(peer.peer_id) } } @@ -920,30 +908,30 @@ impl, D: Dialer> Service { } = info; // first try with the peer that was initially assigned - if let Some((peer, conn)) = next_peer.and_then(|peer| { - self.get_peer_connection_for_download(&peer.peer_id) - .map(|conn| (peer, conn)) + if let Some((peer_id, conn)) = next_peer.and_then(|peer_id| { + self.get_peer_connection_for_download(&peer_id) + .map(|conn| (peer_id, conn)) }) { - return self.start_download(kind, peer.peer_id, conn, remaining_retries, intents); + return self.start_download(kind, peer_id, conn, remaining_retries, intents); } // we either didn't have a peer or the peer is busy or dialing. 
In any case try to get // another peer let next_peer = match self.get_best_candidate(kind.hash()) { None => None, - Some(peer) => { + Some(peer_id) => { // optimistically check if the peer could do the request right away - match self.get_peer_connection_for_download(&peer.peer_id) { + match self.get_peer_connection_for_download(&peer_id) { Some(conn) => { return self.start_download( kind, - peer.peer_id, + peer_id, conn, remaining_retries, intents, ) } - None => Some(peer), + None => Some(peer_id), } } }; @@ -1006,7 +994,7 @@ impl, D: Dialer> Service { &mut self, kind: DownloadKind, remaining_retries: u8, - next_peer: Option, + next_peer: Option, intents: HashMap>, ) { // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal @@ -1020,7 +1008,7 @@ impl, D: Dialer> Service { intents, remaining_retries, delay_key, - next_peer, + next_peer }; debug!(?kind, ?info, "request scheduled"); self.scheduled_requests.insert(kind, info); From 013b386cd0e85122d7ccc83ba9b1ddf5449fa2b1 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 13:48:16 +0200 Subject: [PATCH 16/18] chore: fmt --- iroh/src/downloader.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index ce8924320a..c8f959ef5e 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -923,13 +923,7 @@ impl, D: Dialer> Service { // optimistically check if the peer could do the request right away match self.get_peer_connection_for_download(&peer_id) { Some(conn) => { - return self.start_download( - kind, - peer_id, - conn, - remaining_retries, - intents, - ) + return self.start_download(kind, peer_id, conn, remaining_retries, intents) } None => Some(peer_id), } @@ -1008,7 +1002,7 @@ impl, D: Dialer> Service { intents, remaining_retries, delay_key, - next_peer + next_peer, }; debug!(?kind, ?info, "request scheduled"); self.scheduled_requests.insert(kind, info); From 
e27e9b91504153d0728470f4fe5c69f8fe953d0e Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 14:58:22 +0200 Subject: [PATCH 17/18] chore: clippy --- iroh-gossip/src/proto/plumtree.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index 05a5d0e1eb..a653870ea1 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -169,10 +169,7 @@ pub enum DeliveryScope { impl DeliveryScope { /// Whether this message was directly received from its publisher. pub fn is_direct(&self) -> bool { - match self { - Self::Neighbors | Self::Swarm(Round(0)) => true, - _ => false, - } + matches!(self, Self::Neighbors | Self::Swarm(Round(0))) } } From 05eea849154cc2ed95aec0dead0b69d87382b8b7 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 21 Sep 2023 15:04:49 +0200 Subject: [PATCH 18/18] chore: clippy --- iroh/src/downloader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index c8f959ef5e..14927265e5 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -656,13 +656,13 @@ impl, D: Dialer> Service { .get_candidates(hash) .filter_map(|(peer_id, role)| { let peer = PeerInfo::new(*peer_id, *role); - if let Some(info) = self.peers.get(&peer_id) { + if let Some(info) = self.peers.get(peer_id) { info.conn.as_ref()?; let req_count = info.active_requests(); // filter out peers at capacity let has_capacity = !self.concurrency_limits.peer_at_request_capacity(req_count); has_capacity.then_some((peer, ConnState::Connected(req_count))) - } else if self.dialer.is_pending(&peer_id) { + } else if self.dialer.is_pending(peer_id) { Some((peer, ConnState::Dialing)) } else { Some((peer, ConnState::NotConnected))