diff --git a/Cargo.lock b/Cargo.lock index 80d77b8b..e313706e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,33 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "async-backtrace" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcb391558246d27a13f195c1e3a53eda422270fdd452bd57a5aa9c1da1bb198" +dependencies = [ + "async-backtrace-attributes", + "dashmap", + "futures", + "loom", + "once_cell", + "pin-project-lite", + "rustc-hash 1.1.0", + "static_assertions", +] + +[[package]] +name = "async-backtrace-attributes" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "affbba0d438add06462a0371997575927bc05052f7ec486e7a4ca405c956c3d7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -963,6 +990,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -1467,6 +1507,7 @@ name = "librqbit" version = "7.0.0-beta.0" dependencies = [ "anyhow", + "async-backtrace", "async-stream", "async-trait", "axum 0.7.5", @@ -1689,6 +1730,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.4" @@ -2203,7 +2257,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "socket2", "thiserror", @@ -2220,7 +2274,7 @@ dependencies = [ "bytes", "rand", "ring", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "slab", "thiserror", @@ -2481,6 +2535,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.0.0" @@ -2562,6 +2622,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2992,6 +3058,12 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.5" @@ -3696,6 +3768,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-core" version = "0.52.0" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index ac98be7a..8d45a1d5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -22,6 +22,7 @@ storage_middleware = ["lru"] storage_examples = [] tracing-subscriber-utils = ["tracing-subscriber"] postgres = ["sqlx"] +async-bt = ["async-backtrace"] [dependencies] sqlx = { version = "0.7", features = [ @@ -88,6 +89,7 @@ lru = { version = "0.12.3", optional = true } mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" +async-backtrace = { version = "0.2", optional = true } [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 254a8319..d3a87cba 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -222,25 +222,6 @@ impl ChunkTracker { self.have[id.get() as usize] } - // None if wrong chunk - // true if did something - // false if didn't do anything - pub fn mark_chunk_request_cancelled( - &mut self, - index: ValidPieceIndex, - _chunk: u32, - ) -> Option { - if *self.have.get(index.get() as usize)? { - return Some(false); - } - // This will trigger the requesters to re-check each chunk in this piece. - let chunk_range = self.lengths.chunk_range(index); - if !self.chunk_status.get(chunk_range)?.all() { - self.queue_pieces.set(index.get() as usize, true); - } - Some(true) - } - pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) { if self .have diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index e6a5a276..16ea5be5 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -25,6 +25,19 @@ #![warn(clippy::cast_possible_truncation)] +macro_rules! aframe { + ($e:expr) => {{ + #[cfg(feature = "async-bt")] + { + async_backtrace::frame!($e) + } + #[cfg(not(feature = "async-bt"))] + { + $e + } + }}; +} + pub mod api; mod api_error; mod chunk_tracker; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 8d9af9dc..aec95640 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -199,7 +199,7 @@ impl PeerConnection { .await .context("error reading handshake")?; let h_supports_extended = h.supports_extended(); - trace!( + debug!( "connected: id={:?}", try_decode_peer_id(Id20::new(h.peer_id)) ); diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 4831a769..9e5b45eb 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -54,14 +54,29 @@ impl TorrentStorage for FilesystemStorage { #[cfg(target_family = "unix")] { use std::os::unix::fs::FileExt; - Ok(of.file.read().read_exact_at(buf, offset)?) + Ok(of + .file + .read() + .as_ref() + .context("file is None")? + .read_exact_at(buf, offset)?) } - #[cfg(not(target_family = "unix"))] + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + let mut remaining = buf.len(); + let g = of.file.read(); + let f = g.as_ref().context("file is None")?; + f.seek_read(buf, offset)?; + Ok(()) + } + #[cfg(not(any(target_family = "unix", target_family = "windows")))] { use std::io::{Read, Seek, SeekFrom}; let mut g = of.file.write(); - g.seek(SeekFrom::Start(offset))?; - Ok(g.read_exact(buf)?) + let mut f = g.as_ref().context("file is None")?; + f.seek(SeekFrom::Start(offset))?; + Ok(f.read_exact(buf)?) } } @@ -70,14 +85,21 @@ impl TorrentStorage for FilesystemStorage { #[cfg(target_family = "unix")] { use std::os::unix::fs::FileExt; - Ok(of.file.read().write_all_at(buf, offset)?) + Ok(of + .file + .read() + .as_ref() + .context("file is None")? + .write_all_at(buf, offset)?) } #[cfg(target_family = "windows")] { use std::os::windows::fs::FileExt; let mut remaining = buf.len(); + let g = of.file.read(); + let f = g.as_ref().context("file is None")?; while remaining > 0 { - remaining -= of.file.read().seek_write(buf, offset)?; + remaining -= f.seek_write(buf, offset)?; } Ok(()) } @@ -85,8 +107,9 @@ impl TorrentStorage for FilesystemStorage { { use std::io::{Read, Seek, SeekFrom, Write}; let mut g = of.file.write(); - g.seek(SeekFrom::Start(offset))?; - Ok(g.write_all(buf)?) + let mut f = g.as_ref().context("file is None")?; + f.seek(SeekFrom::Start(offset))?; + Ok(f.write_all(buf)?) } } @@ -95,7 +118,12 @@ impl TorrentStorage for FilesystemStorage { } fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { - Ok(self.opened_files[file_id].file.write().set_len(len)?) + Ok(self.opened_files[file_id] + .file + .write() + .as_ref() + .context("file is None")? + .set_len(len)?) } fn take(&self) -> anyhow::Result> { diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 07efe1eb..1e68b247 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -102,9 +102,10 @@ impl TorrentStorage for MmapFilesystemStorage { let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { let fg = file.file.write(); + let fg = fg.as_ref().context("file is None")?; fg.set_len(meta.file_infos[idx].len) .context("mmap storage: error setting length")?; - let mmap = unsafe { MmapOptions::new().map_mut(&*fg) }.context("error mapping file")?; + let mmap = unsafe { MmapOptions::new().map_mut(fg) }.context("error mapping file")?; mmaps.push(RwLock::new(mmap)); } diff --git a/crates/librqbit/src/storage/filesystem/opened_file.rs b/crates/librqbit/src/storage/filesystem/opened_file.rs index 88f24ddd..f1f4e067 100644 --- a/crates/librqbit/src/storage/filesystem/opened_file.rs +++ b/crates/librqbit/src/storage/filesystem/opened_file.rs @@ -1,37 +1,22 @@ use std::fs::File; -use anyhow::Context; use parking_lot::RwLock; #[derive(Debug)] pub(crate) struct OpenedFile { - pub file: RwLock, -} - -pub(crate) fn dummy_file() -> anyhow::Result { - #[cfg(target_os = "windows")] - const DEVNULL: &str = "NUL"; - #[cfg(not(target_os = "windows"))] - const DEVNULL: &str = "/dev/null"; - - std::fs::OpenOptions::new() - .read(true) - .open(DEVNULL) - .with_context(|| format!("error opening {}", DEVNULL)) + pub file: RwLock>, } impl OpenedFile { pub fn new(f: File) -> Self { Self { - file: RwLock::new(f), + file: RwLock::new(Some(f)), } } - pub fn take(&self) -> anyhow::Result { + pub fn take(&self) -> anyhow::Result> { let mut f = self.file.write(); - let dummy = dummy_file()?; - let f = std::mem::replace(&mut *f, dummy); - Ok(f) + Ok(f.take()) } pub fn take_clone(&self) -> anyhow::Result { diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 4c5cf0c3..75147bdb 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -14,14 +14,30 @@ use tracing::{error_span, info, Instrument}; use crate::{ create_torrent, - tests::test_util::{create_default_random_dir_with_torrents, TestPeerMetadata}, + tests::test_util::{ + create_default_random_dir_with_torrents, spawn_debug_server, TestPeerMetadata, + }, AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, }; #[tokio::test(flavor = "multi_thread", worker_threads = 64)] async fn test_e2e_download() { + let timeout = std::env::var("E2E_TIMEOUT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(180); + + tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download()) + .await + .context("test_e2e_download timed out") + .unwrap() +} + +async fn _test_e2e_download() { let _ = tracing_subscriber::fmt::try_init(); + spawn_debug_server(); + // 1. Create a torrent // Ideally (for a more complicated test) with N files, and at least N pieces that span 2 files. @@ -41,14 +57,17 @@ async fn test_e2e_download() { .await .unwrap(); - let num_servers = 128; + let num_servers = std::env::var("E2E_NUM_SERVERS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(128u8); let torrent_file_bytes = torrent_file.as_bytes().unwrap(); let mut futs = Vec::new(); // 2. Start N servers that are serving that torrent, and return their IP:port combos. // Disable DHT on each. - for i in 0u8..num_servers { + for i in 0..num_servers { let torrent_file_bytes = torrent_file_bytes.clone(); let tempdir = tempdir.path().to_owned(); let fut = spawn( diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 965658a6..4f4dd3b4 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -1,8 +1,11 @@ use std::{io::Write, path::Path}; +use anyhow::Context; +use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; use rand::{thread_rng, Rng, RngCore, SeedableRng}; use tempfile::TempDir; +use tracing::info; pub fn create_new_file_with_random_content(path: &Path, mut size: usize) { let mut file = std::fs::OpenOptions::new() @@ -79,3 +82,38 @@ impl TestPeerMetadata { 0f64 } } + +async fn debug_server() -> anyhow::Result<()> { + async fn backtraces() -> impl IntoResponse { + #[cfg(feature = "async-bt")] + { + async_backtrace::taskdump_tree(true) + } + #[cfg(not(feature = "async-bt"))] + { + use crate::ApiError; + ApiError::from(anyhow::anyhow!( + "backtraces not enabled, enable async-bt feature" + )) + } + } + + let app = Router::new().route("/backtrace", get(backtraces)); + let app = app.into_make_service(); + + let addr = "127.0.0.1:3032"; + + info!(%addr, "starting HTTP server"); + + use tokio::net::TcpListener; + + let listener = TcpListener::bind(addr) + .await + .with_context(|| format!("error binding to {addr}"))?; + axum::serve(listener, app).await?; + Ok(()) +} + +pub fn spawn_debug_server() { + tokio::spawn(debug_server()); +} diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a0d51db3..6bbb2a92 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -70,15 +70,12 @@ use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Notify, OwnedSemaphorePermit, Semaphore, - }, - time::timeout, +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Notify, OwnedSemaphorePermit, Semaphore, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, error_span, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected}, @@ -177,6 +174,7 @@ pub struct TorrentStateLive { peer_queue_tx: UnboundedSender, finished_notify: Notify, + new_pieces_notify: Notify, down_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator, @@ -233,6 +231,7 @@ impl TorrentStateLive { }, lengths, peer_semaphore: Arc::new(Semaphore::new(128)), + new_pieces_notify: Notify::new(), peer_queue_tx, finished_notify: Notify::new(), down_speed_estimator, @@ -347,8 +346,9 @@ impl TorrentStateLive { "manage_incoming_peer", addr = %checked_peer.addr ), - self.clone() - .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit), + aframe!(self + .clone() + .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit)), ); Ok(()) } @@ -449,7 +449,12 @@ impl TorrentStateLive { state.meta.spawner, state.meta.connector.clone(), ); - let requester = handler.task_peer_chunk_requester(); + let requester = aframe!(handler + .task_peer_chunk_requester() + .instrument(error_span!("chunk_requester"))); + let conn_manager = aframe!(peer_connection + .manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) + .instrument(error_span!("peer_connection"))); handler .counters @@ -457,7 +462,7 @@ impl TorrentStateLive { .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} - r = peer_connection.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) => {r} + r = conn_manager => {r} }; match res { @@ -490,7 +495,7 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), - state.clone().task_manage_outgoing_peer(addr, permit), + aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } @@ -897,7 +902,8 @@ impl PeerHandler { req.chunk_index ); g.get_chunks_mut()? - .mark_chunk_request_cancelled(req.piece_index, req.chunk_index); + .mark_piece_broken_if_not_have(req.piece_index); + self.state.new_pieces_notify.notify_waiters(); } } PeerState::NotNeeded => { @@ -926,7 +932,7 @@ impl PeerHandler { self.counters.errors.fetch_add(1, Ordering::Relaxed); if self.state.is_finished_and_dont_need_peers() { - trace!("torrent finished, not re-queueing"); + debug!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } @@ -1208,7 +1214,7 @@ impl PeerHandler { } loop { - self.wait_for_unchoke().await; + aframe!(self.wait_for_unchoke()).await; if self.state.is_finished_and_dont_need_peers() { debug!("nothing left to do, disconnecting peer"); @@ -1219,6 +1225,7 @@ impl PeerHandler { // to download early pieces. // Then try get the next one in queue. // Afterwards means we are close to completion, try stealing more aggressively. + let new_piece_notify = self.state.new_pieces_notify.notified(); let next = match self .try_steal_old_slow_piece(10.) .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? @@ -1227,7 +1234,15 @@ impl PeerHandler { Some(next) => next, None => { debug!("no pieces to request"); - tokio::time::sleep(Duration::from_secs(10)).await; + match aframe!(tokio::time::timeout( + Duration::from_secs(10), + new_piece_notify + )) + .await + { + Ok(()) => debug!("woken up, new pieces might be available"), + Err(_) => debug!("woken up by sleep timer"), + } continue; } }; @@ -1261,7 +1276,12 @@ impl PeerHandler { }; loop { - match timeout(Duration::from_secs(10), self.requests_sem.acquire()).await { + match aframe!(tokio::time::timeout( + Duration::from_secs(10), + aframe!(self.requests_sem.acquire()) + )) + .await + { Ok(acq) => break acq?.forget(), Err(_) => continue, }; @@ -1490,6 +1510,7 @@ impl PeerHandler { .lock_write("mark_piece_broken") .get_chunks_mut()? .mark_piece_broken_if_not_have(chunk_info.piece_index); + state.new_pieces_notify.notify_waiters(); anyhow::bail!("i am probably a bogus peer. dying.") } };