Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve content propagation in sync #1480

Merged
merged 22 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
db175c2
feat: allow to send gossip to neighbors only
Frando Sep 14, 2023
0a30d6d
feat: provider and candidate roles for downloader
Frando Sep 14, 2023
40f38fa
feat: transfer content status during set reconciliation
Frando Sep 14, 2023
dccb4de
feat: inform neighbors about finished content downloads
Frando Sep 14, 2023
0535ea2
fix: better transfer of endpoints to gossip
Frando Sep 14, 2023
e3c401f
chore: fmt & clippy
Frando Sep 14, 2023
b0e0268
feat(downloader): start hashes for PeerRole::Provider immediately
Frando Sep 18, 2023
71763ae
feat(gossip): expose message distance, use for content status in sync
Frando Sep 18, 2023
7e2fe22
fix: debug formatting, comments, typos
Frando Sep 20, 2023
99ed70c
fix: remove obsolete comment
Frando Sep 20, 2023
37b3d93
fix: fixes after rebase
Frando Sep 20, 2023
d6ac046
refactor: combine scope and round
Frando Sep 21, 2023
aad3b6c
fix: review feedback
Frando Sep 21, 2023
48a6005
fix: use rotate_left
Frando Sep 21, 2023
ca29462
refactor: further simplifactions after review
Frando Sep 21, 2023
013b386
chore: fmt
Frando Sep 21, 2023
e27e9b9
chore: clippy
Frando Sep 21, 2023
05eea84
chore: clippy
Frando Sep 21, 2023
a6fc196
Merge branch 'main' into sync-content-propagation
divagant-martian Sep 21, 2023
f7e2edc
Merge branch 'main' into sync-content-propagation
divagant-martian Sep 21, 2023
e2a87c6
Merge branch 'main' into sync-content-propagation
divagant-martian Sep 21, 2023
732638c
Merge branch 'main' into sync-content-propagation
Frando Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ anyhow = { version = "1", features = ["backtrace"] }
blake3 = { package = "iroh-blake3", version = "1.4.3"}
bytes = { version = "1.4.0", features = ["serde"] }
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into"] }
derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into", "into"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
indexmap = "2.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> {
let mut stream = gossip.subscribe(topic).await?;
loop {
let event = stream.recv().await?;
if let Event::Received(data, _prev_peer) = event {
let (from, message) = SignedMessage::verify_and_decode(&data)?;
if let Event::Received(msg) = event {
let (from, message) = SignedMessage::verify_and_decode(&msg.content)?;
match message {
Message::AboutMe { name } => {
names.insert(from, name.clone());
Expand Down
94 changes: 53 additions & 41 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use genawaiter::sync::{Co, Gen};
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, AddrInfo, MagicEndpoint, PeerAddr};
use rand::rngs::StdRng;
use rand_core::SeedableRng;
use std::{collections::HashMap, fmt, future::Future, sync::Arc, task::Poll, time::Instant};
use std::{collections::HashMap, future::Future, sync::Arc, task::Poll, time::Instant};
use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, trace, warn};

use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, TopicId};
use crate::proto::{self, Scope, TopicId};

pub mod util;

Expand Down Expand Up @@ -50,16 +50,16 @@ type ProtoMessage = proto::Message<PublicKey>;
/// Each topic is a separate broadcast tree with separate memberships.
///
/// A topic has to be joined before you can publish or subscribe on the topic.
/// To join the swarm for a topic, you have to know the [PublicKey] of at least one peer that also joined the topic.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from a [MagicEndpoint], it does not accept connections
/// Even though the [`Gossip`] is created from a [`MagicEndpoint`], it does not accept connections
/// itself. You should run an accept loop on the MagicEndpoint yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [GOSSIP_ALPN], forward the connection to the
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [Self::handle_connection].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
Expand Down Expand Up @@ -149,20 +149,33 @@ impl Gossip {
Ok(())
}

/// Broadcast a message on a topic.
/// Broadcast a message on a topic to all peers in the swarm.
///
/// This does not join the topic automatically, so you have to call [Self::join] yourself
/// This does not join the topic automatically, so you have to call [`Self::join`] yourself
/// for messages to be broadcast to peers.
pub async fn broadcast(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.send(ToActor::Broadcast(topic, message, tx)).await?;
self.send(ToActor::Broadcast(topic, message, Scope::Swarm, tx))
.await?;
rx.await??;
Ok(())
}

/// Broadcast a message on a topic to the immediate neighbors.
///
/// This does not join the topic automatically, so you have to call [`Self::join`] yourself
/// for messages to be broadcast to peers.
pub async fn broadcast_neighbors(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.send(ToActor::Broadcast(topic, message, Scope::Neighbors, tx))
.await?;
rx.await??;
Ok(())
}

/// Subscribe to messages and event notifications for a topic.
///
/// Does not join the topic automatically, so you have to call [Self::join] yourself
/// Does not join the topic automatically, so you have to call [`Self::join`] yourself
/// to actually receive messages.
pub async fn subscribe(&self, topic: TopicId) -> anyhow::Result<broadcast::Receiver<Event>> {
let (tx, rx) = oneshot::channel();
Expand All @@ -173,7 +186,7 @@ impl Gossip {

/// Subscribe to all events published on topics that you joined.
///
/// Note that this method takes self by value. Usually you would clone the [Gossip] handle.
/// Note that this method takes self by value. Usually you would clone the [`Gossip`] handle.
/// before.
pub fn subscribe_all(self) -> impl Stream<Item = anyhow::Result<(TopicId, Event)>> {
Gen::new(|co| async move {
Expand All @@ -196,7 +209,7 @@ impl Gossip {
}
}

/// Pass an incoming [quinn::Connection] to the gossip actor.
/// Handle an incoming [`quinn::Connection`].
///
/// Make sure to check the ALPN protocol yourself before passing the connection.
pub async fn handle_connection(&self, conn: quinn::Connection) -> anyhow::Result<()> {
Expand Down Expand Up @@ -256,42 +269,37 @@ enum ConnOrigin {
}

/// Input messages for the gossip [`Actor`].
#[derive(derive_more::Debug)]
enum ToActor {
/// Handle a new QUIC connection, either from accept (external to the actor) or from connect
/// (happens internally in the actor).
ConnIncoming(PublicKey, ConnOrigin, quinn::Connection),
ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] quinn::Connection),
/// Join a topic with a list of peers. Reply with oneshot once at least one peer joined.
Join(TopicId, Vec<PublicKey>, oneshot::Sender<anyhow::Result<()>>),
Join(
TopicId,
Vec<PublicKey>,
#[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
),
/// Leave a topic, send disconnect messages and drop all state.
Quit(TopicId),
/// Broadcast a message on a topic.
Broadcast(TopicId, Bytes, oneshot::Sender<anyhow::Result<()>>),
Broadcast(
TopicId,
#[debug("<{}b>", _1.len())] Bytes,
Scope,
#[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
),
/// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
/// topic.
Subscribe(
TopicId,
oneshot::Sender<anyhow::Result<broadcast::Receiver<Event>>>,
#[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<Event>>>,
),
/// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
/// topic.
SubscribeAll(oneshot::Sender<anyhow::Result<broadcast::Receiver<(TopicId, Event)>>>),
}

impl fmt::Debug for ToActor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ToActor::ConnIncoming(peer_id, origin, _conn) => {
write!(f, "ConnIncoming({peer_id:?}, {origin:?})")
}
ToActor::Join(topic, peers, _reply) => write!(f, "Join({topic:?}, {peers:?})"),
ToActor::Quit(topic) => write!(f, "Quit({topic:?})"),
ToActor::Broadcast(topic, message, _reply) => {
write!(f, "Broadcast({topic:?}, bytes<{}>)", message.len())
}
ToActor::Subscribe(topic, _reply) => write!(f, "Subscribe({topic:?})"),
ToActor::SubscribeAll(_reply) => write!(f, "SubscribeAll"),
}
}
SubscribeAll(
#[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<(TopicId, Event)>>>,
),
}

/// Actor that sends and handles messages between the connection and main state loops
Expand Down Expand Up @@ -340,7 +348,8 @@ impl Actor {
},
_ = self.on_endpoints_rx.changed() => {
let info = self.endpoint.my_addr().await?;
let peer_data = postcard::to_stdvec(&info)?;
let peer_data = Bytes::from(postcard::to_stdvec(&info)?);

self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?;
}
(peer_id, res) = self.dialer.next_conn() => {
Expand Down Expand Up @@ -429,9 +438,12 @@ impl Actor {
.await?;
self.subscribers_topic.remove(&topic_id);
}
ToActor::Broadcast(topic_id, message, reply) => {
self.handle_in_event(InEvent::Command(topic_id, Command::Broadcast(message)), now)
.await?;
ToActor::Broadcast(topic_id, message, scope, reply) => {
self.handle_in_event(
InEvent::Command(topic_id, Command::Broadcast(message, scope)),
now,
)
.await?;
reply.send(Ok(())).ok();
}
ToActor::Subscribe(topic_id, reply) => {
Expand Down Expand Up @@ -687,8 +699,8 @@ mod test {
loop {
let ev = stream2.recv().await.unwrap();
info!("go2 event: {ev:?}");
if let Event::Received(msg, _prev_peer) = ev {
recv.push(msg);
if let Event::Received(msg) = ev {
recv.push(msg.content);
}
if recv.len() == len {
return recv;
Expand All @@ -702,8 +714,8 @@ mod test {
loop {
let ev = stream3.recv().await.unwrap();
info!("go3 event: {ev:?}");
if let Event::Received(msg, _prev_peer) = ev {
recv.push(msg);
if let Event::Received(msg) = ev {
recv.push(msg.content);
}
if recv.len() == len {
return recv;
Expand Down
34 changes: 28 additions & 6 deletions iroh-gossip/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod util;
#[cfg(test)]
mod tests;

pub use plumtree::Scope;
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

Expand All @@ -79,7 +80,20 @@ impl<T> PeerIdentity for T where T: Hash + Eq + Copy + fmt::Debug + Serialize +
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
pub type PeerData = bytes::Bytes;
#[derive(
derive_more::Debug,
Serialize,
Deserialize,
Clone,
PartialEq,
Eq,
derive_more::From,
derive_more::Into,
derive_more::Deref,
Default,
)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(bytes::Bytes);

/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand All @@ -106,7 +120,7 @@ mod test {
assert_synchronous_active, report_round_distribution, sort, Network, Simulator,
SimulatorConfig,
},
TopicId,
Scope, TopicId,
};

#[test]
Expand Down Expand Up @@ -215,10 +229,14 @@ mod test {
assert!(assert_synchronous_active(&network));

// now broadcast a first message
network.command(1, t, Command::Broadcast(b"hi1".to_vec().into()));
network.command(
1,
t,
Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
);
network.ticks(broadcast_ticks);
let events = network.events();
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _))));
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
// message should be received by two other nodes
assert_eq!(received.count(), 2);
assert!(assert_synchronous_active(&network));
Expand All @@ -230,10 +248,14 @@ mod test {
report_round_distribution(&network);

// now broadcast again
network.command(1, t, Command::Broadcast(b"hi2".to_vec().into()));
network.command(
1,
t,
Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
);
network.ticks(broadcast_ticks);
let events = network.events();
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _))));
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
// message should be received by all 5 other nodes
assert_eq!(received.count(), 5);
assert!(assert_synchronous_active(&network));
Expand Down
Loading