From 608585eaf65a08df28fbeb82400647fd15f78a25 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Thu, 4 Apr 2024 17:12:43 +0100 Subject: [PATCH 1/5] chore: add new cargo dependency: crossbeam-skiplist It will be used to create a new torrent repository implementation that allows adding torrents in parallel. That could potentially be faster than BTreeMap in a write-intensive context. --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 5 +++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e77b5de6d..74d9aaafd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1027,6 +1027,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -3908,6 +3918,7 @@ dependencies = [ "clap", "colored", "config", + "crossbeam-skiplist", "derive_more", "fern", "futures", @@ -4013,6 +4024,7 @@ version = "3.0.0-alpha.12-develop" dependencies = [ "async-std", "criterion", + "crossbeam-skiplist", "futures", "rstest", "tokio", diff --git a/Cargo.toml b/Cargo.toml index d045b945a..8c1df0685 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ chrono = { version = "0", default-features = false, features = ["clock"] } clap = { version = "4", features = ["derive", "env"] } colored = "2" config = "0" +crossbeam-skiplist = "0.1" derive_more = "0" fern = "0" futures = "0" @@ -63,8 +64,8 @@ serde_json = "1" serde_repr = "0" thiserror = "1" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } -torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "packages/clock" } +torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" } torrust-tracker-contrib-bencode = { version = "3.0.0-alpha.12-develop", path = "contrib/bencode" } torrust-tracker-located-error = { version = "3.0.0-alpha.12-develop", path = "packages/located-error" } torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "packages/primitives" } @@ -105,4 +106,4 @@ opt-level = 3 [profile.release-debug] inherits = "release" -debug = true \ No newline at end of file +debug = true From 642d6be04ae968a2b170ede2c379183a792bd782 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Thu, 4 Apr 2024 17:14:43 +0100 Subject: [PATCH 2/5] feat: new torrent repository using crossbeam_skiplist::SkipMap SkipMap is an ordered map based on a lock-free skip list. It's an alternative to BTreeMap that supports concurrent access across multiple threads. One of the performance problems with the current solution is that we can only add one torrent at a time because threads need to lock the whole BTreeMap. The SkipMap should avoid that problem.
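As a rough illustration of that difference (this snippet is not part of the patch, just a minimal sketch against the public crossbeam-skiplist 0.1 API): several threads can insert into a shared SkipMap through a plain shared reference, whereas a BTreeMap would have to sit behind a single RwLock/Mutex whose write lock serializes every insert.

use std::sync::Arc;
use std::thread;

use crossbeam_skiplist::SkipMap;

fn main() {
    // Lock-free ordered map shared across threads: `insert` only needs `&self`,
    // so no outer `RwLock`/`Mutex` around the whole map is required.
    let map: Arc<SkipMap<u32, String>> = Arc::new(SkipMap::new());

    let handles: Vec<_> = (0..4u32)
        .map(|worker| {
            let map = Arc::clone(&map);
            thread::spawn(move || {
                for i in 0..1_000u32 {
                    // With a `Mutex<BTreeMap>` every one of these inserts would have to
                    // take an exclusive lock on the whole map; here they run in parallel.
                    map.insert(worker * 1_000 + i, format!("torrent-{worker}-{i}"));
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread should not panic");
    }

    assert_eq!(map.len(), 4_000);
}

In the repository implementation added by this patch the value stored per info hash is still an Arc<Mutex<Entry>>, so updates to a single torrent are serialized per entry, but inserting different torrents no longer contends on one map-wide lock.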
More info about SkipMap: https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/struct.SkipMap.html#method.remove The aquatic UDP load test was executed with the current implementation and the new one: Current Implementation: Requests out: 397287.37/second Responses in: 357549.15/second - Connect responses: 177073.94 - Announce responses: 176905.36 - Scrape responses: 3569.85 - Error responses: 0.00 Peers per announce response: 0.00 Announce responses per info hash: - p10: 1 - p25: 1 - p50: 1 - p75: 1 - p90: 2 - p95: 3 - p99: 104 - p99.9: 287 - p100: 371 New Implementation: Requests out: 396788.68/second Responses in: 357105.27/second - Connect responses: 176662.91 - Announce responses: 176863.44 - Scrape responses: 3578.91 - Error responses: 0.00 Peers per announce response: 0.00 Announce responses per info hash: - p10: 1 - p25: 1 - p50: 1 - p75: 1 - p90: 2 - p95: 3 - p99: 105 - p99.9: 287 - p100: 351 The result is pretty similar, but benchmarking the repository with Criterion shows that this implementation is a little bit better than the current one. --- cSpell.json | 1 + packages/torrent-repository/Cargo.toml | 5 +- .../benches/repository_benchmark.rs | 21 +++- packages/torrent-repository/src/lib.rs | 3 + .../torrent-repository/src/repository/mod.rs | 1 + .../src/repository/skip_map_mutex_std.rs | 106 ++++++++++++++++++ src/core/torrent/mod.rs | 5 +- 7 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 packages/torrent-repository/src/repository/skip_map_mutex_std.rs diff --git a/cSpell.json b/cSpell.json index 0ee2f8306..0480590af 100644 --- a/cSpell.json +++ b/cSpell.json @@ -135,6 +135,7 @@ "Shareaza", "sharktorrent", "SHLVL", + "skiplist", "socketaddr", "sqllite", "subsec", diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 4cea8767f..5f1a20d32 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -16,11 +16,12 @@ rust-version.workspace = true version.workspace = true [dependencies] +crossbeam-skiplist = "0.1" futures = "0.3.29" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } -torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" } -torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" } +torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } +torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" } [dev-dependencies] criterion = { version = "0", features = ["async_tokio"] } diff --git a/packages/torrent-repository/benches/repository_benchmark.rs b/packages/torrent-repository/benches/repository_benchmark.rs index a3684c8e2..65608c86c 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -5,7 +5,7 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, - TorrentsRwLockTokioMutexTokio, + TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; use crate::helpers::{asyn, sync}; @@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) { .iter_custom(asyn::add_one_torrent::); }); + 
group.bench_function("SkipMapMutexStd", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.finish(); } @@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| asyn::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { }); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 8bb1b6def..f7c19624c 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_clock::clock; pub mod entry; @@ -16,6 +17,8 @@ pub type TorrentsRwLockTokio = repository::RwLockTokio; pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio; pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio; +pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; + /// This code needs to be copied into each crate. /// Working version, for production. #[cfg(not(test))] diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index 494040c9d..7ede1f87a 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio; pub mod rw_lock_tokio; pub mod rw_lock_tokio_mutex_std; pub mod rw_lock_tokio_mutex_tokio; +pub mod skip_map_mutex_std; use std::fmt::Debug; diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs new file mode 100644 index 000000000..aa1f43826 --- /dev/null +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -0,0 +1,106 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use crossbeam_skiplist::SkipMap; +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; + +use super::Repository; +use crate::entry::{Entry, EntrySync}; +use crate::{EntryMutexStd, EntrySingle}; + +#[derive(Default, Debug)] +pub struct CrossbeamSkipList { + torrents: SkipMap, +} + +impl Repository for CrossbeamSkipList +where + EntryMutexStd: EntrySync, + EntrySingle: Entry, +{ + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().insert_or_update_peer_and_get_stats(peer) + } + + fn 
get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().expect("it should get a lock").get_stats(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexStd::new( + EntrySingle { + peers: BTreeMap::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index ab78de683..5d42e8b4d 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -26,6 +26,7 @@ //! Peer that don not have a full copy of the torrent data are called "leechers". //! -use torrust_tracker_torrent_repository::TorrentsRwLockStdMutexStd; +use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; -pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used +//pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used +pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used From eec20247e146b0b822e86061943f4c899c93658f Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 5 Apr 2024 18:27:17 +0100 Subject: [PATCH 3/5] chore: ignore crossbeam-skiplist crate in cargo-machete It's been used in the src/packages/torrent-repository package. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8c1df0685..f440799cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [package.metadata.cargo-machete] -ignored = ["serde_bytes"] +ignored = ["serde_bytes", "crossbeam-skiplist"] [dev-dependencies] local-ip-address = "0" From 098928592f821cb2aa779746d6e40b71703c529b Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 5 Apr 2024 18:29:24 +0100 Subject: [PATCH 4/5] refactor: separate torrent repository trait from implementations There are now more implementations. 
--- packages/torrent-repository/src/lib.rs | 14 ++++---- .../torrent-repository/src/repository/mod.rs | 34 ------------------- .../src/repository/rw_lock_std.rs | 16 +++++++++ .../src/repository/rw_lock_tokio.rs | 18 ++++++++++ .../tests/repository/mod.rs | 3 +- src/core/torrent/mod.rs | 2 -- 6 files changed, 44 insertions(+), 43 deletions(-) diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index f7c19624c..ccaf579e3 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use repository::rw_lock_std::RwLockStd; +use repository::rw_lock_tokio::RwLockTokio; use repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_clock::clock; @@ -10,12 +12,12 @@ pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; -pub type TorrentsRwLockStd = repository::RwLockStd; -pub type TorrentsRwLockStdMutexStd = repository::RwLockStd; -pub type TorrentsRwLockStdMutexTokio = repository::RwLockStd; -pub type TorrentsRwLockTokio = repository::RwLockTokio; -pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio; -pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio; +pub type TorrentsRwLockStd = RwLockStd; +pub type TorrentsRwLockStdMutexStd = RwLockStd; +pub type TorrentsRwLockStdMutexTokio = RwLockStd; +pub type TorrentsRwLockTokio = RwLockTokio; +pub type TorrentsRwLockTokioMutexStd = RwLockTokio; +pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index 7ede1f87a..975a876d8 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -41,37 +41,3 @@ pub trait RepositoryAsync: Debug + Default + Sized + 'static { peer: &peer::Peer, ) -> impl std::future::Future + Send; } - -#[derive(Default, Debug)] -pub struct RwLockStd { - torrents: std::sync::RwLock>, -} - -#[derive(Default, Debug)] -pub struct RwLockTokio { - torrents: tokio::sync::RwLock>, -} - -impl RwLockStd { - /// # Panics - /// - /// Panics if unable to get a lock. - pub fn write( - &self, - ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap> { - self.torrents.write().expect("it should get lock") - } -} - -impl RwLockTokio { - pub fn write( - &self, - ) -> impl std::future::Future< - Output = tokio::sync::RwLockWriteGuard< - '_, - std::collections::BTreeMap, - >, - > { - self.torrents.write() - } -} diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index 9d7f29416..e9074a271 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -11,6 +11,22 @@ use super::Repository; use crate::entry::Entry; use crate::{EntrySingle, TorrentsRwLockStd}; +#[derive(Default, Debug)] +pub struct RwLockStd { + pub(crate) torrents: std::sync::RwLock>, +} + +impl RwLockStd { + /// # Panics + /// + /// Panics if unable to get a lock. 
+ pub fn write( + &self, + ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap> { + self.torrents.write().expect("it should get lock") + } +} + impl TorrentsRwLockStd { fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> where diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index fa84e2451..d84074eaf 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -11,6 +11,24 @@ use super::RepositoryAsync; use crate::entry::Entry; use crate::{EntrySingle, TorrentsRwLockTokio}; +#[derive(Default, Debug)] +pub struct RwLockTokio { + pub(crate) torrents: tokio::sync::RwLock>, +} + +impl RwLockTokio { + pub fn write( + &self, + ) -> impl std::future::Future< + Output = tokio::sync::RwLockWriteGuard< + '_, + std::collections::BTreeMap, + >, + > { + self.torrents.write() + } +} + impl TorrentsRwLockTokio { async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> where diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index 7ffe17dd7..117f3c0a6 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -8,7 +8,8 @@ use torrust_tracker_primitives::info_hash::InfoHash; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; -use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio}; +use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; +use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; use torrust_tracker_torrent_repository::EntrySingle; use crate::common::repo::Repo; diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index 5d42e8b4d..286a7e047 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -25,8 +25,6 @@ //! - The number of peers that have NOT completed downloading the torrent and are still active, that means they are actively participating in the network. //! Peer that don not have a full copy of the torrent data are called "leechers". //! 
- use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; -//pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used From 12f54e703e78af677fbd4456c359f580e83b1c44 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 8 Apr 2024 14:07:15 +0100 Subject: [PATCH 5/5] test: add tests for new torrent repository using SkipMap --- .../src/repository/skip_map_mutex_std.rs | 2 +- .../torrent-repository/tests/common/repo.rs | 165 +++++++++++------- .../tests/repository/mod.rs | 108 ++++++++++-- 3 files changed, 193 insertions(+), 82 deletions(-) diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index aa1f43826..0c0127b15 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -15,7 +15,7 @@ use crate::{EntryMutexStd, EntrySingle}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { - torrents: SkipMap, + pub torrents: SkipMap, } impl Repository for CrossbeamSkipList diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 3a4b53d2f..5a86aa3cf 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -7,49 +7,54 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; #[derive(Debug)] pub(crate) enum Repo { - Std(TorrentsRwLockStd), - StdMutexStd(TorrentsRwLockStdMutexStd), - StdMutexTokio(TorrentsRwLockStdMutexTokio), - Tokio(TorrentsRwLockTokio), - TokioMutexStd(TorrentsRwLockTokioMutexStd), - TokioMutexTokio(TorrentsRwLockTokioMutexTokio), + RwLockStd(TorrentsRwLockStd), + RwLockStdMutexStd(TorrentsRwLockStdMutexStd), + RwLockStdMutexTokio(TorrentsRwLockStdMutexTokio), + RwLockTokio(TorrentsRwLockTokio), + RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), + RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), + SkipMapMutexStd(TorrentsSkipMapMutexStd), } impl Repo { pub(crate) async fn get(&self, key: &InfoHash) -> Option { match self { - Repo::Std(repo) => repo.get(key), - Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), - Repo::Tokio(repo) => repo.get(key).await, - Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::RwLockStd(repo) => repo.get(key), + Repo::RwLockStdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::RwLockStdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::RwLockTokio(repo) => repo.get(key).await, + Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), + Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } } + pub(crate) async fn get_metrics(&self) -> 
TorrentsMetrics { match self { - Repo::Std(repo) => repo.get_metrics(), - Repo::StdMutexStd(repo) => repo.get_metrics(), - Repo::StdMutexTokio(repo) => repo.get_metrics().await, - Repo::Tokio(repo) => repo.get_metrics().await, - Repo::TokioMutexStd(repo) => repo.get_metrics().await, - Repo::TokioMutexTokio(repo) => repo.get_metrics().await, + Repo::RwLockStd(repo) => repo.get_metrics(), + Repo::RwLockStdMutexStd(repo) => repo.get_metrics(), + Repo::RwLockStdMutexTokio(repo) => repo.get_metrics().await, + Repo::RwLockTokio(repo) => repo.get_metrics().await, + Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, + Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, + Repo::SkipMapMutexStd(repo) => repo.get_metrics(), } } + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { match self { - Repo::Std(repo) => repo.get_paginated(pagination), - Repo::StdMutexStd(repo) => repo + Repo::RwLockStd(repo) => repo.get_paginated(pagination), + Repo::RwLockStdMutexStd(repo) => repo .get_paginated(pagination) .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), - Repo::StdMutexTokio(repo) => { + Repo::RwLockStdMutexTokio(repo) => { let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { @@ -57,14 +62,14 @@ impl Repo { } v } - Repo::Tokio(repo) => repo.get_paginated(pagination).await, - Repo::TokioMutexStd(repo) => repo + Repo::RwLockTokio(repo) => repo.get_paginated(pagination).await, + Repo::RwLockTokioMutexStd(repo) => repo .get_paginated(pagination) .await .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), - Repo::TokioMutexTokio(repo) => { + Repo::RwLockTokioMutexTokio(repo) => { let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { @@ -72,76 +77,102 @@ impl Repo { } v } + Repo::SkipMapMutexStd(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), } } + pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { match self { - Repo::Std(repo) => repo.import_persistent(persistent_torrents), - Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents), - Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, - Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await, - Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, - Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockStd(repo) => repo.import_persistent(persistent_torrents), + Repo::RwLockStdMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::RwLockStdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } } + pub(crate) async fn remove(&self, key: &InfoHash) -> Option { match self { - Repo::Std(repo) => repo.remove(key), - Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), - Repo::Tokio(repo) => 
repo.remove(key).await, - Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::RwLockStd(repo) => repo.remove(key), + Repo::RwLockStdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::RwLockStdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::RwLockTokio(repo) => repo.remove(key).await, + Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), + Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } } + pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { match self { - Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff), - Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), - Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::RwLockStdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::RwLockStdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } } + pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { match self { - Repo::Std(repo) => repo.remove_peerless_torrents(policy), - Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy), - Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, - Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await, - Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, - Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockStd(repo) => repo.remove_peerless_torrents(policy), + Repo::RwLockStdMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::RwLockStdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } } + pub(crate) async fn update_torrent_with_peer_and_get_stats( &self, info_hash: &InfoHash, peer: &peer::Peer, ) -> (bool, SwarmMetadata) { match self { - Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - 
Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::RwLockStdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::RwLockStdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), } } + pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { match self { - Repo::Std(repo) => repo.write().insert(*info_hash, torrent), - Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => { - let r = repo.write().insert(*info_hash, torrent.into()); - match r { - Some(t) => Some(t.lock().await.clone()), - None => None, - } + Repo::RwLockStd(repo) => { + repo.write().insert(*info_hash, torrent); } - Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent), - Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()), - } + Repo::RwLockStdMutexStd(repo) => { + repo.write().insert(*info_hash, torrent.into()); + } + Repo::RwLockStdMutexTokio(repo) => { + repo.write().insert(*info_hash, torrent.into()); + } + Repo::RwLockTokio(repo) => { + repo.write().await.insert(*info_hash, torrent); + } + Repo::RwLockTokioMutexStd(repo) => { + repo.write().await.insert(*info_hash, torrent.into()); + } + Repo::RwLockTokioMutexTokio(repo) => { + repo.write().await.insert(*info_hash, torrent.into()); + } + Repo::SkipMapMutexStd(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } + }; + self.get(info_hash).await } } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index 117f3c0a6..ab9648584 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -10,6 +10,7 @@ use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; +use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_torrent_repository::EntrySingle; use crate::common::repo::Repo; @@ -17,30 +18,37 @@ use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; #[fixture] fn standard() -> Repo { - Repo::Std(RwLockStd::default()) + Repo::RwLockStd(RwLockStd::default()) } + #[fixture] fn standard_mutex() -> Repo { - Repo::StdMutexStd(RwLockStd::default()) + Repo::RwLockStdMutexStd(RwLockStd::default()) } #[fixture] fn standard_tokio() -> Repo { - Repo::StdMutexTokio(RwLockStd::default()) + Repo::RwLockStdMutexTokio(RwLockStd::default()) } #[fixture] fn tokio_std() -> Repo { - Repo::Tokio(RwLockTokio::default()) + 
Repo::RwLockTokio(RwLockTokio::default()) } + #[fixture] fn tokio_mutex() -> Repo { - Repo::TokioMutexStd(RwLockTokio::default()) + Repo::RwLockTokioMutexStd(RwLockTokio::default()) } #[fixture] fn tokio_tokio() -> Repo { - Repo::TokioMutexTokio(RwLockTokio::default()) + Repo::RwLockTokioMutexTokio(RwLockTokio::default()) +} + +#[fixture] +fn skip_list_std() -> Repo { + Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } type Entries = Vec<(InfoHash, EntrySingle)>; @@ -224,7 +232,16 @@ fn policy_remove_persist() -> TrackerPolicy { #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_a_torrent_entry( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { make(&repo, &entries).await; @@ -247,7 +264,16 @@ async fn it_should_get_a_torrent_entry( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, many_out_of_order: Entries, ) { @@ -280,7 +306,16 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination, ) { @@ -328,7 +363,16 @@ async fn it_should_get_paginated( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_metrics( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; @@ -360,7 +404,16 @@ async fn it_should_get_metrics( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_import_persistent_torrents( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents, ) { @@ -389,7 +442,16 @@ async fn it_should_import_persistent_torrents( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_an_entry( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] 
+ repo: Repo, #[case] entries: Entries, ) { make(&repo, &entries).await; @@ -416,7 +478,16 @@ async fn it_should_remove_an_entry( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_inactive_peers( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { use std::ops::Sub as _; @@ -489,7 +560,16 @@ async fn it_should_remove_inactive_peers( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_peerless_torrents( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) {