From c1775e45ebe8c7e9c808f767babfe8634aa7a8bf Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 11:13:16 +0100 Subject: [PATCH 01/11] E2E_NUM_SERVERS env var --- crates/librqbit/src/tests/e2e.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 4c5cf0c3..abce8f91 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -41,14 +41,17 @@ async fn test_e2e_download() { .await .unwrap(); - let num_servers = 128; + let num_servers = std::env::var("E2E_NUM_SERVERS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(128u8); let torrent_file_bytes = torrent_file.as_bytes().unwrap(); let mut futs = Vec::new(); // 2. Start N servers that are serving that torrent, and return their IP:port combos. // Disable DHT on each. - for i in 0u8..num_servers { + for i in 0..num_servers { let torrent_file_bytes = torrent_file_bytes.clone(); let tempdir = tempdir.path().to_owned(); let fut = spawn( From 60f831bc6f47dafdeba1edcc1692ae534312adbd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 11:25:45 +0100 Subject: [PATCH 02/11] dummy file is Option now instead of additional open --- crates/librqbit/src/storage/filesystem/fs.rs | 30 +++++++++++++++---- .../librqbit/src/storage/filesystem/mmap.rs | 3 +- .../src/storage/filesystem/opened_file.rs | 23 +++----------- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 4831a769..5d202b08 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -54,7 +54,12 @@ impl TorrentStorage for FilesystemStorage { #[cfg(target_family = "unix")] { use std::os::unix::fs::FileExt; - Ok(of.file.read().read_exact_at(buf, offset)?) + Ok(of + .file + .read() + .as_ref() + .context("file is None")? + .read_exact_at(buf, offset)?) } #[cfg(not(target_family = "unix"))] { @@ -70,14 +75,21 @@ impl TorrentStorage for FilesystemStorage { #[cfg(target_family = "unix")] { use std::os::unix::fs::FileExt; - Ok(of.file.read().write_all_at(buf, offset)?) + Ok(of + .file + .read() + .as_ref() + .context("file is None")? + .write_all_at(buf, offset)?) } #[cfg(target_family = "windows")] { use std::os::windows::fs::FileExt; let mut remaining = buf.len(); + let g = of.file.read(); + let f = g.as_ref().context("file is None")?; while remaining > 0 { - remaining -= of.file.read().seek_write(buf, offset)?; + remaining -= f.seek_write(buf, offset)?; } Ok(()) } @@ -85,8 +97,9 @@ impl TorrentStorage for FilesystemStorage { { use std::io::{Read, Seek, SeekFrom, Write}; let mut g = of.file.write(); - g.seek(SeekFrom::Start(offset))?; - Ok(g.write_all(buf)?) + let mut f = g.as_ref().context("file is None")?; + f.seek(SeekFrom::Start(offset))?; + Ok(f.write_all(buf)?) } } @@ -95,7 +108,12 @@ impl TorrentStorage for FilesystemStorage { } fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { - Ok(self.opened_files[file_id].file.write().set_len(len)?) + Ok(self.opened_files[file_id] + .file + .write() + .as_ref() + .context("file is None")? + .set_len(len)?) } fn take(&self) -> anyhow::Result> { diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 07efe1eb..1e68b247 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -102,9 +102,10 @@ impl TorrentStorage for MmapFilesystemStorage { let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { let fg = file.file.write(); + let fg = fg.as_ref().context("file is None")?; fg.set_len(meta.file_infos[idx].len) .context("mmap storage: error setting length")?; - let mmap = unsafe { MmapOptions::new().map_mut(&*fg) }.context("error mapping file")?; + let mmap = unsafe { MmapOptions::new().map_mut(fg) }.context("error mapping file")?; mmaps.push(RwLock::new(mmap)); } diff --git a/crates/librqbit/src/storage/filesystem/opened_file.rs b/crates/librqbit/src/storage/filesystem/opened_file.rs index 88f24ddd..f1f4e067 100644 --- a/crates/librqbit/src/storage/filesystem/opened_file.rs +++ b/crates/librqbit/src/storage/filesystem/opened_file.rs @@ -1,37 +1,22 @@ use std::fs::File; -use anyhow::Context; use parking_lot::RwLock; #[derive(Debug)] pub(crate) struct OpenedFile { - pub file: RwLock, -} - -pub(crate) fn dummy_file() -> anyhow::Result { - #[cfg(target_os = "windows")] - const DEVNULL: &str = "NUL"; - #[cfg(not(target_os = "windows"))] - const DEVNULL: &str = "/dev/null"; - - std::fs::OpenOptions::new() - .read(true) - .open(DEVNULL) - .with_context(|| format!("error opening {}", DEVNULL)) + pub file: RwLock>, } impl OpenedFile { pub fn new(f: File) -> Self { Self { - file: RwLock::new(f), + file: RwLock::new(Some(f)), } } - pub fn take(&self) -> anyhow::Result { + pub fn take(&self) -> anyhow::Result> { let mut f = self.file.write(); - let dummy = dummy_file()?; - let f = std::mem::replace(&mut *f, dummy); - Ok(f) + Ok(f.take()) } pub fn take_clone(&self) -> anyhow::Result { From 6e92eec167b6ad2be888139b565852e92c67e43d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 12:08:46 +0100 Subject: [PATCH 03/11] global timeout in e2e test --- crates/librqbit/src/tests/e2e.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index abce8f91..e7ec84a5 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -18,8 +18,16 @@ use crate::{ AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, }; +const TIMEOUT_SECS: u64 = 180; + #[tokio::test(flavor = "multi_thread", worker_threads = 64)] async fn test_e2e_download() { + tokio::time::timeout(Duration::from_secs(TIMEOUT_SECS), _test_e2e_download()) + .await + .unwrap() +} + +async fn _test_e2e_download() { let _ = tracing_subscriber::fmt::try_init(); // 1. Create a torrent From 79e206d5a798aad34f832e6603b47f67337fe6e0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 12:09:40 +0100 Subject: [PATCH 04/11] more tracing spans --- crates/librqbit/src/torrent_state/live/mod.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a0d51db3..ac5deaee 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -78,7 +78,7 @@ use tokio::{ time::timeout, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, error_span, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected}, @@ -449,7 +449,12 @@ impl TorrentStateLive { state.meta.spawner, state.meta.connector.clone(), ); - let requester = handler.task_peer_chunk_requester(); + let requester = handler + .task_peer_chunk_requester() + .instrument(error_span!("chunk_requester")); + let conn_manager = peer_connection + .manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) + .instrument(error_span!("peer_connection")); handler .counters @@ -457,7 +462,7 @@ impl TorrentStateLive { .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} - r = peer_connection.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) => {r} + r = conn_manager => {r} }; match res { @@ -926,7 +931,7 @@ impl PeerHandler { self.counters.errors.fetch_add(1, Ordering::Relaxed); if self.state.is_finished_and_dont_need_peers() { - trace!("torrent finished, not re-queueing"); + debug!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } From 7cda2c9807aba5c1322420335861ae893b22e85f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 12:35:32 +0100 Subject: [PATCH 05/11] async_backtrace --- Cargo.lock | 85 ++++++++++++++++++- crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/tests/e2e.rs | 6 +- crates/librqbit/src/tests/test_util.rs | 28 ++++++ crates/librqbit/src/torrent_state/live/mod.rs | 21 +++-- 5 files changed, 131 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80d77b8b..e313706e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,33 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "async-backtrace" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcb391558246d27a13f195c1e3a53eda422270fdd452bd57a5aa9c1da1bb198" +dependencies = [ + "async-backtrace-attributes", + "dashmap", + "futures", + "loom", + "once_cell", + "pin-project-lite", + "rustc-hash 1.1.0", + "static_assertions", +] + +[[package]] +name = "async-backtrace-attributes" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "affbba0d438add06462a0371997575927bc05052f7ec486e7a4ca405c956c3d7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -963,6 +990,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -1467,6 +1507,7 @@ name = "librqbit" version = "7.0.0-beta.0" dependencies = [ "anyhow", + "async-backtrace", "async-stream", "async-trait", "axum 0.7.5", @@ -1689,6 +1730,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.4" @@ -2203,7 +2257,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "socket2", "thiserror", @@ -2220,7 +2274,7 @@ dependencies = [ "bytes", "rand", "ring", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "slab", "thiserror", @@ -2481,6 +2535,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.0.0" @@ -2562,6 +2622,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2992,6 +3058,12 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.5" @@ -3696,6 +3768,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-core" version = "0.52.0" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index ac98be7a..6ddab8be 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -88,6 +88,7 @@ lru = { version = "0.12.3", optional = true } mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" +async-backtrace = "0.2" [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index e7ec84a5..3edf7ead 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -14,7 +14,9 @@ use tracing::{error_span, info, Instrument}; use crate::{ create_torrent, - tests::test_util::{create_default_random_dir_with_torrents, TestPeerMetadata}, + tests::test_util::{ + create_default_random_dir_with_torrents, spawn_debug_server, TestPeerMetadata, + }, AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, }; @@ -30,6 +32,8 @@ async fn test_e2e_download() { async fn _test_e2e_download() { let _ = tracing_subscriber::fmt::try_init(); + spawn_debug_server(); + // 1. Create a torrent // Ideally (for a more complicated test) with N files, and at least N pieces that span 2 files. diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 965658a6..1ffe9df6 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -1,8 +1,11 @@ use std::{io::Write, path::Path}; +use anyhow::Context; +use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; use rand::{thread_rng, Rng, RngCore, SeedableRng}; use tempfile::TempDir; +use tracing::info; pub fn create_new_file_with_random_content(path: &Path, mut size: usize) { let mut file = std::fs::OpenOptions::new() @@ -79,3 +82,28 @@ impl TestPeerMetadata { 0f64 } } + +async fn debug_server() -> anyhow::Result<()> { + async fn backtraces() -> impl IntoResponse { + async_backtrace::taskdump_tree(true) + } + + let app = Router::new().route("/backtrace", get(backtraces)); + let app = app.into_make_service(); + + let addr = "127.0.0.1:3032"; + + info!(%addr, "starting HTTP server"); + + use tokio::net::TcpListener; + + let listener = TcpListener::bind(addr) + .await + .with_context(|| format!("error binding to {addr}"))?; + axum::serve(listener, app).await?; + Ok(()) +} + +pub fn spawn_debug_server() { + tokio::spawn(debug_server()); +} diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ac5deaee..a1b0cd68 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -353,6 +353,7 @@ impl TorrentStateLive { Ok(()) } + #[async_backtrace::framed] async fn task_manage_incoming_peer( self: Arc, checked_peer: CheckedIncomingConnection, @@ -413,6 +414,7 @@ impl TorrentStateLive { Ok(()) } + #[async_backtrace::framed] async fn task_manage_outgoing_peer( self: Arc, addr: SocketAddr, @@ -449,12 +451,12 @@ impl TorrentStateLive { state.meta.spawner, state.meta.connector.clone(), ); - let requester = handler + let requester = async_backtrace::frame!(handler .task_peer_chunk_requester() - .instrument(error_span!("chunk_requester")); - let conn_manager = peer_connection + .instrument(error_span!("chunk_requester"))); + let conn_manager = async_backtrace::frame!(peer_connection .manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) - .instrument(error_span!("peer_connection")); + .instrument(error_span!("peer_connection"))); handler .counters @@ -1213,7 +1215,7 @@ impl PeerHandler { } loop { - self.wait_for_unchoke().await; + async_backtrace::frame!(self.wait_for_unchoke()).await; if self.state.is_finished_and_dont_need_peers() { debug!("nothing left to do, disconnecting peer"); @@ -1232,7 +1234,7 @@ impl PeerHandler { Some(next) => next, None => { debug!("no pieces to request"); - tokio::time::sleep(Duration::from_secs(10)).await; + async_backtrace::frame!(tokio::time::sleep(Duration::from_secs(10))).await; continue; } }; @@ -1266,7 +1268,12 @@ impl PeerHandler { }; loop { - match timeout(Duration::from_secs(10), self.requests_sem.acquire()).await { + match async_backtrace::frame!(timeout( + Duration::from_secs(10), + async_backtrace::frame!(self.requests_sem.acquire()) + )) + .await + { Ok(acq) => break acq?.forget(), Err(_) => continue, }; From 02eca15c709c19976f1d09e24b0fc9da303e43cf Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 12:54:56 +0100 Subject: [PATCH 06/11] Remove a spurious complicated function ".mark_chunk_request_cancelled" --- crates/librqbit/src/chunk_tracker.rs | 19 ------------------- crates/librqbit/src/torrent_state/live/mod.rs | 2 +- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 254a8319..d3a87cba 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -222,25 +222,6 @@ impl ChunkTracker { self.have[id.get() as usize] } - // None if wrong chunk - // true if did something - // false if didn't do anything - pub fn mark_chunk_request_cancelled( - &mut self, - index: ValidPieceIndex, - _chunk: u32, - ) -> Option { - if *self.have.get(index.get() as usize)? { - return Some(false); - } - // This will trigger the requesters to re-check each chunk in this piece. - let chunk_range = self.lengths.chunk_range(index); - if !self.chunk_status.get(chunk_range)?.all() { - self.queue_pieces.set(index.get() as usize, true); - } - Some(true) - } - pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) { if self .have diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a1b0cd68..896ca885 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -904,7 +904,7 @@ impl PeerHandler { req.chunk_index ); g.get_chunks_mut()? - .mark_chunk_request_cancelled(req.piece_index, req.chunk_index); + .mark_piece_broken_if_not_have(req.piece_index); } } PeerState::NotNeeded => { From 032b34c5d62e782c656257713cd9d06933def24a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 13:05:15 +0100 Subject: [PATCH 07/11] upgrade a log message to debug --- crates/librqbit/src/peer_connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 8d9af9dc..aec95640 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -199,7 +199,7 @@ impl PeerConnection { .await .context("error reading handshake")?; let h_supports_extended = h.supports_extended(); - trace!( + debug!( "connected: id={:?}", try_decode_peer_id(Id20::new(h.peer_id)) ); From 0cb92eb333419d7ec0e6c1c06519d623f01e50a7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 13:16:33 +0100 Subject: [PATCH 08/11] Notify for released pieces --- crates/librqbit/src/torrent_state/live/mod.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 896ca885..ed9f0fc4 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -177,6 +177,7 @@ pub struct TorrentStateLive { peer_queue_tx: UnboundedSender, finished_notify: Notify, + new_pieces_notify: Notify, down_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator, @@ -233,6 +234,7 @@ impl TorrentStateLive { }, lengths, peer_semaphore: Arc::new(Semaphore::new(128)), + new_pieces_notify: Notify::new(), peer_queue_tx, finished_notify: Notify::new(), down_speed_estimator, @@ -905,6 +907,7 @@ impl PeerHandler { ); g.get_chunks_mut()? .mark_piece_broken_if_not_have(req.piece_index); + self.state.new_pieces_notify.notify_waiters(); } } PeerState::NotNeeded => { @@ -1226,6 +1229,7 @@ impl PeerHandler { // to download early pieces. // Then try get the next one in queue. // Afterwards means we are close to completion, try stealing more aggressively. + let new_piece_notify = self.state.new_pieces_notify.notified(); let next = match self .try_steal_old_slow_piece(10.) .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? @@ -1234,7 +1238,12 @@ impl PeerHandler { Some(next) => next, None => { debug!("no pieces to request"); - async_backtrace::frame!(tokio::time::sleep(Duration::from_secs(10))).await; + let _ = async_backtrace::frame!(tokio::time::timeout( + Duration::from_secs(10), + new_piece_notify + )) + .await; + debug!("woken up"); continue; } }; @@ -1502,6 +1511,7 @@ impl PeerHandler { .lock_write("mark_piece_broken") .get_chunks_mut()? .mark_piece_broken_if_not_have(chunk_info.piece_index); + state.new_pieces_notify.notify_waiters(); anyhow::bail!("i am probably a bogus peer. dying.") } }; From 17353cf8e183de431c3c1b51cf826cda2700708e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 13:19:56 +0100 Subject: [PATCH 09/11] Timeout configurable --- crates/librqbit/src/tests/e2e.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 3edf7ead..75147bdb 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -20,12 +20,16 @@ use crate::{ AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, }; -const TIMEOUT_SECS: u64 = 180; - #[tokio::test(flavor = "multi_thread", worker_threads = 64)] async fn test_e2e_download() { - tokio::time::timeout(Duration::from_secs(TIMEOUT_SECS), _test_e2e_download()) + let timeout = std::env::var("E2E_TIMEOUT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(180); + + tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download()) .await + .context("test_e2e_download timed out") .unwrap() } From e4aac7930fc578fd3f128ab3d4e7b2e1c0135577 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 13:40:01 +0100 Subject: [PATCH 10/11] make async-backtrace optional --- crates/librqbit/Cargo.toml | 3 +- crates/librqbit/src/lib.rs | 13 +++++++ crates/librqbit/src/tests/test_util.rs | 12 +++++- crates/librqbit/src/torrent_state/live/mod.rs | 37 +++++++++---------- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 6ddab8be..8d45a1d5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -22,6 +22,7 @@ storage_middleware = ["lru"] storage_examples = [] tracing-subscriber-utils = ["tracing-subscriber"] postgres = ["sqlx"] +async-bt = ["async-backtrace"] [dependencies] sqlx = { version = "0.7", features = [ @@ -88,7 +89,7 @@ lru = { version = "0.12.3", optional = true } mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" -async-backtrace = "0.2" +async-backtrace = { version = "0.2", optional = true } [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index e6a5a276..16ea5be5 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -25,6 +25,19 @@ #![warn(clippy::cast_possible_truncation)] +macro_rules! aframe { + ($e:expr) => {{ + #[cfg(feature = "async-bt")] + { + async_backtrace::frame!($e) + } + #[cfg(not(feature = "async-bt"))] + { + $e + } + }}; +} + pub mod api; mod api_error; mod chunk_tracker; diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 1ffe9df6..4f4dd3b4 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -85,7 +85,17 @@ impl TestPeerMetadata { async fn debug_server() -> anyhow::Result<()> { async fn backtraces() -> impl IntoResponse { - async_backtrace::taskdump_tree(true) + #[cfg(feature = "async-bt")] + { + async_backtrace::taskdump_tree(true) + } + #[cfg(not(feature = "async-bt"))] + { + use crate::ApiError; + ApiError::from(anyhow::anyhow!( + "backtraces not enabled, enable async-bt feature" + )) + } } let app = Router::new().route("/backtrace", get(backtraces)); diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ed9f0fc4..6bbb2a92 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -70,12 +70,9 @@ use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Notify, OwnedSemaphorePermit, Semaphore, - }, - time::timeout, +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Notify, OwnedSemaphorePermit, Semaphore, }; use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; @@ -349,13 +346,13 @@ impl TorrentStateLive { "manage_incoming_peer", addr = %checked_peer.addr ), - self.clone() - .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit), + aframe!(self + .clone() + .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit)), ); Ok(()) } - #[async_backtrace::framed] async fn task_manage_incoming_peer( self: Arc, checked_peer: CheckedIncomingConnection, @@ -416,7 +413,6 @@ impl TorrentStateLive { Ok(()) } - #[async_backtrace::framed] async fn task_manage_outgoing_peer( self: Arc, addr: SocketAddr, @@ -453,10 +449,10 @@ impl TorrentStateLive { state.meta.spawner, state.meta.connector.clone(), ); - let requester = async_backtrace::frame!(handler + let requester = aframe!(handler .task_peer_chunk_requester() .instrument(error_span!("chunk_requester"))); - let conn_manager = async_backtrace::frame!(peer_connection + let conn_manager = aframe!(peer_connection .manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) .instrument(error_span!("peer_connection"))); @@ -499,7 +495,7 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), - state.clone().task_manage_outgoing_peer(addr, permit), + aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } @@ -1218,7 +1214,7 @@ impl PeerHandler { } loop { - async_backtrace::frame!(self.wait_for_unchoke()).await; + aframe!(self.wait_for_unchoke()).await; if self.state.is_finished_and_dont_need_peers() { debug!("nothing left to do, disconnecting peer"); @@ -1238,12 +1234,15 @@ impl PeerHandler { Some(next) => next, None => { debug!("no pieces to request"); - let _ = async_backtrace::frame!(tokio::time::timeout( + match aframe!(tokio::time::timeout( Duration::from_secs(10), new_piece_notify )) - .await; - debug!("woken up"); + .await + { + Ok(()) => debug!("woken up, new pieces might be available"), + Err(_) => debug!("woken up by sleep timer"), + } continue; } }; @@ -1277,9 +1276,9 @@ impl PeerHandler { }; loop { - match async_backtrace::frame!(timeout( + match aframe!(tokio::time::timeout( Duration::from_secs(10), - async_backtrace::frame!(self.requests_sem.acquire()) + aframe!(self.requests_sem.acquire()) )) .await { From 8b1ca494393f56f6af7d79bf737eb510c63f465e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 19 Aug 2024 13:58:05 +0100 Subject: [PATCH 11/11] fix windows build --- crates/librqbit/src/storage/filesystem/fs.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 5d202b08..9e5b45eb 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -61,12 +61,22 @@ impl TorrentStorage for FilesystemStorage { .context("file is None")? .read_exact_at(buf, offset)?) } - #[cfg(not(target_family = "unix"))] + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + let mut remaining = buf.len(); + let g = of.file.read(); + let f = g.as_ref().context("file is None")?; + f.seek_read(buf, offset)?; + Ok(()) + } + #[cfg(not(any(target_family = "unix", target_family = "windows")))] { use std::io::{Read, Seek, SeekFrom}; let mut g = of.file.write(); - g.seek(SeekFrom::Start(offset))?; - Ok(g.read_exact(buf)?) + let mut f = g.as_ref().context("file is None")?; + f.seek(SeekFrom::Start(offset))?; + Ok(f.read_exact(buf)?) } }