Skip to content

Commit

Permalink
fix: keep two connections between peers
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jul 31, 2024
1 parent 19034dc commit 8ba735b
Showing 1 changed file with 71 additions and 50 deletions.
121 changes: 71 additions & 50 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ struct Actor {
/// Map of topics to their state.
topics: HashMap<TopicId, TopicState>,
/// Map of peers to their state.
peers: HashMap<NodeId, PeerState>,
peers: HashMap<NodeId, PeerInfo>,
/// Stream of commands from topic handles.
command_rx: stream_group::Keyed<TopicCommandStream>,
/// Internal queue of nodes to disconnect from.
/// Internal queue of topics to close because all of their handles were dropped.
quit_queue: VecDeque<TopicId>,
/// Tasks for the connection loops, to keep track of panics.
connection_tasks: JoinSet<()>,
Expand Down Expand Up @@ -347,7 +347,7 @@ impl Actor {
Ok(conn) => {
debug!(peer = ?peer_id, "dial successful");
inc!(Metrics, actor_tick_dialer_success);
self.handle_to_actor_msg(ToActor::HandleConnection(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?;
self.handle_connection(peer_id, ConnOrigin::Dial, conn);
}
Err(err) => {
warn!(peer = ?peer_id, "dial failed: {err}");
Expand Down Expand Up @@ -425,13 +425,37 @@ impl Actor {
Ok(())
}

fn init_peer_conn(
&mut self,
peer_id: NodeId,
origin: ConnOrigin,
conn: Connection,
queue: Vec<ProtoMessage>,
) -> mpsc::Sender<ProtoMessage> {
fn handle_connection(&mut self, peer_id: NodeId, origin: ConnOrigin, conn: Connection) {
// Check that we only keep one connection per peer per direction.
if let Some(peer_info) = self.peers.get(&peer_id) {
if matches!(origin, ConnOrigin::Dial) && peer_info.conn_dialed.is_some() {
warn!(?peer_id, ?origin, "ignoring connection: already accepted");
return;
}
if matches!(origin, ConnOrigin::Accept) && peer_info.conn_accepted.is_some() {
warn!(?peer_id, ?origin, "ignoring connection: already accepted");
return;
}
}

let mut peer_info = self.peers.remove(&peer_id).unwrap_or_default();

// Store the connection so that we can terminate it when the peer is removed.
match origin {
ConnOrigin::Dial => {
peer_info.conn_dialed = Some(conn.clone());
}
ConnOrigin::Accept => {
peer_info.conn_accepted = Some(conn.clone());
}
}

// Extract the queue of pending messages.
let queue = match &mut peer_info.state {
PeerState::Pending { queue } => std::mem::replace(queue, Vec::new()),
PeerState::Active { .. } => vec![],
};

let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
let max_message_size = self.state.max_message_size();
let in_event_tx = self.in_event_tx.clone();
Expand Down Expand Up @@ -460,26 +484,20 @@ impl Actor {
}
.instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())),
);
send_tx

peer_info.state = match peer_info.state {
PeerState::Pending { .. } => PeerState::Active { send_tx },
PeerState::Active { send_tx } => PeerState::Active { send_tx },
};

self.peers.insert(peer_id, peer_info);
}

async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
trace!("handle to_actor {msg:?}");
match msg {
ToActor::HandleConnection(peer_id, origin, conn) => {
let queue = match self.peers.remove(&peer_id) {
Some(state @ PeerState::Active { .. }) => {
debug!(?peer_id, ?origin, "ignoring connection: already active");
self.peers.insert(peer_id, state);
return Ok(());
}
Some(PeerState::Pending { queue }) => queue,
None => vec![],
};
self.dialer.abort_dial(peer_id);
let send_tx = self.init_peer_conn(peer_id, origin, conn.clone(), queue);
self.peers
.insert(peer_id, PeerState::Active { conn, send_tx });
self.handle_connection(peer_id, origin, conn)
}
ToActor::Join {
topic_id,
Expand Down Expand Up @@ -554,23 +572,21 @@ impl Actor {
};
match event {
OutEvent::SendMessage(peer_id, message) => {
match self.peers.get_mut(&peer_id) {
Some(PeerState::Active { send_tx, .. }) => {
let info = self.peers.entry(peer_id).or_default();
match &mut info.state {
PeerState::Active { send_tx } => {
if let Err(_err) = send_tx.send(message).await {
// Removing the peer is handled by the in_event PeerDisconnected sent
// at the end of the connection task.
warn!("connection loop for {peer_id:?} dropped");
}
}
Some(PeerState::Pending { queue }) => {
PeerState::Pending { queue } => {
if queue.is_empty() {
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
}
queue.push(message);
}
None => {
let queue = vec![message];
self.peers.insert(peer_id, PeerState::Pending { queue });
debug!(peer = ?peer_id, "dial");
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
}
}
}
OutEvent::EmitEvent(topic_id, event) => {
Expand Down Expand Up @@ -605,15 +621,13 @@ impl Actor {
}
OutEvent::DisconnectPeer(peer_id) => {
if let Some(peer) = self.peers.remove(&peer_id) {
match peer {
PeerState::Pending { .. } => {
self.dialer.abort_dial(peer_id);
}
PeerState::Active { conn, send_tx } => {
conn.close(0u8.into(), b"close from disconnect");
drop(send_tx);
}
if let Some(conn) = peer.conn_dialed {
conn.close(0u8.into(), b"close from disconnect");
}
if let Some(conn) = peer.conn_accepted {
conn.close(0u8.into(), b"close from disconnect");
}
drop(peer.state);
}
}
OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) {
Expand All @@ -635,16 +649,23 @@ impl Actor {
}
}

#[derive(derive_more::Debug)]
/// Per-peer bookkeeping kept by the actor: the messaging state plus the
/// connections held for that peer, at most one per direction.
#[derive(Debug, Default)]
struct PeerInfo {
/// Whether messages for this peer are still being queued (`Pending`) or can
/// be handed to a connection loop through its sender (`Active`).
state: PeerState,
/// Connection that was established by dialing the peer, if any. Kept so it
/// can be closed when the peer is disconnected.
conn_dialed: Option<Connection>,
/// Connection that was accepted from the peer, if any. Kept so it can be
/// closed when the peer is disconnected.
conn_accepted: Option<Connection>,
}

#[derive(Debug)]
enum PeerState {
Pending {
queue: Vec<ProtoMessage>,
},
Active {
#[debug("Connection")]
conn: Connection,
send_tx: mpsc::Sender<ProtoMessage>,
},
Pending { queue: Vec<ProtoMessage> },
Active { send_tx: mpsc::Sender<ProtoMessage> },
}

impl Default for PeerState {
fn default() -> Self {
PeerState::Pending { queue: Vec::new() }
}
}

#[derive(Debug, Default)]
Expand Down

0 comments on commit 8ba735b

Please sign in to comment.