Skip to content

Commit

Permalink
Only successful outgoing connections marked
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Dec 4, 2024
1 parent ac775af commit 0b129eb
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 51 deletions.
56 changes: 18 additions & 38 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl TorrentStateLive {
session_stats: session_stats.clone(),
stats: Default::default(),
states: Default::default(),
live_peers: Default::default(),
live_outgoing_peers: Default::default(),
},
locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker),
Expand Down Expand Up @@ -841,7 +841,6 @@ impl TorrentStateLive {
// As per BEP 11 recommended interval is min 60 seconds
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);

let mut live_peers = HashSet::new();
let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
let mut peer_view_of_live_peers = HashSet::new();
Expand All @@ -854,44 +853,25 @@ impl TorrentStateLive {
loop {
interval.tick().await;

// TODO: store them in a shared place
// Fill in live_peers
for ps in self.peers.states.iter() {
let peer = ps.value();
let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key());

// As per BEP 11 share only those we were able to connect
let has_outgoing_connections = peer
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0;

let is_live = has_outgoing_connections && ps.value().is_live();
if is_live {
live_peers.insert(addr);
} else {
live_peers.remove(&addr);
}
{
let live_peers = self.peers.live_outgoing_peers.read();
connected.clear();
dropped.clear();

connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
}

connected.clear();
dropped.clear();

connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);

// BEP 11 - Dont send closed if they are now in live
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent,
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live
Expand Down
29 changes: 18 additions & 11 deletions crates/librqbit/src/torrent_state/live/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod stats;

use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;

use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::ChunkInfo;
Expand Down Expand Up @@ -133,21 +134,31 @@ impl Peer {
for counter in [&counters.session_stats.peers, &counters.stats] {
counter.dec(&self.state);
}
if matches!(&self.state, PeerState::Live(..)) {
counters.live_peers.write().retain(|a| *a != self.addr);
if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) {
counters.live_outgoing_peers.write().remove(&addr);
}
}

pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState {
for counter in [&counters.session_stats.peers, &counters.stats] {
counter.incdec(&self.state, &new);
}
if matches!(&self.state, PeerState::Live(..)) {
counters.live_peers.write().retain(|a| *a != self.addr);
}
if matches!(&new, PeerState::Live(..)) {
counters.live_peers.write().push(self.addr);
if let Some(addr) = self.outgoing_address {
if matches!(&self.state, PeerState::Live(..)) {
counters.live_outgoing_peers.write().remove(&addr);
}
if matches!(&new, PeerState::Live(..))
&& self
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0
{
counters.live_outgoing_peers.write().insert(addr);
}
}

std::mem::replace(&mut self.state, new)
}

Expand All @@ -158,10 +169,6 @@ impl Peer {
}
}

pub fn is_live(&self) -> bool {
matches!(&self.state, PeerState::Live(_))
}

pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.state {
PeerState::Live(l) => Some(l),
Expand Down
6 changes: 4 additions & 2 deletions crates/librqbit/src/torrent_state/live/peers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

use anyhow::Context;
use backoff::backoff::Backoff;
Expand All @@ -22,7 +22,9 @@ pub mod stats;

pub(crate) struct PeerStates {
pub session_stats: Arc<AtomicSessionStats>,
pub live_peers: RwLock<Vec<PeerHandle>>,

// This keeps track of live addresses we connected to, for PEX.
pub live_outgoing_peers: RwLock<HashSet<PeerHandle>>,
pub stats: AggregatePeerStatsAtomic,
pub states: DashMap<PeerHandle, Peer>,
}
Expand Down

0 comments on commit 0b129eb

Please sign in to comment.