Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resizable semaphore #1019

Merged
merged 14 commits into from
Dec 27, 2022
7 changes: 5 additions & 2 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const YAMUX_MAX_STREAMS: usize = 256;
///
/// We restrict this so we don't exceed number of incoming streams for single peer, but this value
/// will be boosted depending on number of connected peers.
const KADEMLIA_BASE_CONCURRENT_TASKS: usize = 30;
const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize =
NonZeroUsize::new(30).expect("KADEMLIA_BASE_CONCURRENT_TASKS is 0");
/// Above base limit will be boosted by specified number for every peer connected starting with
/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
/// to be tweaked in the future.
Expand All @@ -77,7 +78,9 @@ pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1;
///
/// We restrict this so we don't exceed number of streams for single peer, but this value will be
/// boosted depending on number of connected peers.
const REGULAR_BASE_CONCURRENT_TASKS: usize = 120 - KADEMLIA_BASE_CONCURRENT_TASKS;
const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize =
NonZeroUsize::new(120 - KADEMLIA_BASE_CONCURRENT_TASKS.get())
.expect("REGULAR_BASE_CONCURRENT_TASKS is 0");
/// Above base limit will be boosted by specified number for every peer connected starting with
/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
/// to be tweaked in the future.
Expand Down
38 changes: 21 additions & 17 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ mod tests;

use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::warn;

Expand Down Expand Up @@ -98,15 +99,6 @@ pub(crate) fn convert_multiaddresses(addresses: Vec<Multiaddr>) -> Vec<PeerAddre
.collect()
}

/// Semaphore like implementation that allows both shrinking and expanding
/// the max permits.
#[derive(Clone, Debug)]
pub(crate) struct ResizableSemaphore(Arc<SemShared>);

/// The permit.
#[derive(Clone, Debug)]
pub(crate) struct ResizableSemaphorePermit(Arc<SemShared>);

/// The state shared between the semaphore and the outstanding permits.
#[derive(Debug)]
struct SemShared {
Expand Down Expand Up @@ -178,10 +170,18 @@ impl SemState {
}
}

/// Semaphore like implementation that allows both shrinking and expanding
/// the max permits.
#[derive(Clone, Debug)]
pub(crate) struct ResizableSemaphore(Arc<SemShared>);

impl ResizableSemaphore {
pub(crate) fn new(capacity: usize) -> Self {
pub(crate) fn new(capacity: NonZeroUsize) -> Self {
let shared = SemShared {
state: Mutex::new(SemState { capacity, usage: 0 }),
state: Mutex::new(SemState {
capacity: capacity.get(),
usage: 0,
}),
notify: Notify::new(),
};
Self(Arc::new(shared))
Expand All @@ -191,7 +191,7 @@ impl ResizableSemaphore {
pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit {
loop {
let wait = {
let mut state = self.0.state.lock().unwrap();
let mut state = self.0.state.lock();
if state.alloc_one() {
None
} else {
Expand All @@ -212,7 +212,7 @@ impl ResizableSemaphore {
// Currently used only for tests.
#[cfg(test)]
pub(crate) fn try_acquire(&self) -> Option<ResizableSemaphorePermit> {
let mut state = self.0.state.lock().unwrap();
let mut state = self.0.state.lock();
if state.alloc_one() {
Some(ResizableSemaphorePermit(self.0.clone()))
} else {
Expand All @@ -222,21 +222,25 @@ impl ResizableSemaphore {

// Expands the capacity by the specified amount.
pub(crate) fn expand(&self, delta: usize) {
let notify_waiters = self.0.state.lock().unwrap().expand(delta);
let notify_waiters = self.0.state.lock().expand(delta);
if notify_waiters {
self.0.notify.notify_waiters();
}
}

// Shrinks the capacity by the specified amount.
pub(crate) fn shrink(&self, delta: usize) {
self.0.state.lock().unwrap().shrink(delta)
self.0.state.lock().shrink(delta)
}
}

/// The semaphore permit.
#[derive(Clone, Debug)]
pub(crate) struct ResizableSemaphorePermit(Arc<SemShared>);

impl Drop for ResizableSemaphorePermit {
fn drop(&mut self) {
let notify_waiters = self.0.state.lock().unwrap().free_one();
let notify_waiters = self.0.state.lock().free_one();
if notify_waiters {
self.0.notify.notify_waiters();
}
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-networking/src/utils/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn test_batching() {
#[test]
fn test_resizable_semaphore_alloc() {
// Capacity = 3. We should be able to alloc only three permits.
let sem = ResizableSemaphore::new(3);
let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap());
let _permit_1 = sem.try_acquire().unwrap();
let _permit_2 = sem.try_acquire().unwrap();
let _permit_3 = sem.try_acquire().unwrap();
Expand All @@ -74,7 +74,7 @@ fn test_resizable_semaphore_alloc() {
#[test]
fn test_resizable_semaphore_expand() {
// Initial capacity = 3.
let sem = ResizableSemaphore::new(3);
let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap());
let _permit_1 = sem.try_acquire().unwrap();
let _permit_2 = sem.try_acquire().unwrap();
let _permit_3 = sem.try_acquire().unwrap();
Expand All @@ -90,7 +90,7 @@ fn test_resizable_semaphore_expand() {
#[test]
fn test_resizable_semaphore_shrink() {
// Initial capacity = 4, alloc 4 outstanding permits.
let sem = ResizableSemaphore::new(4);
let sem = ResizableSemaphore::new(NonZeroUsize::new(4).unwrap());
let permit_1 = sem.try_acquire().unwrap();
let permit_2 = sem.try_acquire().unwrap();
let permit_3 = sem.try_acquire().unwrap();
Expand Down