diff --git a/Cargo.lock b/Cargo.lock index 3bdbe5cd..2b71311e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.24.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" dependencies = [ "gimli", ] @@ -244,9 +244,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-lc-rs" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f95446d919226d587817a7d21379e6eb099b97b45110a7f272a444ca5c54070" +checksum = "cdd82dba44d209fddb11c190e0a94b78651f95299598e472215667417a03ff1d" dependencies = [ "aws-lc-sys", "mirai-annotations", @@ -257,9 +257,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3ddc4a5b231dd6958b140ff3151b6412b3f4321fab354f399eec8f14b06df62" +checksum = "df7a4168111d7eb622a31b214057b8509c0a7e1794f44c546d742330dc793972" dependencies = [ "bindgen", "cc", @@ -580,9 +580,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.24" +version = "1.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938" +checksum = "e8d9e0b4957f635b8d3da819d0db5603620467ecf1f692d22a8c2717ce27e6d8" dependencies = [ "jobserver", "libc", @@ -1482,9 +1482,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1497,9 +1497,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1507,15 +1507,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1535,15 +1535,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1552,21 +1552,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1754,9 +1754,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "gio" @@ -2290,9 +2290,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "is-docker" @@ -3388,9 +3388,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.4" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ "memchr", ] @@ -3732,18 +3732,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" dependencies = [ "proc-macro2", "quote", @@ -4360,9 +4360,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.13" +version = "0.23.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "once_cell", "ring", @@ -5198,9 +5198,9 @@ dependencies = [ [[package]] name = "tao" -version = "0.30.2" +version = "0.30.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e48d7c56b3f7425d061886e8ce3b6acfab1993682ed70bef50fd133d721ee6" +checksum = "a0dbbebe82d02044dfa481adca1550d6dd7bd16e086bc34fa0fbecceb5a63751" dependencies = [ "bitflags 2.6.0", "cocoa", @@ -5260,9 +5260,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tauri" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c9c08beea86d5095b6f5fb1c788fe8759b23c3f71927c66a69e725a91d089cd" +checksum = "f3fad474c02a3bcd4a304afff97159a31b9bab84e29563f7109c7b0ce8cd774e" dependencies = [ "anyhow", "bytes", @@ -5310,9 +5310,9 @@ dependencies = [ [[package]] name = "tauri-build" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93bb649a284aec2ab43e8df6831b8c8060d231ec8ddf05bf021d58cb67570e1f" +checksum = "935f9b3c49b22b3e2e485a57f46d61cd1ae07b1cbb2ba87387a387caf2d8c4e7" dependencies = [ "anyhow", "cargo_toml", @@ -5332,9 +5332,9 @@ dependencies = [ [[package]] name = "tauri-codegen" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4511912612ba0da11aeb300e18e18b2c7067fd14aa886eac46bdcc43b4fa3ee" +checksum = "95d7443dd4f0b597704b6a14b964ee2ed16e99928d8e6292ae9825f09fbcd30e" dependencies = [ "base64 0.22.1", "brotli", @@ -5359,9 +5359,9 @@ dependencies = [ [[package]] name = "tauri-macros" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ee976578a14b779996d7b6879d7e625c8ce674bc87e223953664f37def2eef" +checksum = "4d2c0963ccfc3f5194415f2cce7acc975942a8797fbabfb0aa1ed6f59326ae7f" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -5373,9 +5373,9 @@ dependencies = [ [[package]] name = "tauri-plugin" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "774d084450b7ec8e445ad119079307f935b7bf3d736da139a8664eb1d4909aa5" +checksum = "b2e6660a409963e4d57b9bfab4addd141eeff41bd3a7fb14e13004a832cf7ef6" dependencies = [ "anyhow", "glob", @@ -5390,9 +5390,9 @@ dependencies = [ [[package]] name = "tauri-plugin-shell" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2929bb35edb7255949e0cbcb2285ff6b02371bf826ad03471077b6b3bf4e6d60" +checksum = "371fb9aca2823990a2d0db7970573be5fdf07881fcaa2b835b29631feb84aec1" dependencies = [ "encoding_rs", "log", @@ -5411,9 +5411,9 @@ dependencies = [ [[package]] name = "tauri-runtime" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2570e1f33f332a2d2d9967ebb3903bc4e1f92b9c47e4d1b302c10ea4153fcdbb" +checksum = "af12ad1af974b274ef1d32a94e6eba27a312b429ef28fcb98abc710df7f9151d" dependencies = [ "dpi", "gtk", @@ -5430,9 +5430,9 @@ dependencies = [ [[package]] name = "tauri-runtime-wry" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8147d8f9ed418d83a90af3d64fbdca5e0e924ae28e5351da88f9568169db8665" +checksum = "e45e88aa0b11b302d836e6ea3e507a6359044c4a8bc86b865ba99868c695753d" dependencies = [ "gtk", "http", diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 2ebfdaf2..3fe41e3b 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -402,9 +402,8 @@ impl RecursiveRequest { self.callbacks.on_request_start(self, id, addr); } - let response = self.dht.request(self.request.clone(), addr).await.map(|r| { + let response = self.dht.request(self.request.clone(), addr).await.inspect(|r| { self.mark_node_responded(addr, &r); - r }); if let Some(id) = id { self.callbacks.on_request_end(self, id, addr, &response); diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 4dfc0d50..f94b2da5 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -19,6 +19,7 @@ use crate::{ peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, FileStream, ManagedTorrentHandle, }, + StreamOptions, }; #[cfg(feature = "tracing-subscriber-utils")] @@ -423,7 +424,7 @@ impl Api { pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result { let mgr = self.mgr_handle(idx)?; - Ok(mgr.stream(file_id)?) + Ok(mgr.stream(file_id, StreamOptions::default())?) } } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 455381cb..df03b8fb 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -61,6 +61,7 @@ pub mod session_stats; mod spawn_utils; pub mod storage; mod stream_connect; +mod stream_ops; mod torrent_state; #[cfg(feature = "tracing-subscriber-utils")] pub mod tracing_subscriber_config_utils; @@ -80,6 +81,7 @@ pub use session::{ SessionPersistenceConfig, SUPPORTED_SCHEMES, }; pub use spawn_utils::spawn as librqbit_spawn; +pub use stream_ops::StreamOptions; pub use torrent_state::{ ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, }; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 3d54cf2e..9b3632e7 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -73,6 +73,7 @@ pub(crate) struct PeerConnection { options: PeerConnectionOptions, spawner: BlockingSpawner, connector: Arc, + read_controller: tokio::sync::watch::Receiver, } pub(crate) async fn with_timeout( @@ -97,6 +98,7 @@ impl PeerConnection { options: Option, spawner: BlockingSpawner, connector: Arc, + read_controller: tokio::sync::watch::Receiver, ) -> Self { PeerConnection { handler, @@ -106,6 +108,7 @@ impl PeerConnection { spawner, options: options.unwrap_or_default(), connector, + read_controller, } } @@ -394,20 +397,27 @@ impl PeerConnection { let reader = async move { loop { - let message = read_buf - .read_message(&mut read_half, rwtimeout) - .await - .context("error reading message")?; - trace!("received: {:?}", &message); - - if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { - *extended_handshake_ref.write() = Some(h.clone_to_owned(None)); - self.handler.on_extended_handshake(h)?; - } else { - self.handler - .on_received_message(message) + // Check if value has been marked as "true", meaning that we do not have to wait before reading further messages + if *self.read_controller.borrow() { + let message = read_buf + .read_message(&mut read_half, rwtimeout) .await - .context("error in handler.on_received_message()")?; + .context("error reading message")?; + trace!("received: {:?}", &message); + + if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { + *extended_handshake_ref.write() = Some(h.clone_to_owned(None)); + self.handler.on_extended_handshake(h)?; + } else { + self.handler + .on_received_message(message) + .await + .context("error in handler.on_received_message()")?; + } + } else { + // Todo: make it configurable + println!("cannot read, must wait (500ms)..."); + tokio::time::sleep(Duration::from_millis(500)).await; } } diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index a22ab43f..bfb44a22 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -45,6 +45,8 @@ pub(crate) async fn read_metainfo_from_peer( result_tx: Mutex::new(Some(result_tx)), locked: RwLock::new(None), }; + + let (_, rx) = tokio::sync::watch::channel(true); let connection = PeerConnection::new( addr, info_hash, @@ -53,6 +55,7 @@ pub(crate) async fn read_metainfo_from_peer( peer_connection_options, spawner, connector, + rx, ); let result_reader = async move { result_rx.await? }; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 83a03bee..a3c9c4a9 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -20,7 +20,8 @@ use crate::{ session_stats::SessionStats, spawn_utils::BlockingSpawner, storage::{ - filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, + filesystem::FilesystemStorageFactory, BoxStorageFactory, MemoryWatcherStorage, + StorageFactoryExt, TorrentStorage, }, stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ @@ -702,6 +703,7 @@ impl Session { } session.start_speed_estimator_updater(); + session.start_memory_usage_updater(); Ok(session) } @@ -836,6 +838,53 @@ impl Session { self.root_span.as_ref().and_then(|s| s.id()) } + pub(crate) fn start_memory_usage_updater(self: &Arc) { + self.spawn(error_span!(parent: self.rs(), "memory_usage"), { + let s = Arc::downgrade(self); + + async move { + let mut poll_interval = tokio::time::interval(Duration::from_secs(1)); + loop { + poll_interval.tick().await; + // Check memory usage here + let s = s.upgrade().context("session is dead")?; + + let r = s.db.read(); + + let max_size = 100 * 1024 * 1024; + + for v in r.torrents.values() { + if let Some(t) = v.live() { + if t.files.get_current_memory_size() > max_size { + println!( + "TOO MUCH MEMORY, PAUSING {}: {}/{} = {} %", + v.id(), + t.files.get_current_memory_size(), + max_size, + (t.files.get_current_memory_size() as f32 / (max_size as f32) + * 100.0) as usize + ); + // TODO: ADD SEND ! + // if let Err(e) = t.read_controller_tx.send(false) { + // println!("{}", e); + // } + } else { + println!( + "Torrent {} is OK: {}/{} = {} %", + v.id(), + t.files.get_current_memory_size(), + max_size, + (t.files.get_current_memory_size() as f32 / (max_size as f32) + * 100.0) as usize + ); + } + } + } + } + } + }); + } + /// Stop the session and all managed tasks. pub async fn stop(&self) { let torrents = self @@ -1184,9 +1233,10 @@ impl Session { let initializing = Arc::new(TorrentStateInitializing::new( minfo.clone(), only_files.clone(), - minfo.storage_factory.create_and_init(&minfo)?, + MemoryWatcherStorage::new(minfo.storage_factory.create_and_init(&minfo)?), false, )); + let (tx, rx) = tokio::sync::watch::channel(true); let handle = Arc::new(ManagedTorrent { locked: RwLock::new(ManagedTorrentLocked { paused: opts.paused, @@ -1195,6 +1245,8 @@ impl Session { }), state_change_notify: Notify::new(), shared: minfo, + read_controller_recv: rx, + read_controller_send: tx, }); g.add_torrent(handle.clone(), id); diff --git a/crates/librqbit/src/storage/internal_storage.rs b/crates/librqbit/src/storage/internal_storage.rs new file mode 100644 index 00000000..379ba85d --- /dev/null +++ b/crates/librqbit/src/storage/internal_storage.rs @@ -0,0 +1,60 @@ +use parking_lot::RwLock; + +use super::TorrentStorage; + +pub struct MemoryWatcherStorage { + inner: Box, + size_in_memory: RwLock, +} + +impl MemoryWatcherStorage { + pub fn new(inner: Box) -> Self { + Self { + inner, + size_in_memory: RwLock::new(0), + } + } + + pub fn as_storage(&self) -> &dyn TorrentStorage { + self + } + + pub fn get_current_memory_size(&self) -> usize { + *(self.size_in_memory.read()) + } +} + +impl TorrentStorage for MemoryWatcherStorage { + fn init(&mut self, meta: &crate::ManagedTorrentShared) -> anyhow::Result<()> { + self.inner.init(meta) + } + + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + self.inner.pread_exact(file_id, offset, buf) + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + { + let mut write = self.size_in_memory.write(); + *write += buf.len(); + } + + self.inner.pwrite_all(file_id, offset, buf) + } + + fn remove_file(&self, file_id: usize, filename: &std::path::Path) -> anyhow::Result<()> { + self.inner.remove_file(file_id, filename) + } + + fn remove_directory_if_empty(&self, path: &std::path::Path) -> anyhow::Result<()> { + self.inner.remove_directory_if_empty(path) + } + + fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + self.inner.ensure_file_length(file_id, length) + } + + fn take(&self) -> anyhow::Result> { + self.inner.take() + } +} diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index efefa6e4..913bd76c 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -1,4 +1,7 @@ pub mod filesystem; +mod internal_storage; + +pub use internal_storage::MemoryWatcherStorage; #[cfg(feature = "storage_examples")] pub mod examples; diff --git a/crates/librqbit/src/stream_ops.rs b/crates/librqbit/src/stream_ops.rs new file mode 100644 index 00000000..bc1e7715 --- /dev/null +++ b/crates/librqbit/src/stream_ops.rs @@ -0,0 +1,20 @@ + +const MAX_STORED_CONTENT_SIZE: usize = 100 * 1024 * 1024; // 100 MB + +/// Data structure representing what can be tweaked when streaming data. +pub struct StreamOptions { + /// Once served to client, data is removed from memory (and thus needs to be re-fetched if needed again) + pub erase_content_after_served: bool, + /// Maximum size allowed to be stored in the `TorrentStorage` before throttling. + /// Allows a finer control over what memory quantity can be used. + pub max_stored_content_size: usize +} + +impl Default for StreamOptions { + fn default() -> Self { + Self { + erase_content_after_served: false, + max_stored_content_size: MAX_STORED_CONTENT_SIZE + } + } +} diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 4af90c1b..6ad4d890 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -8,7 +8,7 @@ use tracing::info; use crate::{ create_torrent, tests::test_util::{setup_test_logging, TestPeerMetadata}, - AddTorrent, CreateTorrentOptions, Session, + AddTorrent, CreateTorrentOptions, Session, StreamOptions, }; use super::test_util::create_default_random_dir_with_torrents; @@ -103,7 +103,7 @@ async fn e2e_stream() -> anyhow::Result<()> { info!("client torrent initialized, starting stream"); - let mut stream = client_handle.stream(0)?; + let mut stream = client_handle.stream(0, StreamOptions::default())?; let mut buf = Vec::::with_capacity(8192); stream.read_to_end(&mut buf).await?; diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index ed4e2288..3b9866e7 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -15,19 +15,14 @@ use size_format::SizeFormatterBinary as SF; use tracing::{info, trace, warn}; use crate::{ - api::TorrentIdOrHash, - bitv::BitV, - bitv_factory::BitVFactory, - chunk_tracker::ChunkTracker, - file_ops::FileOps, - type_aliases::{FileStorage, BF}, - FileInfos, + api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, chunk_tracker::ChunkTracker, + file_ops::FileOps, storage::MemoryWatcherStorage, type_aliases::BF, FileInfos, }; use super::{paused::TorrentStatePaused, ManagedTorrentShared}; pub struct TorrentStateInitializing { - pub(crate) files: FileStorage, + pub(crate) files: MemoryWatcherStorage, pub(crate) shared: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, @@ -56,7 +51,7 @@ impl TorrentStateInitializing { pub fn new( meta: Arc, only_files: Option>, - files: FileStorage, + files: MemoryWatcherStorage, previously_errored: bool, ) -> Self { Self { @@ -245,7 +240,7 @@ impl TorrentStateInitializing { .unwrap_or(true) { let now = Instant::now(); - if let Err(err) = self.files.ensure_file_length(idx, fi.len) { + if let Err(err) = self.files.as_storage().ensure_file_length(idx, fi.len) { warn!( "Error setting length for file {:?} to {}: {:#?}", fi.relative_filename, fi.len, err @@ -265,7 +260,7 @@ impl TorrentStateInitializing { let paused = TorrentStatePaused { shared: self.shared.clone(), - files: self.files.take()?, + files: self.files.as_storage().take()?, chunk_tracker, streams: Arc::new(Default::default()), }; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 220c6371..1069cce5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -88,8 +88,9 @@ use crate::{ }, session::CheckedIncomingConnection, session_stats::atomic::AtomicSessionStats, + storage::MemoryWatcherStorage, torrent_state::{peer::Peer, utils::atomic_inc}, - type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF}, + type_aliases::{DiskWorkQueueSender, FilePriorities, PeerHandle, BF}, }; use self::{ @@ -177,7 +178,7 @@ pub struct TorrentStateLive { torrent: Arc, locked: RwLock, - pub(crate) files: FileStorage, + pub(crate) files: MemoryWatcherStorage, per_piece_locks: Vec>, @@ -201,6 +202,9 @@ pub struct TorrentStateLive { pub(crate) streams: Arc, have_broadcast_tx: tokio::sync::broadcast::Sender, + + pub(crate) read_controller_rx: tokio::sync::watch::Receiver, + pub(crate) read_controller_tx: tokio::sync::watch::Sender, } impl TorrentStateLive { @@ -208,6 +212,8 @@ impl TorrentStateLive { paused: TorrentStatePaused, fatal_errors_tx: tokio::sync::oneshot::Sender, cancellation_token: CancellationToken, + read_controller_rx: tokio::sync::watch::Receiver, + read_controller_tx: tokio::sync::watch::Sender, ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let session = paused @@ -253,7 +259,7 @@ impl TorrentStateLive { fatal_errors_tx: Some(fatal_errors_tx), unflushed_bitv_bytes: 0, }), - files: paused.files, + files: MemoryWatcherStorage::new(paused.files), stats: AtomicStats { have_bytes: AtomicU64::new(have_bytes), ..Default::default() @@ -272,6 +278,8 @@ impl TorrentStateLive { per_piece_locks: (0..lengths.total_pieces()) .map(|_| RwLock::new(())) .collect(), + read_controller_rx, + read_controller_tx, }); state.spawn( @@ -422,6 +430,7 @@ impl TorrentStateLive { Some(options), self.torrent.spawner, self.torrent.connector.clone(), + self.read_controller_rx.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -487,6 +496,7 @@ impl TorrentStateLive { Some(options), state.torrent.spawner, state.torrent.connector.clone(), + state.read_controller_rx.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -555,7 +565,7 @@ impl TorrentStateLive { pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( &self.torrent.info, - &*self.files, + self.files.as_storage(), &self.torrent().file_infos, &self.lengths, ) @@ -666,7 +676,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { shared: self.torrent.clone(), - files: self.files.take()?, + files: self.files.as_storage().take()?, chunk_tracker, streams: self.streams.clone(), }) @@ -721,7 +731,7 @@ impl TorrentStateLive { } fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { - if let Err(e) = self.files.on_piece_completed(id) { + if let Err(e) = self.files.as_storage().on_piece_completed(id) { debug!(?id, "file storage errored in on_piece_completed(): {e:#}"); } let mut g = self.lock_write("on_piece_completed"); @@ -844,6 +854,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { async fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { // The first message must be "bitfield", but if it's not sent, // assume the bitfield is all zeroes and was sent. + if !matches!(&message, Message::Bitfield(..)) && !self.first_message_received.swap(true, Ordering::Relaxed) { diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index cf8940a9..c2950976 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -39,6 +39,7 @@ use crate::chunk_tracker::ChunkTracker; use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; +use crate::storage::MemoryWatcherStorage; use crate::stream_connect::StreamConnector; use crate::torrent_state::stats::LiveStats; use crate::type_aliases::DiskWorkQueueSender; @@ -146,6 +147,8 @@ pub struct ManagedTorrent { pub shared: Arc, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, + pub(crate) read_controller_send: tokio::sync::watch::Sender, + pub(crate) read_controller_recv: tokio::sync::watch::Receiver, } impl ManagedTorrent { @@ -321,6 +324,9 @@ impl ManagedTorrent { let span = self.shared().span.clone(); let token = cancellation_token.clone(); + let pause_read_rx = t.read_controller_recv.clone(); + let pause_write_rx = t.read_controller_send.clone(); + spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), @@ -348,7 +354,13 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token)?; + let live = TorrentStateLive::new( + paused, + tx, + cancellation_token, + pause_read_rx, + pause_write_rx, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -371,9 +383,18 @@ impl ManagedTorrent { Ok(()) } ManagedTorrentState::Paused(_) => { + let pause_read_rx = self.read_controller_recv.clone(); + let pause_read_tx = self.read_controller_send.clone(); + let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?; + let live = TorrentStateLive::new( + paused, + tx, + cancellation_token.clone(), + pause_read_rx, + pause_read_tx, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -385,7 +406,9 @@ impl ManagedTorrent { let initializing = Arc::new(TorrentStateInitializing::new( self.shared.clone(), g.only_files.clone(), - self.shared.storage_factory.create_and_init(self.shared())?, + MemoryWatcherStorage::new( + self.shared.storage_factory.create_and_init(self.shared())?, + ), true, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 73fb9ea0..09e36ef4 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -16,7 +16,7 @@ use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; use crate::{ - file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, + file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, StreamOptions, }; use super::ManagedTorrentHandle; @@ -226,6 +226,12 @@ impl AsyncRead for FileStream { self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); + // Buffer has yet been filled with data: + // - Check if other streams actually need this piece + // - if not needed by other, ask underlying storage to delete the piece + // - on_piece_completed() could thus be replaced by delete_chunk() + // - update value in chunk tracker to say we do not have this piece anymore (or add a new BitV when removed pieces ?) + Poll::Ready(Ok(())) } } @@ -269,14 +275,14 @@ impl Drop for FileStream { } impl ManagedTorrent { - fn with_storage_and_file(&self, file_id: usize, f: F) -> anyhow::Result + pub fn with_storage_and_file(&self, file_id: usize, f: F) -> anyhow::Result where F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R, { self.with_state(|s| { let files = match s { crate::ManagedTorrentState::Paused(p) => &*p.files, - crate::ManagedTorrentState::Live(l) => &*l.files, + crate::ManagedTorrentState::Live(l) => l.files.as_storage(), s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; let fi = self @@ -315,7 +321,7 @@ impl ManagedTorrent { .unwrap_or(false) } - pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { + pub fn stream(self: Arc, file_id: usize, options: StreamOptions) -> anyhow::Result { let (fd_len, fd_offset) = self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?; let streams = self.streams()?; diff --git a/crates/librqbit_core/src/magnet.rs b/crates/librqbit_core/src/magnet.rs index 22ca5aa4..429dceb8 100644 --- a/crates/librqbit_core/src/magnet.rs +++ b/crates/librqbit_core/src/magnet.rs @@ -90,11 +90,7 @@ impl Magnet { id20, id32, trackers, - select_only: if files.is_empty() { - None - } else { - Some(files) - }, + select_only: if files.is_empty() { None } else { Some(files) }, }), false => { anyhow::bail!("did not find infohash") diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 7ff09aa1..156584d9 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -291,9 +291,8 @@ pub async fn discover_services(location: Url) -> anyhow::Result { trace!("received from {location}: {response}"); let root_desc: RootDesc = quick_xml::de::from_str(&response) .context("failed to parse response body as xml") - .map_err(|e| { + .inspect_err(|e| { debug!("failed to parse this XML: {response}"); - e })?; Ok(root_desc) }