diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index b0e17c79..9f15f455 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -257,7 +257,7 @@ impl TorrentStateLive { session_stats: session_stats.clone(), stats: Default::default(), states: Default::default(), - live_peers: Default::default(), + live_outgoing_peers: Default::default(), }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), @@ -841,7 +841,6 @@ impl TorrentStateLive { // As per BEP 11 recommended interval is min 60 seconds const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); - let mut live_peers = HashSet::new(); let mut connected = Vec::with_capacity(MAX_SENT_PEERS); let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); let mut peer_view_of_live_peers = HashSet::new(); @@ -854,44 +853,25 @@ impl TorrentStateLive { loop { interval.tick().await; - // TODO: store them in a shared place - // Fill in live_peers - for ps in self.peers.states.iter() { - let peer = ps.value(); - let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key()); - - // As per BEP 11 share only those we were able to connect - let has_outgoing_connections = peer - .stats - .counters - .outgoing_connections - .load(Ordering::Relaxed) - > 0; - - let is_live = has_outgoing_connections && ps.value().is_live(); - if is_live { - live_peers.insert(addr); - } else { - live_peers.remove(&addr); - } + { + let live_peers = self.peers.live_outgoing_peers.read(); + connected.clear(); + dropped.clear(); + + connected.extend( + live_peers + .difference(&peer_view_of_live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + dropped.extend( + peer_view_of_live_peers + .difference(&live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); } - connected.clear(); - dropped.clear(); - - connected.extend( - live_peers - .difference(&peer_view_of_live_peers) - .take(MAX_SENT_PEERS) - .copied(), - ); - dropped.extend( - peer_view_of_live_peers - .difference(&live_peers) - .take(MAX_SENT_PEERS) - .copied(), - ); - // BEP 11 - Dont send closed if they are now in live // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index c495f43d..fc1d04fa 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -2,6 +2,7 @@ pub mod stats; use std::collections::HashSet; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; @@ -133,8 +134,8 @@ impl Peer { for counter in [&counters.session_stats.peers, &counters.stats] { counter.dec(&self.state); } - if matches!(&self.state, PeerState::Live(..)) { - counters.live_peers.write().retain(|a| *a != self.addr); + if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) { + counters.live_outgoing_peers.write().remove(&addr); } } @@ -142,12 +143,22 @@ impl Peer { for counter in [&counters.session_stats.peers, &counters.stats] { counter.incdec(&self.state, &new); } - if matches!(&self.state, PeerState::Live(..)) { - counters.live_peers.write().retain(|a| *a != self.addr); - } - if matches!(&new, PeerState::Live(..)) { - counters.live_peers.write().push(self.addr); + if let Some(addr) = self.outgoing_address { + if matches!(&self.state, PeerState::Live(..)) { + counters.live_outgoing_peers.write().remove(&addr); + } + if matches!(&new, PeerState::Live(..)) + && self + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0 + { + counters.live_outgoing_peers.write().insert(addr); + } } + std::mem::replace(&mut self.state, new) } @@ -158,10 +169,6 @@ impl Peer { } } - pub fn is_live(&self) -> bool { - matches!(&self.state, PeerState::Live(_)) - } - pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.state { PeerState::Live(l) => Some(l), diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 43112732..17c07582 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use anyhow::Context; use backoff::backoff::Backoff; @@ -22,7 +22,9 @@ pub mod stats; pub(crate) struct PeerStates { pub session_stats: Arc, - pub live_peers: RwLock>, + + // This keeps track of live addresses we connected to, for PEX. + pub live_outgoing_peers: RwLock>, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, }