From a223bae32428e473f0d0c36a2b56d3ef96f6415f Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 13 Dec 2022 01:22:48 -0800 Subject: [PATCH 01/12] Add ResizableSemaphore --- crates/subspace-networking/src/utils.rs | 122 ++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 5b829aefb4..888276c153 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -7,6 +7,8 @@ use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; use std::marker::PhantomData; use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::{Mutex, Notify}; use tracing::warn; /// This test is successful only for global IP addresses and DNS names. @@ -95,3 +97,123 @@ pub(crate) fn convert_multiaddresses(addresses: Vec) -> Vec, +} + +/// The semaphore permit. +#[derive(Clone)] +pub(crate) struct ResizableSemaphorePermit { + state: Arc, +} + +/// The state shared between the semaphore and the outstanding permits. +struct SemState { + /// The tuple holds (current usage, current max capacity) + current: Mutex<(usize, usize)>, + + /// To signal waiters for permits to be available. + notify: Notify, +} + +impl SemState { + fn new(capacity: usize) -> Self { + Self { + current: Mutex::new((0, capacity)), + notify: Notify::new(), + } + } + + // Allocates a permit if available. + // Returns true if a permit was allocated, false otherwise + async fn alloc_one(&self) -> bool { + let mut current = self.current.lock().await; // (usage, capacity) + if current.0 < current.1 { + current.0 += 1; + true + } else { + false + } + } + + // Returns a permit to the free pool, notifies waiters if needed. + async fn free_one(&self) { + let should_notify = { + let mut current = self.current.lock().await; // (usage, capacity) + assert!(current.0 > 0); + current.0 -= 1; + + // Notify only if usage fell below the current capacity. + // For example: if the previous capacity was 100, and current capacity + // is 50, this will wait for usage to fall below 50 before any waiters + // are notified. + current.0 < current.1 + }; + if should_notify { + self.notify.notify_waiters(); + } + } + + // Expands the max capacity by delta, and notifies any waiters of the newly available + // free permits. + async fn expand(&self, delta: usize) { + { + let mut current = self.current.lock().await; // (usage, capacity) + current.1 += delta; + } + self.notify.notify_waiters(); + } + + // Shrinks the max capacity by delta + async fn shrink(&self, delta: usize) { + let mut current = self.current.lock().await; // (usage, capacity) + assert!(current.1 > delta); + current.1 -= delta; + } +} + +impl ResizableSemaphore { + pub(crate) fn new(capacity: usize) -> Self { + Self { + state: Arc::new(SemState::new(capacity)), + } + } + + /// Acquires a permit. + pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { + loop { + if self.state.alloc_one().await { + break; + } + self.state.notify.notified().await; + } + ResizableSemaphorePermit { + state: self.state.clone(), + } + } + + /// Expands the capacity by specified amount. + pub(crate) async fn expand(&self, delta: usize) { + self.state.expand(delta).await; + } + + /// Shrinks the capacity by specified amount. 
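+    /// Note that the shared state asserts `capacity > delta`, so at least one
+    /// unit of capacity must remain. For example, a semaphore created with
+    /// capacity 3 can be shrunk by at most 2; `shrink(3)` would trip the
+    /// internal `assert!`.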
+ pub(crate) async fn shrink(&self, delta: usize) { + self.state.shrink(delta).await; + } +} + +impl Drop for ResizableSemaphorePermit { + fn drop(&mut self) { + let state = self.state.clone(); + tokio::spawn({ + async move { + state.free_one().await; + } + }); + } +} From 738fdf1530488694edb9d497cf22b25de2367f0e Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 13 Dec 2022 10:19:40 -0800 Subject: [PATCH 02/12] Clean ups --- crates/subspace-networking/src/utils.rs | 103 ++++++++++++++---------- 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 888276c153..ca81968dd2 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -98,32 +98,38 @@ pub(crate) fn convert_multiaddresses(addresses: Vec) -> Vec, -} +pub(crate) struct ResizableSemaphore(Arc); -/// The semaphore permit. +// The permit. #[derive(Clone)] -pub(crate) struct ResizableSemaphorePermit { - state: Arc, +pub(crate) struct ResizableSemaphorePermit(Arc); + +// The state shared between the semaphore and the outstanding permits. +struct SemShared { + // The tuple holds (current usage, current max capacity) + current: Mutex, + + // To signal waiters for permits to be available + notify: Notify, } -/// The state shared between the semaphore and the outstanding permits. +// Current state. +#[derive(Debug)] struct SemState { - /// The tuple holds (current usage, current max capacity) - current: Mutex<(usize, usize)>, + // The current capacity + capacity: usize, - /// To signal waiters for permits to be available. - notify: Notify, + // The current outstanding permits + usage: usize, } -impl SemState { +impl SemShared { fn new(capacity: usize) -> Self { Self { - current: Mutex::new((0, capacity)), + current: Mutex::new(SemState { capacity, usage: 0 }), notify: Notify::new(), } } @@ -131,27 +137,33 @@ impl SemState { // Allocates a permit if available. // Returns true if a permit was allocated, false otherwise async fn alloc_one(&self) -> bool { - let mut current = self.current.lock().await; // (usage, capacity) - if current.0 < current.1 { - current.0 += 1; + let mut current = self.current.lock().await; + if current.usage < current.capacity { + current.usage += 1; true } else { false } } - // Returns a permit to the free pool, notifies waiters if needed. + // Returns a permit back to the free pool, notifies waiters if needed. async fn free_one(&self) { let should_notify = { - let mut current = self.current.lock().await; // (usage, capacity) - assert!(current.0 > 0); - current.0 -= 1; + let mut current = self.current.lock().await; + if current.usage > 0 { + current.usage -= 1; + } else { + panic!( + "SemShared::free_one(): invalid free, current = {:?}", + current + ); + } // Notify only if usage fell below the current capacity. // For example: if the previous capacity was 100, and current capacity // is 50, this will wait for usage to fall below 50 before any waiters // are notified. - current.0 < current.1 + current.usage < current.capacity }; if should_notify { self.notify.notify_waiters(); @@ -162,54 +174,57 @@ impl SemState { // free permits. 
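    // Illustrative sequence (hypothetical caller, not part of this change):
    // with capacity 3 fully used, two further acquire() calls park on
    // `notify`; expand(2) raises the capacity to 5 and notify_waiters() wakes
    // both waiters, which then retry alloc_one() and succeed.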
async fn expand(&self, delta: usize) { { - let mut current = self.current.lock().await; // (usage, capacity) - current.1 += delta; + let mut current = self.current.lock().await; + current.capacity += delta; } self.notify.notify_waiters(); } // Shrinks the max capacity by delta - async fn shrink(&self, delta: usize) { - let mut current = self.current.lock().await; // (usage, capacity) - assert!(current.1 > delta); - current.1 -= delta; + async fn shrink(&self, delta: usize) -> Result<(), String> { + let mut current = self.current.lock().await; + if current.capacity > delta { + current.capacity -= delta; + Ok(()) + } else { + Err(format!( + "SemShared::::shrink(): invalid delta = {}, current = {:?}", + delta, current + )) + } } } impl ResizableSemaphore { pub(crate) fn new(capacity: usize) -> Self { - Self { - state: Arc::new(SemState::new(capacity)), - } + Self(Arc::new(SemShared::new(capacity))) } - /// Acquires a permit. + // Acquires a permit. pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { - if self.state.alloc_one().await { + if self.0.alloc_one().await { break; } - self.state.notify.notified().await; - } - ResizableSemaphorePermit { - state: self.state.clone(), + self.0.notify.notified().await; } + ResizableSemaphorePermit(self.0.clone()) } - /// Expands the capacity by specified amount. + // Expands the capacity by the specified amount. pub(crate) async fn expand(&self, delta: usize) { - self.state.expand(delta).await; + self.0.expand(delta).await; } - /// Shrinks the capacity by specified amount. - pub(crate) async fn shrink(&self, delta: usize) { - self.state.shrink(delta).await; + // Shrinks the capacity by the specified amount. + pub(crate) async fn shrink(&self, delta: usize) -> Result<(), String> { + self.0.shrink(delta).await } } impl Drop for ResizableSemaphorePermit { fn drop(&mut self) { - let state = self.state.clone(); + let state = self.0.clone(); tokio::spawn({ async move { state.free_one().await; From c5d1006733405151b8c32e7dfcf933e91269cb87 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 13 Dec 2022 11:03:45 -0800 Subject: [PATCH 03/12] Add unit tests --- crates/subspace-networking/src/utils.rs | 12 ++- crates/subspace-networking/src/utils/tests.rs | 73 ++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index ca81968dd2..b2761f5a76 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -200,7 +200,7 @@ impl ResizableSemaphore { Self(Arc::new(SemShared::new(capacity))) } - // Acquires a permit. + // Acquires a permit. Waits until a permit is available. pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { if self.0.alloc_one().await { @@ -211,6 +211,16 @@ impl ResizableSemaphore { ResizableSemaphorePermit(self.0.clone()) } + // Acquires a permit, does not wait if no permits are available. + // Currently used only for testing. + pub(crate) async fn try_acquire(&self) -> Option { + if self.0.alloc_one().await { + Some(ResizableSemaphorePermit(self.0.clone())) + } else { + None + } + } + // Expands the capacity by the specified amount. 
pub(crate) async fn expand(&self, delta: usize) { self.0.expand(delta).await; diff --git a/crates/subspace-networking/src/utils/tests.rs b/crates/subspace-networking/src/utils/tests.rs index b65544c0e6..256608adb3 100644 --- a/crates/subspace-networking/src/utils/tests.rs +++ b/crates/subspace-networking/src/utils/tests.rs @@ -1,4 +1,4 @@ -use super::CollectionBatcher; +use super::{CollectionBatcher, ResizableSemaphore, ResizableSemaphorePermit}; use std::num::NonZeroUsize; #[test] @@ -60,3 +60,74 @@ fn test_batching() { assert_eq!(batcher.next_batch(collection.clone()), vec![3, 4, 5, 6]); assert_eq!(batcher.next_batch(collection), vec![7, 1, 2, 3]); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_resizable_semaphore_alloc() { + // Capacity = 3. We should be able to alloc only three permits. + let sem = ResizableSemaphore::new(3); + let (_permit_1, _permit_2, _permit_3) = ( + sem.try_acquire().await.unwrap(), + sem.try_acquire().await.unwrap(), + sem.try_acquire().await.unwrap(), + ); + assert!(sem.try_acquire().await.is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_resizable_semaphore_expand() { + // Initial capacity = 3. + let sem = ResizableSemaphore::new(3); + let (_permit_1, _permit_2, _permit_3) = ( + sem.try_acquire().await.unwrap(), + sem.try_acquire().await.unwrap(), + sem.try_acquire().await.unwrap(), + ); + assert!(sem.try_acquire().await.is_none()); + + // Increase capacity of semaphore by 2, we should be able to alloc two more permits. + sem.expand(2).await; + let (_permit_4, _permit_5) = ( + sem.try_acquire().await.unwrap(), + sem.try_acquire().await.unwrap(), + ); + assert!(sem.try_acquire().await.is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_resizable_semaphore_shrink() { + async fn free_permit(permit: ResizableSemaphorePermit) { + tokio::spawn({ + async move { + std::mem::drop(permit); + } + }) + .await + .unwrap(); + } + + // Initial capacity = 4, alloc 4 outstanding permits. + let sem = ResizableSemaphore::new(4); + let permit_1 = sem.try_acquire().await.unwrap(); + let permit_2 = sem.try_acquire().await.unwrap(); + let permit_3 = sem.try_acquire().await.unwrap(); + let _permit_4 = sem.try_acquire().await.unwrap(); + assert!(sem.try_acquire().await.is_none()); + + // Shrink the capacity by 2, new capacity = 2. + assert!(sem.shrink(2).await.is_ok()); + + // Alloc should fail as outstanding permits(4) >= capacity(2). + assert!(sem.try_acquire().await.is_none()); + + // Free a permit, alloc should fail as outstanding permits(3) >= capacity(2). + free_permit(permit_2).await; + assert!(sem.try_acquire().await.is_none()); + + // Free another permit, alloc should fail as outstanding permits(2) >= capacity(2). + free_permit(permit_3).await; + assert!(sem.try_acquire().await.is_none()); + + // Free another permit, alloc should succeed as outstanding permits(1) < capacity(2). 
+ free_permit(permit_1).await; + assert!(sem.try_acquire().await.is_some()); +} From c8b25a1f5bbe0d224d918b6b10d922fa15df7f38 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 13 Dec 2022 11:49:46 -0800 Subject: [PATCH 04/12] Use the sem changes --- crates/subspace-networking/src/create.rs | 111 +++------------- .../subspace-networking/src/create/tests.rs | 119 ------------------ crates/subspace-networking/src/node.rs | 72 +++-------- crates/subspace-networking/src/node_runner.rs | 21 ++++ crates/subspace-networking/src/shared.rs | 12 +- crates/subspace-networking/src/utils.rs | 6 +- 6 files changed, 68 insertions(+), 273 deletions(-) delete mode 100644 crates/subspace-networking/src/create/tests.rs diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index af74c8e967..63d9f104b6 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -1,6 +1,3 @@ -#[cfg(test)] -mod tests; - pub use crate::behavior::custom_record_store::ValueGetter; use crate::behavior::custom_record_store::{ CustomRecordStore, MemoryProviderStorage, NoRecordStorage, @@ -11,7 +8,7 @@ use crate::node::{CircuitRelayClientError, Node}; use crate::node_runner::{NodeRunner, NodeRunnerConfig}; use crate::request_responses::RequestHandler; use crate::shared::Shared; -use crate::utils::convert_multiaddresses; +use crate::utils::{convert_multiaddresses, ResizableSemaphore}; use crate::BootstrappedNetworkingParameters; use futures::channel::mpsc; use libp2p::core::muxing::StreamMuxerBox; @@ -32,13 +29,11 @@ use libp2p::websocket::WsConfig; use libp2p::yamux::YamuxConfig; use libp2p::{core, identity, noise, Multiaddr, PeerId, Transport, TransportError}; use std::num::NonZeroUsize; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::time::Duration; use std::{fmt, io}; use subspace_core_primitives::{crypto, PIECE_SIZE}; use thiserror::Error; -use tokio::sync::Semaphore; use tracing::{error, info}; const KADEMLIA_PROTOCOL: &[u8] = b"/subspace/kad/0.1.0"; @@ -74,7 +69,7 @@ const KADEMLIA_BASE_CONCURRENT_TASKS: usize = 30; /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future. -const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1; +pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1; /// Base limit for number of any concurrent tasks except Kademlia. /// /// We configure total number of streams per connection to 256. Here we assume half of them might be @@ -86,60 +81,7 @@ const REGULAR_BASE_CONCURRENT_TASKS: usize = 120 - KADEMLIA_BASE_CONCURRENT_TASK /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future. 
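/// As a worked example (illustrative numbers only): with a base of
/// 120 - 30 = 90 regular tasks and 11 connected peers, the boosted capacity
/// is 90 + (11 - 1) * 2 = 110 concurrent tasks.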
-const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 2; -const SEMAPHORE_MAINTENANCE_INTERVAL: Duration = Duration::from_secs(5); - -async fn maintain_semaphore_permits_capacity( - semaphore: &Semaphore, - interval: Duration, - connected_peers_count_weak: Weak, - boost_per_peer: usize, -) { - let base_permits = semaphore.available_permits(); - // Total permits technically supported by semaphore - let mut total_permits = base_permits; - // Some permits might be reserved due to number of peers decreasing and will be released back if - // necessary, this is because semaphore supports increasing number of - let mut reserved_permits = Vec::new(); - loop { - let connected_peers_count = match connected_peers_count_weak.upgrade() { - Some(connected_peers_count) => connected_peers_count.load(Ordering::Relaxed), - None => { - return; - } - }; - let expected_total_permits = - base_permits + connected_peers_count.saturating_sub(1) * boost_per_peer; - - // Release reserves to match expected number of permits if necessary - while total_permits < expected_total_permits && !reserved_permits.is_empty() { - reserved_permits.pop(); - total_permits += 1; - } - // If reserved permits were not sufficient, add permits to the semaphore directly. - if total_permits < expected_total_permits { - semaphore.add_permits(expected_total_permits - total_permits); - total_permits = expected_total_permits; - } - // Peers disconnected and expected number of permits went down, we need to put some into - // reserve - if total_permits > expected_total_permits { - let to_reserve = total_permits - expected_total_permits; - reserved_permits.reserve(to_reserve); - for _ in 0..to_reserve { - reserved_permits.push( - semaphore - .acquire() - .await - .expect("We never close a semaphore; qed"), - ); - } - total_permits = expected_total_permits; - } - - tokio::time::sleep(interval).await; - } -} +pub(crate) const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 2; /// Defines relay configuration for the Node #[derive(Clone, Debug)] @@ -362,41 +304,19 @@ where // Create final structs let (command_sender, command_receiver) = mpsc::channel(1); - let shared = Arc::new(Shared::new(local_peer_id, command_sender)); + let kademlia_tasks_semaphore = + Arc::new(ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS)); + let regular_tasks_semaphore = + Arc::new(ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS)); + + let shared = Arc::new(Shared::new( + local_peer_id, + command_sender, + kademlia_tasks_semaphore.clone(), + regular_tasks_semaphore.clone(), + )); let shared_weak = Arc::downgrade(&shared); - let kademlia_tasks_semaphore = Arc::new(Semaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS)); - let regular_tasks_semaphore = Arc::new(Semaphore::new(REGULAR_BASE_CONCURRENT_TASKS)); - - tokio::spawn({ - let kademlia_tasks_semaphore = Arc::clone(&kademlia_tasks_semaphore); - let connected_peers_count_weak = Arc::downgrade(&shared.connected_peers_count); - - async move { - maintain_semaphore_permits_capacity( - &kademlia_tasks_semaphore, - SEMAPHORE_MAINTENANCE_INTERVAL, - connected_peers_count_weak, - KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER, - ) - .await; - } - }); - tokio::spawn({ - let regular_tasks_semaphore = Arc::clone(®ular_tasks_semaphore); - let connected_peers_count_weak = Arc::downgrade(&shared.connected_peers_count); - - async move { - maintain_semaphore_permits_capacity( - ®ular_tasks_semaphore, - SEMAPHORE_MAINTENANCE_INTERVAL, - connected_peers_count_weak, - REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER, - ) - .await; - } - }); - let 
node = Node::new(shared, kademlia_tasks_semaphore, regular_tasks_semaphore); let node_runner = NodeRunner::::new(NodeRunnerConfig:: { allow_non_global_addresses_in_dht, @@ -408,6 +328,7 @@ where reserved_peers: convert_multiaddresses(reserved_peers).into_iter().collect(), max_established_incoming_connections, max_established_outgoing_connections, + metrics, }); diff --git a/crates/subspace-networking/src/create/tests.rs b/crates/subspace-networking/src/create/tests.rs deleted file mode 100644 index dc3e772e2d..0000000000 --- a/crates/subspace-networking/src/create/tests.rs +++ /dev/null @@ -1,119 +0,0 @@ -use futures::future::{select, Either}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::Semaphore; -use tokio::time::sleep; - -#[tokio::test] -async fn maintain_semaphore_permits_capacity() { - let base_tasks = 2; - let boost_per_peer = 1; - let interval = Duration::from_micros(1); - let connected_peers_count = Arc::new(AtomicUsize::new(0)); - let tasks_semaphore = Arc::new(Semaphore::new(base_tasks)); - - tokio::spawn({ - let tasks_semaphore = Arc::clone(&tasks_semaphore); - let connected_peers_count_weak = Arc::downgrade(&connected_peers_count); - - async move { - super::maintain_semaphore_permits_capacity( - &tasks_semaphore, - interval, - connected_peers_count_weak, - boost_per_peer, - ) - .await; - } - }); - - let timeout = Duration::from_millis(100); - - // Let above function time to run at least one loop - sleep(timeout).await; - - let permit_1_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_1_result, Either::Left(_)) { - panic!("Must be able to acquire the permit"); - } - - let permit_2_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_2_result, Either::Left(_)) { - panic!("Must be able to acquire the second permit"); - } - - { - let permit_3_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_3_result, Either::Right(_)) { - panic!("Must not be able to acquire the third permit due to capacity"); - } - } - - // Increase capacity - connected_peers_count.fetch_add(1, Ordering::SeqCst); - - { - let permit_3_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_3_result, Either::Right(_)) { - panic!("Must not be able to acquire the third permit due to capacity"); - } - } - - // Increase capacity more - connected_peers_count.fetch_add(1, Ordering::SeqCst); - - let permit_3_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_3_result, Either::Left(_)) { - panic!("Must be able to acquire the third permit due to increased capacity"); - } - - { - let permit_4_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_4_result, Either::Right(_)) { - panic!("Must not be able to acquire the fourth permit due to capacity"); - } - } - - // Decrease capacity capacity - connected_peers_count.fetch_sub(1, Ordering::SeqCst); - - drop(permit_3_result); - - sleep(timeout).await; - - { - let permit_3_result = select( - Box::pin(tasks_semaphore.acquire()), - Box::pin(sleep(timeout)), - ) - .await; - if !matches!(permit_3_result, Either::Right(_)) { - panic!("Must not be able to acquire the third permit again due to capacity anymore"); - } - } 
-} diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 947f4bcbc0..1975c1ddb5 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -1,6 +1,7 @@ use crate::request_handlers::generic_request_handler::GenericRequest; use crate::request_responses; use crate::shared::{Command, CreatedSubscription, Shared}; +use crate::utils::{ResizableSemaphore, ResizableSemaphorePermit}; use bytes::Bytes; use event_listener_primitives::HandlerId; use futures::channel::mpsc::SendError; @@ -16,7 +17,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; use thiserror::Error; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::sleep; use tracing::{error, trace}; @@ -25,11 +25,11 @@ struct WrapperWithPermits { #[pin] inner: T, // Just holding onto permit while data structure is not dropped - _permit: OwnedSemaphorePermit, + _permit: ResizableSemaphorePermit, } impl WrapperWithPermits { - fn new(inner: T, permit: OwnedSemaphorePermit) -> Self { + fn new(inner: T, permit: ResizableSemaphorePermit) -> Self { Self { inner, _permit: permit, @@ -57,7 +57,7 @@ pub struct TopicSubscription { command_sender: Option>, #[pin] receiver: mpsc::UnboundedReceiver, - _permit: OwnedSemaphorePermit, + _permit: ResizableSemaphorePermit, } impl Stream for TopicSubscription { @@ -285,15 +285,15 @@ impl From for CircuitRelayClientError { #[must_use = "Node doesn't do anything if dropped"] pub struct Node { shared: Arc, - kademlia_tasks_semaphore: Arc, - regular_tasks_semaphore: Arc, + kademlia_tasks_semaphore: Arc, + regular_tasks_semaphore: Arc, } impl Node { pub(crate) fn new( shared: Arc, - kademlia_tasks_semaphore: Arc, - regular_tasks_semaphore: Arc, + kademlia_tasks_semaphore: Arc, + regular_tasks_semaphore: Arc, ) -> Self { Self { shared, @@ -311,12 +311,7 @@ impl Node { &self, key: Multihash, ) -> Result>, GetValueError> { - let permit = self - .kademlia_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); self.shared @@ -334,12 +329,7 @@ impl Node { key: Multihash, value: Vec, ) -> Result, PutValueError> { - let permit = self - .kademlia_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); self.shared @@ -357,12 +347,7 @@ impl Node { } pub async fn subscribe(&self, topic: Sha256Topic) -> Result { - let permit = self - .regular_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); self.shared @@ -389,12 +374,7 @@ impl Node { } pub async fn publish(&self, topic: Sha256Topic, message: Vec) -> Result<(), PublishError> { - let _permit = self - .regular_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let _permit = self.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); self.shared @@ -419,12 +399,7 @@ impl Node { where Request: GenericRequest, { - let _permit = self - .regular_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let _permit = 
self.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); let command = Command::GenericRequest { peer_id, @@ -445,12 +420,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetClosestPeersError> { - let permit = self - .kademlia_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.kademlia_tasks_semaphore.acquire().await; trace!(?key, "Starting 'GetClosestPeers' request."); let (result_sender, result_receiver) = mpsc::unbounded(); @@ -499,12 +469,7 @@ impl Node { &self, key: Multihash, ) -> Result, AnnounceError> { - let permit = self - .kademlia_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); trace!(?key, "Starting 'start_announcing' request."); @@ -542,12 +507,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetProvidersError> { - let permit = self - .kademlia_tasks_semaphore - .clone() - .acquire_owned() - .await - .expect("We never close a semaphore; qed"); + let permit = self.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); trace!(?key, "Starting 'get_providers' request."); diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index ba3af1131c..6e73a2e7ea 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -1,6 +1,9 @@ use crate::behavior::custom_record_store::CustomRecordStore; use crate::behavior::persistent_parameters::NetworkingParametersRegistry; use crate::behavior::{Behavior, Event}; +use crate::create::{ + KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER, REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER, +}; use crate::request_responses::{Event as RequestResponseEvent, IfDisconnected}; use crate::shared::{Command, CreatedSubscription, Shared}; use crate::utils; @@ -305,6 +308,14 @@ where debug!(%peer_id, %is_reserved_peer, "Connection established [{num_established} from peer]"); shared.connected_peers_count.fetch_add(1, Ordering::SeqCst); + shared + .kademlia_tasks_semaphore + .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) + .await; + shared + .regular_tasks_semaphore + .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) + .await; let (in_connections_number, out_connections_number) = { let network_info = self.swarm.network_info(); @@ -365,6 +376,16 @@ where debug!("Connection closed with peer {peer_id} [{num_established} from peer]"); shared.connected_peers_count.fetch_sub(1, Ordering::SeqCst); + shared + .kademlia_tasks_semaphore + .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) + .await + .expect("Failed to shrink kademlia_tasks_semaphore"); + shared + .regular_tasks_semaphore + .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) + .await + .expect("Failed to shrink regular_tasks_semaphore"); } SwarmEvent::OutgoingConnectionError { peer_id, error } => match error { DialError::Transport(ref addresses) => { diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 3f61fb8cfc..a71c02933b 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -2,6 +2,7 @@ //! queries, subscriptions, various events and shared information. 
use crate::request_responses::RequestFailure; +use crate::utils::ResizableSemaphore; use bytes::Bytes; use event_listener_primitives::Bag; use futures::channel::{mpsc, oneshot}; @@ -87,16 +88,25 @@ pub(crate) struct Shared { pub(crate) connected_peers_count: Arc, /// Sender end of the channel for sending commands to the swarm. pub(crate) command_sender: mpsc::Sender, + pub(crate) kademlia_tasks_semaphore: Arc, + pub(crate) regular_tasks_semaphore: Arc, } impl Shared { - pub(crate) fn new(id: PeerId, command_sender: mpsc::Sender) -> Self { + pub(crate) fn new( + id: PeerId, + command_sender: mpsc::Sender, + kademlia_tasks_semaphore: Arc, + regular_tasks_semaphore: Arc, + ) -> Self { Self { handlers: Handlers::default(), id, listeners: Mutex::default(), connected_peers_count: Arc::new(AtomicUsize::new(0)), command_sender, + kademlia_tasks_semaphore, + regular_tasks_semaphore, } } } diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index b2761f5a76..7b7443dc68 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -100,14 +100,15 @@ pub(crate) fn convert_multiaddresses(addresses: Vec) -> Vec); // The permit. -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct ResizableSemaphorePermit(Arc); // The state shared between the semaphore and the outstanding permits. +#[derive(Debug)] struct SemShared { // The tuple holds (current usage, current max capacity) current: Mutex, @@ -213,6 +214,7 @@ impl ResizableSemaphore { // Acquires a permit, does not wait if no permits are available. // Currently used only for testing. + #[cfg(test)] pub(crate) async fn try_acquire(&self) -> Option { if self.0.alloc_one().await { Some(ResizableSemaphorePermit(self.0.clone())) From 268ead3b83915ff9cd05aa823f6fcec1f7b5292d Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 13 Dec 2022 13:55:46 -0800 Subject: [PATCH 05/12] Clean up --- crates/subspace-networking/src/node_runner.rs | 3 --- crates/subspace-networking/src/shared.rs | 3 --- 2 files changed, 6 deletions(-) diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 6e73a2e7ea..a1a02c63a4 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -28,7 +28,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::pin::Pin; -use std::sync::atomic::Ordering; use std::sync::Weak; use std::time::Duration; use tokio::time::Sleep; @@ -307,7 +306,6 @@ where let is_reserved_peer = self.reserved_peers.contains_key(&peer_id); debug!(%peer_id, %is_reserved_peer, "Connection established [{num_established} from peer]"); - shared.connected_peers_count.fetch_add(1, Ordering::SeqCst); shared .kademlia_tasks_semaphore .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) @@ -375,7 +373,6 @@ where }; debug!("Connection closed with peer {peer_id} [{num_established} from peer]"); - shared.connected_peers_count.fetch_sub(1, Ordering::SeqCst); shared .kademlia_tasks_semaphore .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index a71c02933b..b65ce87974 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -11,7 +11,6 @@ use libp2p::gossipsub::error::{PublishError, SubscriptionError}; use libp2p::gossipsub::Sha256Topic; use libp2p::{Multiaddr, PeerId}; use 
parking_lot::Mutex; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; #[derive(Debug)] @@ -85,7 +84,6 @@ pub(crate) struct Shared { pub(crate) id: PeerId, /// Addresses on which node is listening for incoming requests. pub(crate) listeners: Mutex>, - pub(crate) connected_peers_count: Arc, /// Sender end of the channel for sending commands to the swarm. pub(crate) command_sender: mpsc::Sender, pub(crate) kademlia_tasks_semaphore: Arc, @@ -103,7 +101,6 @@ impl Shared { handlers: Handlers::default(), id, listeners: Mutex::default(), - connected_peers_count: Arc::new(AtomicUsize::new(0)), command_sender, kademlia_tasks_semaphore, regular_tasks_semaphore, From b1cdedd439c8fe9f1a618e3591d3955275b77cdc Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Fri, 16 Dec 2022 18:20:44 -0800 Subject: [PATCH 06/12] Fix race --- crates/subspace-networking/src/utils.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 7b7443dc68..1a29ad46a1 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -8,6 +8,7 @@ use libp2p::{Multiaddr, PeerId}; use std::marker::PhantomData; use std::num::NonZeroUsize; use std::sync::Arc; +use tokio::sync::futures::Notified; use tokio::sync::{Mutex, Notify}; use tracing::warn; @@ -136,14 +137,16 @@ impl SemShared { } // Allocates a permit if available. - // Returns true if a permit was allocated, false otherwise - async fn alloc_one(&self) -> bool { + // Returns Ok(()) if a permit was allocated, Err(Notified) otherwise. The caller + // is responsible for waiting for the notification to be signalled when a permit + // becomes available eventually. + async fn alloc_one(&self) -> Result<(), Notified> { let mut current = self.current.lock().await; if current.usage < current.capacity { current.usage += 1; - true + Ok(()) } else { - false + Err(self.notify.notified()) } } @@ -204,10 +207,10 @@ impl ResizableSemaphore { // Acquires a permit. Waits until a permit is available. 
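    // The Notified future is now created while the state lock is still held,
    // which is intended to close the window where a concurrent free_one()
    // could call notify_waiters() between a failed allocation and the caller
    // starting to wait, which could otherwise lose the wakeup.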
pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { - if self.0.alloc_one().await { - break; + match self.0.alloc_one().await { + Ok(()) => break, + Err(notified) => notified.await, } - self.0.notify.notified().await; } ResizableSemaphorePermit(self.0.clone()) } From 7b164887089cb8dc544f3e0fc04baf55be81bf29 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Fri, 16 Dec 2022 18:39:32 -0800 Subject: [PATCH 07/12] Remove async lock --- crates/subspace-networking/src/node_runner.rs | 8 +- crates/subspace-networking/src/utils.rs | 54 +++++++------ crates/subspace-networking/src/utils/tests.rs | 75 ++++++++----------- 3 files changed, 64 insertions(+), 73 deletions(-) diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index a1a02c63a4..21babbcec2 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -308,12 +308,10 @@ where shared .kademlia_tasks_semaphore - .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) - .await; + .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER); shared .regular_tasks_semaphore - .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) - .await; + .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER); let (in_connections_number, out_connections_number) = { let network_info = self.swarm.network_info(); @@ -376,12 +374,10 @@ where shared .kademlia_tasks_semaphore .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) - .await .expect("Failed to shrink kademlia_tasks_semaphore"); shared .regular_tasks_semaphore .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) - .await .expect("Failed to shrink regular_tasks_semaphore"); } SwarmEvent::OutgoingConnectionError { peer_id, error } => match error { diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 1a29ad46a1..fb5a05fbfc 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -7,9 +7,9 @@ use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; use std::marker::PhantomData; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::sync::futures::Notified; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::Notify; use tracing::warn; /// This test is successful only for global IP addresses and DNS names. @@ -140,8 +140,8 @@ impl SemShared { // Returns Ok(()) if a permit was allocated, Err(Notified) otherwise. The caller // is responsible for waiting for the notification to be signalled when a permit // becomes available eventually. - async fn alloc_one(&self) -> Result<(), Notified> { - let mut current = self.current.lock().await; + fn alloc_one(&self) -> Result<(), Notified> { + let mut current = self.current.lock().unwrap(); if current.usage < current.capacity { current.usage += 1; Ok(()) @@ -150,10 +150,23 @@ impl SemShared { } } + // Allocates a permit if available. + // Returns bool if permit was available, false otherwise + #[cfg(test)] + fn try_alloc_one(&self) -> bool { + let mut current = self.current.lock().unwrap(); + if current.usage < current.capacity { + current.usage += 1; + true + } else { + false + } + } + // Returns a permit back to the free pool, notifies waiters if needed. 
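    // Design note: each critical section only updates two counters and the
    // lock is never held across an .await point, so a blocking
    // std::sync::Mutex suffices here; it also lets Drop return the permit
    // synchronously instead of spawning a task.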
- async fn free_one(&self) { + fn free_one(&self) { let should_notify = { - let mut current = self.current.lock().await; + let mut current = self.current.lock().unwrap(); if current.usage > 0 { current.usage -= 1; } else { @@ -176,17 +189,17 @@ impl SemShared { // Expands the max capacity by delta, and notifies any waiters of the newly available // free permits. - async fn expand(&self, delta: usize) { + fn expand(&self, delta: usize) { { - let mut current = self.current.lock().await; + let mut current = self.current.lock().unwrap(); current.capacity += delta; } self.notify.notify_waiters(); } // Shrinks the max capacity by delta - async fn shrink(&self, delta: usize) -> Result<(), String> { - let mut current = self.current.lock().await; + fn shrink(&self, delta: usize) -> Result<(), String> { + let mut current = self.current.lock().unwrap(); if current.capacity > delta { current.capacity -= delta; Ok(()) @@ -207,7 +220,7 @@ impl ResizableSemaphore { // Acquires a permit. Waits until a permit is available. pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { - match self.0.alloc_one().await { + match self.0.alloc_one() { Ok(()) => break, Err(notified) => notified.await, } @@ -218,8 +231,8 @@ impl ResizableSemaphore { // Acquires a permit, does not wait if no permits are available. // Currently used only for testing. #[cfg(test)] - pub(crate) async fn try_acquire(&self) -> Option { - if self.0.alloc_one().await { + pub(crate) fn try_acquire(&self) -> Option { + if self.0.try_alloc_one() { Some(ResizableSemaphorePermit(self.0.clone())) } else { None @@ -227,23 +240,18 @@ impl ResizableSemaphore { } // Expands the capacity by the specified amount. - pub(crate) async fn expand(&self, delta: usize) { - self.0.expand(delta).await; + pub(crate) fn expand(&self, delta: usize) { + self.0.expand(delta); } // Shrinks the capacity by the specified amount. - pub(crate) async fn shrink(&self, delta: usize) -> Result<(), String> { - self.0.shrink(delta).await + pub(crate) fn shrink(&self, delta: usize) -> Result<(), String> { + self.0.shrink(delta) } } impl Drop for ResizableSemaphorePermit { fn drop(&mut self) { - let state = self.0.clone(); - tokio::spawn({ - async move { - state.free_one().await; - } - }); + self.0.free_one() } } diff --git a/crates/subspace-networking/src/utils/tests.rs b/crates/subspace-networking/src/utils/tests.rs index 256608adb3..b0595f6b19 100644 --- a/crates/subspace-networking/src/utils/tests.rs +++ b/crates/subspace-networking/src/utils/tests.rs @@ -1,4 +1,4 @@ -use super::{CollectionBatcher, ResizableSemaphore, ResizableSemaphorePermit}; +use super::{CollectionBatcher, ResizableSemaphore}; use std::num::NonZeroUsize; #[test] @@ -61,73 +61,60 @@ fn test_batching() { assert_eq!(batcher.next_batch(collection), vec![7, 1, 2, 3]); } -#[tokio::test(flavor = "multi_thread")] -async fn test_resizable_semaphore_alloc() { +#[test] +fn test_resizable_semaphore_alloc() { // Capacity = 3. We should be able to alloc only three permits. let sem = ResizableSemaphore::new(3); let (_permit_1, _permit_2, _permit_3) = ( - sem.try_acquire().await.unwrap(), - sem.try_acquire().await.unwrap(), - sem.try_acquire().await.unwrap(), + sem.try_acquire().unwrap(), + sem.try_acquire().unwrap(), + sem.try_acquire().unwrap(), ); - assert!(sem.try_acquire().await.is_none()); + assert!(sem.try_acquire().is_none()); } -#[tokio::test(flavor = "multi_thread")] -async fn test_resizable_semaphore_expand() { +#[test] +fn test_resizable_semaphore_expand() { // Initial capacity = 3. 
let sem = ResizableSemaphore::new(3); let (_permit_1, _permit_2, _permit_3) = ( - sem.try_acquire().await.unwrap(), - sem.try_acquire().await.unwrap(), - sem.try_acquire().await.unwrap(), + sem.try_acquire().unwrap(), + sem.try_acquire().unwrap(), + sem.try_acquire().unwrap(), ); - assert!(sem.try_acquire().await.is_none()); + assert!(sem.try_acquire().is_none()); // Increase capacity of semaphore by 2, we should be able to alloc two more permits. - sem.expand(2).await; - let (_permit_4, _permit_5) = ( - sem.try_acquire().await.unwrap(), - sem.try_acquire().await.unwrap(), - ); - assert!(sem.try_acquire().await.is_none()); + sem.expand(2); + let (_permit_4, _permit_5) = (sem.try_acquire().unwrap(), sem.try_acquire().unwrap()); + assert!(sem.try_acquire().is_none()); } -#[tokio::test(flavor = "multi_thread")] -async fn test_resizable_semaphore_shrink() { - async fn free_permit(permit: ResizableSemaphorePermit) { - tokio::spawn({ - async move { - std::mem::drop(permit); - } - }) - .await - .unwrap(); - } - +#[test] +fn test_resizable_semaphore_shrink() { // Initial capacity = 4, alloc 4 outstanding permits. let sem = ResizableSemaphore::new(4); - let permit_1 = sem.try_acquire().await.unwrap(); - let permit_2 = sem.try_acquire().await.unwrap(); - let permit_3 = sem.try_acquire().await.unwrap(); - let _permit_4 = sem.try_acquire().await.unwrap(); - assert!(sem.try_acquire().await.is_none()); + let permit_1 = sem.try_acquire().unwrap(); + let permit_2 = sem.try_acquire().unwrap(); + let permit_3 = sem.try_acquire().unwrap(); + let _permit_4 = sem.try_acquire().unwrap(); + assert!(sem.try_acquire().is_none()); // Shrink the capacity by 2, new capacity = 2. - assert!(sem.shrink(2).await.is_ok()); + assert!(sem.shrink(2).is_ok()); // Alloc should fail as outstanding permits(4) >= capacity(2). - assert!(sem.try_acquire().await.is_none()); + assert!(sem.try_acquire().is_none()); // Free a permit, alloc should fail as outstanding permits(3) >= capacity(2). - free_permit(permit_2).await; - assert!(sem.try_acquire().await.is_none()); + std::mem::drop(permit_2); + assert!(sem.try_acquire().is_none()); // Free another permit, alloc should fail as outstanding permits(2) >= capacity(2). - free_permit(permit_3).await; - assert!(sem.try_acquire().await.is_none()); + std::mem::drop(permit_3); + assert!(sem.try_acquire().is_none()); // Free another permit, alloc should succeed as outstanding permits(1) < capacity(2). 
- free_permit(permit_1).await; - assert!(sem.try_acquire().await.is_some()); + std::mem::drop(permit_1); + assert!(sem.try_acquire().is_some()); } From a782156a021ced65e30216f8f90884a0185ca865 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Mon, 19 Dec 2022 20:22:57 -0800 Subject: [PATCH 08/12] Restructure --- crates/subspace-networking/src/node_runner.rs | 6 +- crates/subspace-networking/src/utils.rs | 145 ++++++++---------- crates/subspace-networking/src/utils/tests.rs | 2 +- 3 files changed, 69 insertions(+), 84 deletions(-) diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 4d1873fac5..79062a9d7e 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -383,12 +383,10 @@ where shared .kademlia_tasks_semaphore - .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) - .expect("Failed to shrink kademlia_tasks_semaphore"); + .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER); shared .regular_tasks_semaphore - .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) - .expect("Failed to shrink regular_tasks_semaphore"); + .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER); } SwarmEvent::OutgoingConnectionError { peer_id, error } => match error { DialError::Transport(ref addresses) => { diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index fb5a05fbfc..66e76dbd63 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -8,7 +8,6 @@ use libp2p::{Multiaddr, PeerId}; use std::marker::PhantomData; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; -use tokio::sync::futures::Notified; use tokio::sync::Notify; use tracing::warn; @@ -112,7 +111,7 @@ pub(crate) struct ResizableSemaphorePermit(Arc); #[derive(Debug)] struct SemShared { // The tuple holds (current usage, current max capacity) - current: Mutex, + state: Mutex, // To signal waiters for permits to be available notify: Notify, @@ -128,111 +127,93 @@ struct SemState { usage: usize, } -impl SemShared { - fn new(capacity: usize) -> Self { - Self { - current: Mutex::new(SemState { capacity, usage: 0 }), - notify: Notify::new(), - } - } - +impl SemState { // Allocates a permit if available. - // Returns Ok(()) if a permit was allocated, Err(Notified) otherwise. The caller - // is responsible for waiting for the notification to be signalled when a permit - // becomes available eventually. - fn alloc_one(&self) -> Result<(), Notified> { - let mut current = self.current.lock().unwrap(); - if current.usage < current.capacity { - current.usage += 1; - Ok(()) + // Returns true if allocated, false otherwise. + fn alloc_one(&mut self) -> bool { + if self.usage < self.capacity { + self.usage += 1; + true } else { - Err(self.notify.notified()) + false } } - // Allocates a permit if available. - // Returns bool if permit was available, false otherwise - #[cfg(test)] - fn try_alloc_one(&self) -> bool { - let mut current = self.current.lock().unwrap(); - if current.usage < current.capacity { - current.usage += 1; - true + // Returns a free permit to the free pool. + // Returns true if any waiters need to be notified. + fn free_one(&mut self) -> bool { + let prev_is_full = self.is_full(); + if let Some(dec) = self.usage.checked_sub(1) { + self.usage = dec; } else { - false + panic!("SemState::free_one(): invalid free, state = {:?}", self); } + + // Notify if we did a full -> available transition. 
+ prev_is_full && !self.is_full() } - // Returns a permit back to the free pool, notifies waiters if needed. - fn free_one(&self) { - let should_notify = { - let mut current = self.current.lock().unwrap(); - if current.usage > 0 { - current.usage -= 1; - } else { - panic!( - "SemShared::free_one(): invalid free, current = {:?}", - current - ); - } + // Expands the max capacity by delta. + // Returns true if any waiters need to be notified. + fn expand(&mut self, delta: usize) -> bool { + let prev_is_full = self.is_full(); + self.capacity += delta; - // Notify only if usage fell below the current capacity. - // For example: if the previous capacity was 100, and current capacity - // is 50, this will wait for usage to fall below 50 before any waiters - // are notified. - current.usage < current.capacity - }; - if should_notify { - self.notify.notify_waiters(); - } + // Notify if we did a full -> available transition. + prev_is_full && !self.is_full() } - // Expands the max capacity by delta, and notifies any waiters of the newly available - // free permits. - fn expand(&self, delta: usize) { - { - let mut current = self.current.lock().unwrap(); - current.capacity += delta; + // Shrinks the max capacity by delta. + fn shrink(&mut self, delta: usize) { + if let Some(dec) = self.capacity.checked_sub(delta) { + self.capacity = dec; + } else { + panic!("SemState::shrink(): invalid shrink, state = {:?}", self); } - self.notify.notify_waiters(); } - // Shrinks the max capacity by delta - fn shrink(&self, delta: usize) -> Result<(), String> { - let mut current = self.current.lock().unwrap(); - if current.capacity > delta { - current.capacity -= delta; - Ok(()) - } else { - Err(format!( - "SemShared::::shrink(): invalid delta = {}, current = {:?}", - delta, current - )) - } + // Returns true if current usage exceeds capacity + fn is_full(&self) -> bool { + self.usage >= self.capacity } } impl ResizableSemaphore { pub(crate) fn new(capacity: usize) -> Self { - Self(Arc::new(SemShared::new(capacity))) + let shared = SemShared { + state: Mutex::new(SemState { capacity, usage: 0 }), + notify: Notify::new(), + }; + Self(Arc::new(shared)) } // Acquires a permit. Waits until a permit is available. pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { - match self.0.alloc_one() { - Ok(()) => break, - Err(notified) => notified.await, + let wait = { + let mut state = self.0.state.lock().unwrap(); + if state.alloc_one() { + None + } else { + // This needs to be done under the lock to avoid race. + Some(self.0.notify.notified()) + } + }; + + match wait { + Some(notified) => notified.await, + None => break, } } ResizableSemaphorePermit(self.0.clone()) } - // Acquires a permit, does not wait if no permits are available. - // Currently used only for testing. + // Acquires a permit, doesn't wait for permits to be available. + // Currently used only for tests. #[cfg(test)] pub(crate) fn try_acquire(&self) -> Option { - if self.0.try_alloc_one() { + let mut state = self.0.state.lock().unwrap(); + if state.alloc_one() { Some(ResizableSemaphorePermit(self.0.clone())) } else { None @@ -241,17 +222,23 @@ impl ResizableSemaphore { // Expands the capacity by the specified amount. pub(crate) fn expand(&self, delta: usize) { - self.0.expand(delta); + let notify_waiters = self.0.state.lock().unwrap().expand(delta); + if notify_waiters { + self.0.notify.notify_waiters(); + } } // Shrinks the capacity by the specified amount. 
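    // Shrinking below the current usage is fine: is_full() simply stays true
    // until enough outstanding permits are dropped, so no permit is revoked
    // early. Only shrinking by more than the whole capacity panics, via the
    // checked_sub in SemState::shrink().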
- pub(crate) fn shrink(&self, delta: usize) -> Result<(), String> { - self.0.shrink(delta) + pub(crate) fn shrink(&self, delta: usize) { + self.0.state.lock().unwrap().shrink(delta) } } impl Drop for ResizableSemaphorePermit { fn drop(&mut self) { - self.0.free_one() + let notify_waiters = self.0.state.lock().unwrap().free_one(); + if notify_waiters { + self.0.notify.notify_waiters(); + } } } diff --git a/crates/subspace-networking/src/utils/tests.rs b/crates/subspace-networking/src/utils/tests.rs index b0595f6b19..6684510918 100644 --- a/crates/subspace-networking/src/utils/tests.rs +++ b/crates/subspace-networking/src/utils/tests.rs @@ -101,7 +101,7 @@ fn test_resizable_semaphore_shrink() { assert!(sem.try_acquire().is_none()); // Shrink the capacity by 2, new capacity = 2. - assert!(sem.shrink(2).is_ok()); + sem.shrink(2); // Alloc should fail as outstanding permits(4) >= capacity(2). assert!(sem.try_acquire().is_none()); From ddd5eaf37b2b7de7dc6ab154b2cf66e9d407f064 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Mon, 19 Dec 2022 20:58:15 -0800 Subject: [PATCH 09/12] Address comments --- crates/subspace-networking/src/create.rs | 7 ++----- crates/subspace-networking/src/node.rs | 8 ++++---- crates/subspace-networking/src/shared.rs | 8 ++++---- crates/subspace-networking/src/utils.rs | 18 +++++++++--------- crates/subspace-networking/src/utils/tests.rs | 19 ++++++++----------- 5 files changed, 27 insertions(+), 33 deletions(-) diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index 63d9f104b6..55ef18adfd 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -304,10 +304,8 @@ where // Create final structs let (command_sender, command_receiver) = mpsc::channel(1); - let kademlia_tasks_semaphore = - Arc::new(ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS)); - let regular_tasks_semaphore = - Arc::new(ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS)); + let kademlia_tasks_semaphore = ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS); + let regular_tasks_semaphore = ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS); let shared = Arc::new(Shared::new( local_peer_id, @@ -328,7 +326,6 @@ where reserved_peers: convert_multiaddresses(reserved_peers).into_iter().collect(), max_established_incoming_connections, max_established_outgoing_connections, - metrics, }); diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 34f6f448a3..071c8bb628 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -257,15 +257,15 @@ impl From for CircuitRelayClientError { #[must_use = "Node doesn't do anything if dropped"] pub struct Node { shared: Arc, - kademlia_tasks_semaphore: Arc, - regular_tasks_semaphore: Arc, + kademlia_tasks_semaphore: ResizableSemaphore, + regular_tasks_semaphore: ResizableSemaphore, } impl Node { pub(crate) fn new( shared: Arc, - kademlia_tasks_semaphore: Arc, - regular_tasks_semaphore: Arc, + kademlia_tasks_semaphore: ResizableSemaphore, + regular_tasks_semaphore: ResizableSemaphore, ) -> Self { Self { shared, diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index cf7e10b262..8523931fd7 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -91,16 +91,16 @@ pub(crate) struct Shared { pub(crate) listeners: Mutex>, /// Sender end of the channel for sending commands to the swarm. 
pub(crate) command_sender: mpsc::Sender, - pub(crate) kademlia_tasks_semaphore: Arc, - pub(crate) regular_tasks_semaphore: Arc, + pub(crate) kademlia_tasks_semaphore: ResizableSemaphore, + pub(crate) regular_tasks_semaphore: ResizableSemaphore, } impl Shared { pub(crate) fn new( id: PeerId, command_sender: mpsc::Sender, - kademlia_tasks_semaphore: Arc, - regular_tasks_semaphore: Arc, + kademlia_tasks_semaphore: ResizableSemaphore, + regular_tasks_semaphore: ResizableSemaphore, ) -> Self { Self { handlers: Handlers::default(), diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 66e76dbd63..8c3008c9f6 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -98,32 +98,32 @@ pub(crate) fn convert_multiaddresses(addresses: Vec) -> Vec); -// The permit. +/// The permit. #[derive(Clone, Debug)] pub(crate) struct ResizableSemaphorePermit(Arc); -// The state shared between the semaphore and the outstanding permits. +/// The state shared between the semaphore and the outstanding permits. #[derive(Debug)] struct SemShared { - // The tuple holds (current usage, current max capacity) + /// The tuple holds (current usage, current max capacity) state: Mutex, - // To signal waiters for permits to be available + /// To signal waiters for permits to be available notify: Notify, } -// Current state. +/// The semaphore state. #[derive(Debug)] struct SemState { - // The current capacity + /// The current capacity capacity: usize, - // The current outstanding permits + /// The current outstanding permits usage: usize, } diff --git a/crates/subspace-networking/src/utils/tests.rs b/crates/subspace-networking/src/utils/tests.rs index 6684510918..13a50aca93 100644 --- a/crates/subspace-networking/src/utils/tests.rs +++ b/crates/subspace-networking/src/utils/tests.rs @@ -65,11 +65,9 @@ fn test_batching() { fn test_resizable_semaphore_alloc() { // Capacity = 3. We should be able to alloc only three permits. let sem = ResizableSemaphore::new(3); - let (_permit_1, _permit_2, _permit_3) = ( - sem.try_acquire().unwrap(), - sem.try_acquire().unwrap(), - sem.try_acquire().unwrap(), - ); + let _permit_1 = sem.try_acquire().unwrap(); + let _permit_2 = sem.try_acquire().unwrap(); + let _permit_3 = sem.try_acquire().unwrap(); assert!(sem.try_acquire().is_none()); } @@ -77,16 +75,15 @@ fn test_resizable_semaphore_alloc() { fn test_resizable_semaphore_expand() { // Initial capacity = 3. let sem = ResizableSemaphore::new(3); - let (_permit_1, _permit_2, _permit_3) = ( - sem.try_acquire().unwrap(), - sem.try_acquire().unwrap(), - sem.try_acquire().unwrap(), - ); + let _permit_1 = sem.try_acquire().unwrap(); + let _permit_2 = sem.try_acquire().unwrap(); + let _permit_3 = sem.try_acquire().unwrap(); assert!(sem.try_acquire().is_none()); // Increase capacity of semaphore by 2, we should be able to alloc two more permits. 
     sem.expand(2);
-    let (_permit_4, _permit_5) = (sem.try_acquire().unwrap(), sem.try_acquire().unwrap());
+    let _permit_4 = sem.try_acquire().unwrap();
+    let _permit_5 = sem.try_acquire().unwrap();
     assert!(sem.try_acquire().is_none());
 }
 

From d96b65f18d83a9e715cbddf9310606e6f840e8a0 Mon Sep 17 00:00:00 2001
From: Rahul Subramaniyam
Date: Mon, 26 Dec 2022 20:26:49 -0800
Subject: [PATCH 10/12] Bring back threshold for bumping quota

---
 crates/subspace-networking/src/node_runner.rs | 42 +++++++++++++------
 crates/subspace-networking/src/shared.rs      |  3 ++
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs
index 79062a9d7e..e5fc6d76f3 100644
--- a/crates/subspace-networking/src/node_runner.rs
+++ b/crates/subspace-networking/src/node_runner.rs
@@ -27,12 +27,20 @@ use nohash_hasher::IntMap;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
+use std::num::NonZeroUsize;
 use std::pin::Pin;
+use std::sync::atomic::Ordering;
 use std::sync::Weak;
 use std::time::Duration;
 use tokio::time::Sleep;
 use tracing::{debug, error, trace, warn};
 
+/// How many peers a node should be connected to before boosting turns on.
+///
+/// 1 means boosting starts with the second peer.
+const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
+    NonZeroUsize::new(5).expect("Not zero; qed");
+
 enum QueryResultSender {
     Value {
         sender: mpsc::UnboundedSender<PeerRecord>,
@@ -316,12 +324,17 @@ where
                 let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
                 debug!(%peer_id, %is_reserved_peer, "Connection established [{num_established} from peer]");
 
-                shared
-                    .kademlia_tasks_semaphore
-                    .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER);
-                shared
-                    .regular_tasks_semaphore
-                    .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER);
+                if shared.connected_peers_count.fetch_add(1, Ordering::SeqCst)
+                    >= CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get()
+                {
+                    // The peer count exceeded the threshold; bump up the quota.
+                    shared
+                        .kademlia_tasks_semaphore
+                        .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER);
+                    shared
+                        .regular_tasks_semaphore
+                        .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER);
+                }
 
                 let (in_connections_number, out_connections_number) = {
                     let network_info = self.swarm.network_info();
@@ -381,12 +394,17 @@ where
                 };
 
                 debug!("Connection closed with peer {peer_id} [{num_established} from peer]");
-                shared
-                    .kademlia_tasks_semaphore
-                    .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER);
-                shared
-                    .regular_tasks_semaphore
-                    .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER);
+                if shared.connected_peers_count.fetch_sub(1, Ordering::SeqCst)
+                    > CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get()
+                {
+                    // The previous peer count was over the threshold; reclaim the quota.
+ shared + .kademlia_tasks_semaphore + .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER); + shared + .regular_tasks_semaphore + .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER); + } } SwarmEvent::OutgoingConnectionError { peer_id, error } => match error { DialError::Transport(ref addresses) => { diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 8523931fd7..7a9be91a0b 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -11,6 +11,7 @@ use libp2p::gossipsub::error::{PublishError, SubscriptionError}; use libp2p::gossipsub::Sha256Topic; use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; #[derive(Debug)] @@ -89,6 +90,7 @@ pub(crate) struct Shared { pub(crate) id: PeerId, /// Addresses on which node is listening for incoming requests. pub(crate) listeners: Mutex>, + pub(crate) connected_peers_count: Arc, /// Sender end of the channel for sending commands to the swarm. pub(crate) command_sender: mpsc::Sender, pub(crate) kademlia_tasks_semaphore: ResizableSemaphore, @@ -106,6 +108,7 @@ impl Shared { handlers: Handlers::default(), id, listeners: Mutex::default(), + connected_peers_count: Arc::new(AtomicUsize::new(0)), command_sender, kademlia_tasks_semaphore, regular_tasks_semaphore, From 66af1d24417d3e3ddd205a39a786e99fcbd4bbcc Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 27 Dec 2022 09:51:41 -0800 Subject: [PATCH 11/12] Address comments --- crates/subspace-networking/src/create.rs | 7 +++- crates/subspace-networking/src/utils.rs | 38 ++++++++++--------- crates/subspace-networking/src/utils/tests.rs | 6 +-- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index 55ef18adfd..b2c6fe520f 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -65,7 +65,8 @@ const YAMUX_MAX_STREAMS: usize = 256; /// /// We restrict this so we don't exceed number of incoming streams for single peer, but this value /// will be boosted depending on number of connected peers. -const KADEMLIA_BASE_CONCURRENT_TASKS: usize = 30; +const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = + NonZeroUsize::new(30).expect("KADEMLIA_BASE_CONCURRENT_TASKS is 0"); /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future. @@ -77,7 +78,9 @@ pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1; /// /// We restrict this so we don't exceed number of streams for single peer, but this value will be /// boosted depending on number of connected peers. -const REGULAR_BASE_CONCURRENT_TASKS: usize = 120 - KADEMLIA_BASE_CONCURRENT_TASKS; +const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize = + NonZeroUsize::new(120 - KADEMLIA_BASE_CONCURRENT_TASKS.get()) + .expect("REGULAR_BASE_CONCURRENT_TASKS is 0"); /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future. 
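Taken together with the CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD introduced in the previous patch, these constants mean the effective quota settles at base + boost * max(0, connected_peers - threshold). The following is a minimal sketch of that arithmetic, not code from the series: the constant values mirror create.rs and node_runner.rs, and effective_kademlia_capacity() is a hypothetical helper added here purely for illustration, assuming one expand() per connection beyond the threshold and a matching shrink() per disconnection.

use std::num::NonZeroUsize;

// Values mirroring create.rs and node_runner.rs in this series.
const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize =
    NonZeroUsize::new(30).expect("Not zero; qed");
const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1;
const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
    NonZeroUsize::new(5).expect("Not zero; qed");

// Hypothetical helper (not in the patches): the capacity the Kademlia tasks
// semaphore converges to for a given number of connected peers.
fn effective_kademlia_capacity(connected_peers: usize) -> usize {
    let boosted = connected_peers.saturating_sub(CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get());
    KADEMLIA_BASE_CONCURRENT_TASKS.get() + boosted * KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER
}

fn main() {
    assert_eq!(effective_kademlia_capacity(5), 30); // at or below threshold: base quota only
    assert_eq!(effective_kademlia_capacity(6), 31); // the sixth peer triggers the first boost
    assert_eq!(effective_kademlia_capacity(10), 35);
}
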
diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 8c3008c9f6..a62d7e183c 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -5,9 +5,10 @@ mod tests; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; +use parking_lot::Mutex; use std::marker::PhantomData; use std::num::NonZeroUsize; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::sync::Notify; use tracing::warn; @@ -98,15 +99,6 @@ pub(crate) fn convert_multiaddresses(addresses: Vec) -> Vec); - -/// The permit. -#[derive(Clone, Debug)] -pub(crate) struct ResizableSemaphorePermit(Arc); - /// The state shared between the semaphore and the outstanding permits. #[derive(Debug)] struct SemShared { @@ -178,10 +170,18 @@ impl SemState { } } +/// Semaphore like implementation that allows both shrinking and expanding +/// the max permits. +#[derive(Clone, Debug)] +pub(crate) struct ResizableSemaphore(Arc); + impl ResizableSemaphore { - pub(crate) fn new(capacity: usize) -> Self { + pub(crate) fn new(capacity: NonZeroUsize) -> Self { let shared = SemShared { - state: Mutex::new(SemState { capacity, usage: 0 }), + state: Mutex::new(SemState { + capacity: capacity.get(), + usage: 0, + }), notify: Notify::new(), }; Self(Arc::new(shared)) @@ -191,7 +191,7 @@ impl ResizableSemaphore { pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { loop { let wait = { - let mut state = self.0.state.lock().unwrap(); + let mut state = self.0.state.lock(); if state.alloc_one() { None } else { @@ -212,7 +212,7 @@ impl ResizableSemaphore { // Currently used only for tests. #[cfg(test)] pub(crate) fn try_acquire(&self) -> Option { - let mut state = self.0.state.lock().unwrap(); + let mut state = self.0.state.lock(); if state.alloc_one() { Some(ResizableSemaphorePermit(self.0.clone())) } else { @@ -222,7 +222,7 @@ impl ResizableSemaphore { // Expands the capacity by the specified amount. pub(crate) fn expand(&self, delta: usize) { - let notify_waiters = self.0.state.lock().unwrap().expand(delta); + let notify_waiters = self.0.state.lock().expand(delta); if notify_waiters { self.0.notify.notify_waiters(); } @@ -230,13 +230,17 @@ impl ResizableSemaphore { // Shrinks the capacity by the specified amount. pub(crate) fn shrink(&self, delta: usize) { - self.0.state.lock().unwrap().shrink(delta) + self.0.state.lock().shrink(delta) } } +/// The semaphore permit. +#[derive(Clone, Debug)] +pub(crate) struct ResizableSemaphorePermit(Arc); + impl Drop for ResizableSemaphorePermit { fn drop(&mut self) { - let notify_waiters = self.0.state.lock().unwrap().free_one(); + let notify_waiters = self.0.state.lock().free_one(); if notify_waiters { self.0.notify.notify_waiters(); } diff --git a/crates/subspace-networking/src/utils/tests.rs b/crates/subspace-networking/src/utils/tests.rs index 13a50aca93..65e610ed05 100644 --- a/crates/subspace-networking/src/utils/tests.rs +++ b/crates/subspace-networking/src/utils/tests.rs @@ -64,7 +64,7 @@ fn test_batching() { #[test] fn test_resizable_semaphore_alloc() { // Capacity = 3. We should be able to alloc only three permits. - let sem = ResizableSemaphore::new(3); + let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap()); let _permit_1 = sem.try_acquire().unwrap(); let _permit_2 = sem.try_acquire().unwrap(); let _permit_3 = sem.try_acquire().unwrap(); @@ -74,7 +74,7 @@ fn test_resizable_semaphore_alloc() { #[test] fn test_resizable_semaphore_expand() { // Initial capacity = 3. 
- let sem = ResizableSemaphore::new(3); + let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap()); let _permit_1 = sem.try_acquire().unwrap(); let _permit_2 = sem.try_acquire().unwrap(); let _permit_3 = sem.try_acquire().unwrap(); @@ -90,7 +90,7 @@ fn test_resizable_semaphore_expand() { #[test] fn test_resizable_semaphore_shrink() { // Initial capacity = 4, alloc 4 outstanding permits. - let sem = ResizableSemaphore::new(4); + let sem = ResizableSemaphore::new(NonZeroUsize::new(4).unwrap()); let permit_1 = sem.try_acquire().unwrap(); let permit_2 = sem.try_acquire().unwrap(); let permit_3 = sem.try_acquire().unwrap(); From 361ed9642b3797f8bba5e68850b26950547cbe64 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 27 Dec 2022 20:22:41 +0200 Subject: [PATCH 12/12] Fix expect message --- crates/subspace-networking/src/create.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index b2c6fe520f..a5a0218672 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -65,8 +65,7 @@ const YAMUX_MAX_STREAMS: usize = 256; /// /// We restrict this so we don't exceed number of incoming streams for single peer, but this value /// will be boosted depending on number of connected peers. -const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = - NonZeroUsize::new(30).expect("KADEMLIA_BASE_CONCURRENT_TASKS is 0"); +const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = NonZeroUsize::new(30).expect("Not zero; qed"); /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future. @@ -79,8 +78,7 @@ pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1; /// We restrict this so we don't exceed number of streams for single peer, but this value will be /// boosted depending on number of connected peers. const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize = - NonZeroUsize::new(120 - KADEMLIA_BASE_CONCURRENT_TASKS.get()) - .expect("REGULAR_BASE_CONCURRENT_TASKS is 0"); + NonZeroUsize::new(120 - KADEMLIA_BASE_CONCURRENT_TASKS.get()).expect("Not zero; qed"); /// Above base limit will be boosted by specified number for every peer connected starting with /// second peer, such that it scaled with network connectivity, but the exact coefficient might need /// to be tweaked in the future.
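
With the series complete, the semaphore's surface inside the crate is new(NonZeroUsize), async acquire(), try_acquire() (tests only), expand(), shrink(), and permits that release themselves on Drop. Below is a condensed, synchronous sketch of that final shape for reference: it mirrors the patched utils.rs (state behind a parking_lot::Mutex, permit freed on Drop, so it depends on the parking_lot crate like the series does) but omits the async acquire()/Notify machinery, and it is an illustration rather than the crate's code.

use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::Arc;

// The semaphore state: outstanding permits vs. current capacity.
#[derive(Debug)]
struct SemState {
    capacity: usize,
    usage: usize,
}

// Semaphore handle; clones share the same state, as in the patched utils.rs.
#[derive(Clone, Debug)]
struct ResizableSemaphore(Arc<Mutex<SemState>>);

// A permit; it is returned to the pool in Drop.
#[derive(Debug)]
struct Permit(Arc<Mutex<SemState>>);

impl ResizableSemaphore {
    fn new(capacity: NonZeroUsize) -> Self {
        Self(Arc::new(Mutex::new(SemState {
            capacity: capacity.get(),
            usage: 0,
        })))
    }

    // Non-blocking acquire, the flavor the unit tests exercise.
    fn try_acquire(&self) -> Option<Permit> {
        let mut state = self.0.lock();
        if state.usage < state.capacity {
            state.usage += 1;
            Some(Permit(self.0.clone()))
        } else {
            None
        }
    }

    // Raising capacity makes permits available immediately (the real code
    // also notifies async waiters at this point).
    fn expand(&self, delta: usize) {
        self.0.lock().capacity += delta;
    }

    // Lowering capacity never revokes outstanding permits; it only delays
    // new acquisitions until usage falls below the new limit.
    fn shrink(&self, delta: usize) {
        let mut state = self.0.lock();
        assert!(state.capacity > delta, "shrink would drop capacity to zero");
        state.capacity -= delta;
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        // The real implementation notifies waiters here, and only once usage
        // actually falls below the (possibly reduced) capacity.
        self.0.lock().usage -= 1;
    }
}

fn main() {
    let sem = ResizableSemaphore::new(NonZeroUsize::new(2).unwrap());
    let permit_a = sem.try_acquire().unwrap();
    let _permit_b = sem.try_acquire().unwrap();
    assert!(sem.try_acquire().is_none()); // at capacity

    sem.expand(1); // capacity 3: one more permit becomes available
    let _permit_c = sem.try_acquire().unwrap();

    sem.shrink(1); // capacity 2, but 3 permits still outstanding
    drop(permit_a); // usage 2, still not below capacity 2
    assert!(sem.try_acquire().is_none());
}

The shrink scenario at the end mirrors the notification rule from the earlier patches: a shrink can leave the semaphore over-subscribed, and waiters are only woken once enough permits have been dropped that usage dips below the reduced capacity.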