diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index da9037b0..4b1806c4 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -69,7 +69,7 @@ use librqbit_core::{ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::{ - handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, + self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, }, Handshake, Message, MessageOwned, Piece, Request, }; @@ -838,6 +838,93 @@ impl TorrentStateLive { .take_while(|r| r.is_ok()) .last(); } + + async fn task_send_pex_to_peer( + self: Arc, + peer_addr: SocketAddr, + tx: PeerTx, + ) -> anyhow::Result<()> { + let mut sent_peers_live: HashSet = HashSet::new(); + const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds + let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us + + loop { + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(delay) => {}, + + } + delay = PEX_MESSAGE_INTERVAL; + + let addrs_live_to_sent = self + .peers + .states + .iter() + .filter_map(|e| { + let peer = e.value(); + let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); + + if *addr != peer_addr { + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; // As per BEP 11 share only those we were able to connect + if peer.state.is_live() + && has_outgoing_connections + && !sent_peers_live.contains(addr) + { + Some(*addr) + } else { + None + } + } else { + None + } + }) + .take(MAX_SENT_PEERS) + .collect::>(); + + let addrs_closed_to_sent = sent_peers_live + .iter() + .filter(|addr| { + self.peers + .states + .get(addr) + .map(|p| !p.value().state.is_live()) + .unwrap_or(true) + }) + .copied() + .take(MAX_SENT_PEERS) + .collect::>(); + + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // and addrs_closed_to_sent are only filtered addresses from sent_peers_live + + if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { + debug!( + "sending PEX with {} live ({:?})and {} closed peers", + addrs_live_to_sent.len(), + addrs_live_to_sent, + addrs_closed_to_sent.len() + ); + let pex_msg = + extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); + let msg = Message::Extended(ext_msg); + + if tx.send(WriterRequest::Message(msg)).is_err() { + return Ok(()); // Peer disconnected + } + + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + } + } + } } struct PeerHandlerLocked { @@ -963,8 +1050,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { - if let Some(peer_pex_msg_id) = hs.ut_pex() { - trace!("peer supports pex at {peer_pex_msg_id}"); + if let Some(_peer_pex_msg_id) = hs.ut_pex() { + self.state.clone().spawn( + error_span!( + parent: self.state.torrent.span.clone(), + "sending_pex_to_peer", + peer = self.addr.to_string() + ), + self.state + .clone() + .task_send_pex_to_peer(self.addr, self.tx.clone()), + ); } // Lets update outgoing Socket address for incoming connection if self.incoming { diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index b0895f8e..3c6e6af0 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -148,6 +148,10 @@ impl PeerStateNoMut { } } + pub fn is_live(&self) -> bool { + matches!(&self.0, PeerState::Live(_)) + } + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.0 { PeerState::Live(l) => Some(l), diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 8e1b0a8a..a7c24b9f 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -1,7 +1,8 @@ use std::net::{IpAddr, SocketAddr}; +use buffers::ByteBufOwned; use byteorder::{ByteOrder, BE}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; @@ -121,9 +122,58 @@ where } } +impl UtPex { + pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + fn addrs_to_bytes<'a, I>(addrs: I) -> (Option, Option) + where + I: IntoIterator, + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } + } + } + + let freeze = |buf: BytesMut| -> Option { + if !buf.is_empty() { + Some(buf.freeze().into()) + } else { + None + } + }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) + } + + let (added, added6) = addrs_to_bytes(addrs_live); + let (dropped, dropped6) = addrs_to_bytes(addrs_closed); + + Self { + added, + added6, + dropped, + dropped6, + ..Default::default() + } + } +} + #[cfg(test)] mod tests { - use bencode::from_bytes; + use bencode::{bencode_serialize_to_writer, from_bytes}; use buffers::ByteBuf; use super::*; @@ -154,4 +204,35 @@ mod tests { ); assert_eq!(0, addrs[1].flags); } + + #[test] + fn test_pex_roundtrip() { + let a1 = "185.159.157.20:46439".parse::().unwrap(); + let a2 = "151.249.105.134:4240".parse::().unwrap(); + //IPV6 + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439" + .parse::() + .unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240" + .parse::() + .unwrap(); + + let addrs = vec![a1, aa1, a2, aa2]; + let pex = UtPex::from_addrs(&addrs, &addrs); + let mut bytes = Vec::new(); + bencode_serialize_to_writer(&pex, &mut bytes).unwrap(); + let pex2 = from_bytes::>(&bytes).unwrap(); + assert_eq!(4, pex2.added_peers().count()); + assert_eq!(pex.added_peers().count(), pex2.added_peers().count()); + let addrs2: Vec<_> = pex2.added_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + let addrs2: Vec<_> = pex2.dropped_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + } }