Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further e2e fixes for reliability #198

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 57 additions & 32 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
Expand Down Expand Up @@ -371,6 +371,7 @@ impl TorrentStateLive {
state: self.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: self.meta.options.peer_connect_timeout,
Expand Down Expand Up @@ -434,6 +435,7 @@ impl TorrentStateLive {
state: state.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: state.meta.options.peer_connect_timeout,
Expand Down Expand Up @@ -486,7 +488,7 @@ impl TorrentStateLive {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.is_finished_and_dont_need_peers() {
if state.is_finished_and_no_active_streams() {
debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr);
continue;
Expand Down Expand Up @@ -656,6 +658,7 @@ impl TorrentStateLive {
Ok(())
}

// If we have all selected pieces but not necessarily all pieces.
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
Expand All @@ -670,7 +673,9 @@ impl TorrentStateLive {
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
}

fn is_finished_and_dont_need_peers(&self) -> bool {
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
// them being selected, we aren't fully "finished".
fn is_finished_and_no_active_streams(&self) -> bool {
self.is_finished()
&& !self.has_active_streams_unfinished_files(
&self.lock_read("is_finished_and_dont_need_peers"),
Expand Down Expand Up @@ -766,6 +771,8 @@ struct PeerHandler {
addr: SocketAddr,

tx: PeerTx,

first_message_received: AtomicBool,
}

impl<'a> PeerConnectionHandler for &'a PeerHandler {
Expand All @@ -780,6 +787,14 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}

async fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
// The first message must be "bitfield", but if it's not sent,
// assume the bitfield is all zeroes and was sent.
if !matches!(&message, Message::Bitfield(..))
&& !self.first_message_received.swap(true, Ordering::Relaxed)
{
self.on_bitfield_notify.notify_waiters();
}

match message {
Message::Request(request) => {
self.on_download_request(request)
Expand Down Expand Up @@ -931,7 +946,7 @@ impl PeerHandler {

self.counters.errors.fetch_add(1, Ordering::Relaxed);

if self.state.is_finished_and_dont_need_peers() {
if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
Expand All @@ -953,7 +968,9 @@ impl PeerHandler {
duration = format!("{dur:?}")
),
async move {
debug!("waiting to reconnect again");
tokio::time::sleep(dur).await;
debug!("finished waiting");
self.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
Expand Down Expand Up @@ -1184,43 +1201,50 @@ impl PeerHandler {
.await;
}

// The job of this is to request chunks and also to keep peer alive.
// The moment this ends, the peer is disconnected.
async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;

// TODO: this check needs to happen more often, we need to update our
// interested state with the other side, for now we send it only once.
if self.state.is_finished_and_dont_need_peers() {
self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;

if self
.state
.peers
.with_live(self.addr, |l| {
l.has_full_torrent(self.state.lengths.total_pieces() as usize)
})
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// Sleep a bit to ensure this gets written to the network by manage_peer
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
let mut update_interest = {
let mut current = false;
move |h: &PeerHandler, new_value: bool| -> anyhow::Result<()> {
if new_value != current {
h.tx.send(if new_value {
WriterRequest::Message(MessageOwned::Interested)
} else {
WriterRequest::Message(MessageOwned::NotInterested)
})?;
current = new_value;
}
Ok(())
}
} else {
self.tx
.send(WriterRequest::Message(MessageOwned::Interested))?;
}
};

loop {
aframe!(self.wait_for_unchoke()).await;

if self.state.is_finished_and_dont_need_peers() {
debug!("nothing left to do, disconnecting peer");
return Ok(());
// If we have full torrent, we don't need to request more pieces.
// However we might still need to seed them to the peer.
if self.state.is_finished_and_no_active_streams() {
update_interest(self, false)?;
if !self.state.peers.is_peer_interested(self.addr) {
debug!("nothing left to do, neither of us is interested, disconnecting peer");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// wait until the receiver gets the message so that it doesn't finish with an error.
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
} else {
// TODO: wait for a notification of interest, e.g. update of selected files or new streams or change
// in peer interest.
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}

update_interest(self, true)?;

// Try steal a pice from a very slow peer first. Otherwise we might wait too long
// to download early pieces.
// Then try get the next one in queue.
Expand All @@ -1235,7 +1259,8 @@ impl PeerHandler {
None => {
debug!("no pieces to request");
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
// Half of default rw timeout not to race with it.
Duration::from_secs(5),
new_piece_notify
))
.await
Expand Down Expand Up @@ -1277,7 +1302,7 @@ impl PeerHandler {

loop {
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
Duration::from_secs(5),
aframe!(self.requests_sem.acquire())
))
.await
Expand Down
16 changes: 11 additions & 5 deletions crates/librqbit/src/torrent_state/live/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Peer {
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx)));
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
counters.inc(&state.0);
Self {
state,
Expand Down Expand Up @@ -142,7 +142,10 @@ impl PeerStateNoMut {
}
match self.take(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, true)),
counters,
);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
Expand All @@ -159,7 +162,10 @@ impl PeerStateNoMut {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, false)),
counters,
);
self.get_live_mut()
} else {
None
Expand Down Expand Up @@ -189,10 +195,10 @@ pub(crate) struct LivePeerState {
}

impl LivePeerState {
pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
pub fn new(peer_id: Id20, tx: PeerTx, initial_interested: bool) -> Self {
LivePeerState {
peer_id,
peer_interested: false,
peer_interested: initial_interested,
bitfield: BF::default(),
inflight_requests: Default::default(),
tx,
Expand Down
6 changes: 6 additions & 0 deletions crates/librqbit/src/torrent_state/live/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ impl PeerStates {
Some(p)
}

pub fn is_peer_interested(&self, handle: PeerHandle) -> bool {
self.with_live(handle, |live| live.peer_interested)
.unwrap_or(false)
}

pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
live.peer_interested = is_interested;
prev
})
}

pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Box<[u8]>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.bitfield = BF::from_boxed_slice(bitfield);
Expand Down