Commit

Resizable semaphore (#1019)
* Add ResizableSemaphore

* Clean ups

* Add unit tests

* Use the sem changes

* Clean up

* Fix race

* Remove async lock

* Restructure

* Address comments

* Bring back threshold for bumping quota

* Address comments

* Fix expect message

Co-authored-by: Nazar Mokrynskyi <[email protected]>
rahulksnv and nazar-pc authored Dec 27, 2022
1 parent fc80120 commit 6e989d3
Showing 6 changed files with 292 additions and 179 deletions.
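This commit replaces the plain `tokio::sync::Semaphore` plus a background capacity-maintenance task with a `ResizableSemaphore` from `crate::utils`. Only its call sites appear in the two files shown below, so the following is a minimal sketch of the interface those call sites imply (`new`, infallible `acquire`, cheap `Clone`); the internals and the `expand` helper are assumptions for illustration, not the actual `utils.rs` implementation, which also has to support shrinking capacity as peers disconnect.

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Sketch of the interface implied by the call sites in this diff; internals are assumed.
#[derive(Clone)]
pub(crate) struct ResizableSemaphore {
    inner: Arc<Semaphore>,
}

/// Permit returned by `acquire()`; the slot is freed when this is dropped.
pub(crate) struct ResizableSemaphorePermit {
    _permit: OwnedSemaphorePermit,
}

impl ResizableSemaphore {
    pub(crate) fn new(capacity: NonZeroUsize) -> Self {
        Self {
            inner: Arc::new(Semaphore::new(capacity.get())),
        }
    }

    /// Waits for a free permit. Infallible because the semaphore is never closed.
    pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit {
        let permit = Arc::clone(&self.inner)
            .acquire_owned()
            .await
            .expect("Semaphore is never closed; qed");
        ResizableSemaphorePermit { _permit: permit }
    }

    /// Grows capacity, e.g. when peers connect. Shrinking is where the real
    /// implementation needs extra bookkeeping, since tokio's `Semaphore` can
    /// only add permits, not remove them.
    pub(crate) fn expand(&self, delta: usize) {
        self.inner.add_permits(delta);
    }
}
```

In the diffs that follow, `Node` methods simply call `acquire().await` on these handles, and `Shared` owns clones of both semaphores, presumably so the node runner can resize them as the connected peer count changes.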
121 changes: 16 additions & 105 deletions crates/subspace-networking/src/create.rs
@@ -1,6 +1,3 @@
#[cfg(test)]
mod tests;

pub use crate::behavior::custom_record_store::ValueGetter;
use crate::behavior::custom_record_store::{
CustomRecordStore, MemoryProviderStorage, NoRecordStorage,
@@ -11,7 +8,7 @@ use crate::node::{CircuitRelayClientError, Node};
use crate::node_runner::{NodeRunner, NodeRunnerConfig};
use crate::request_responses::RequestHandler;
use crate::shared::Shared;
use crate::utils::convert_multiaddresses;
use crate::utils::{convert_multiaddresses, ResizableSemaphore};
use crate::BootstrappedNetworkingParameters;
use futures::channel::mpsc;
use libp2p::core::muxing::StreamMuxerBox;
@@ -32,13 +29,11 @@ use libp2p::websocket::WsConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::{core, identity, noise, Multiaddr, PeerId, Transport, TransportError};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};
use subspace_core_primitives::{crypto, PIECE_SIZE};
use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{error, info};

const KADEMLIA_PROTOCOL: &[u8] = b"/subspace/kad/0.1.0";
@@ -70,82 +65,24 @@ const YAMUX_MAX_STREAMS: usize = 256;
///
/// We restrict this so we don't exceed the number of incoming streams for a single peer, but this
/// value will be boosted depending on the number of connected peers.
const KADEMLIA_BASE_CONCURRENT_TASKS: usize = 30;
const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = NonZeroUsize::new(30).expect("Not zero; qed");
/// The above base limit will be boosted by a specified number for every peer connected starting
/// with the second peer, such that it scales with network connectivity, but the exact coefficient
/// might need to be tweaked in the future.
const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1;
pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 1;
/// Base limit for number of any concurrent tasks except Kademlia.
///
/// We configure the total number of streams per connection to 256. Here we assume half of them
/// might be incoming and half outgoing; we also leave a small buffer of streams just in case.
///
/// We restrict this so we don't exceed the number of streams for a single peer, but this value
/// will be boosted depending on the number of connected peers.
const REGULAR_BASE_CONCURRENT_TASKS: usize = 120 - KADEMLIA_BASE_CONCURRENT_TASKS;
const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize =
NonZeroUsize::new(120 - KADEMLIA_BASE_CONCURRENT_TASKS.get()).expect("Not zero; qed");
/// The above base limit will be boosted by a specified number for every peer connected starting
/// with the second peer, such that it scales with network connectivity, but the exact coefficient
/// might need to be tweaked in the future.
const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 2;
/// How many peers the node should be connected to before boosting turns on.
///
/// 1 means boosting starts with the second peer.
const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
NonZeroUsize::new(5).expect("Not zero; qed");
const SEMAPHORE_MAINTENANCE_INTERVAL: Duration = Duration::from_secs(5);

async fn maintain_semaphore_permits_capacity(
semaphore: &Semaphore,
interval: Duration,
connected_peers_count_weak: Weak<AtomicUsize>,
boost_per_peer: usize,
boost_peers_threshold: NonZeroUsize,
) {
let base_permits = semaphore.available_permits();
// Total permits technically supported by semaphore
let mut total_permits = base_permits;
// Some permits might be reserved due to the number of peers decreasing and will be released
// back if necessary; this is because the semaphore only supports increasing the number of
// permits, not decreasing it
let mut reserved_permits = Vec::new();
loop {
let connected_peers_count = match connected_peers_count_weak.upgrade() {
Some(connected_peers_count) => connected_peers_count.load(Ordering::Relaxed),
None => {
return;
}
};
let expected_total_permits = base_permits
+ connected_peers_count.saturating_sub(boost_peers_threshold.get()) * boost_per_peer;

// Release reserves to match expected number of permits if necessary
while total_permits < expected_total_permits && !reserved_permits.is_empty() {
reserved_permits.pop();
total_permits += 1;
}
// If reserved permits were not sufficient, add permits to the semaphore directly.
if total_permits < expected_total_permits {
semaphore.add_permits(expected_total_permits - total_permits);
total_permits = expected_total_permits;
}
// Peers disconnected and expected number of permits went down, we need to put some into
// reserve
if total_permits > expected_total_permits {
let to_reserve = total_permits - expected_total_permits;
reserved_permits.reserve(to_reserve);
for _ in 0..to_reserve {
reserved_permits.push(
semaphore
.acquire()
.await
.expect("We never close a semaphore; qed"),
);
}
total_permits = expected_total_permits;
}

tokio::time::sleep(interval).await;
}
}
pub(crate) const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 2;

/// Defines relay configuration for the Node
#[derive(Clone, Debug)]
@@ -368,42 +305,16 @@ where
// Create final structs
let (command_sender, command_receiver) = mpsc::channel(1);

let shared = Arc::new(Shared::new(local_peer_id, command_sender));
let shared_weak = Arc::downgrade(&shared);

let kademlia_tasks_semaphore = Arc::new(Semaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS));
let regular_tasks_semaphore = Arc::new(Semaphore::new(REGULAR_BASE_CONCURRENT_TASKS));

tokio::spawn({
let kademlia_tasks_semaphore = Arc::clone(&kademlia_tasks_semaphore);
let connected_peers_count_weak = Arc::downgrade(&shared.connected_peers_count);

async move {
maintain_semaphore_permits_capacity(
&kademlia_tasks_semaphore,
SEMAPHORE_MAINTENANCE_INTERVAL,
connected_peers_count_weak,
KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER,
CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD,
)
.await;
}
});
tokio::spawn({
let regular_tasks_semaphore = Arc::clone(&regular_tasks_semaphore);
let connected_peers_count_weak = Arc::downgrade(&shared.connected_peers_count);
let kademlia_tasks_semaphore = ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS);
let regular_tasks_semaphore = ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS);

async move {
maintain_semaphore_permits_capacity(
&regular_tasks_semaphore,
SEMAPHORE_MAINTENANCE_INTERVAL,
connected_peers_count_weak,
REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER,
CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD,
)
.await;
}
});
let shared = Arc::new(Shared::new(
local_peer_id,
command_sender,
kademlia_tasks_semaphore.clone(),
regular_tasks_semaphore.clone(),
));
let shared_weak = Arc::downgrade(&shared);

let node = Node::new(shared, kademlia_tasks_semaphore, regular_tasks_semaphore);
let node_runner = NodeRunner::<RecordStore>::new(NodeRunnerConfig::<RecordStore> {
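The removed `maintain_semaphore_permits_capacity` loop above defines the target capacity as the base plus a per-peer boost once the connected-peer count exceeds `CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD`; the resizable semaphore presumably applies the same formula when growing or shrinking. A small self-contained example of that arithmetic, using the constants from this file (the function name is illustrative, not from the commit):

```rust
/// Target permit count as computed by the removed maintenance loop:
/// base capacity plus `boost_per_peer` for every connected peer above the threshold.
fn expected_permits(base: usize, connected_peers: usize, boost_per_peer: usize, threshold: usize) -> usize {
    base + connected_peers.saturating_sub(threshold) * boost_per_peer
}

fn main() {
    // Kademlia tasks: base 30, boost 1 per peer, threshold 5.
    assert_eq!(expected_permits(30, 3, 1, 5), 30); // below the threshold, no boost
    assert_eq!(expected_permits(30, 25, 1, 5), 50); // 20 peers above the threshold

    // Regular tasks: base 120 - 30 = 90, boost 2 per peer.
    assert_eq!(expected_permits(90, 25, 2, 5), 130);

    println!("capacity examples hold");
}
```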
70 changes: 14 additions & 56 deletions crates/subspace-networking/src/node.rs
@@ -1,6 +1,7 @@
use crate::request_handlers::generic_request_handler::GenericRequest;
use crate::request_responses;
use crate::shared::{Command, CreatedSubscription, Shared};
use crate::utils::{ResizableSemaphore, ResizableSemaphorePermit};
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc::SendError;
@@ -16,7 +17,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::{error, trace};

@@ -29,7 +29,7 @@ pub struct TopicSubscription {
command_sender: Option<mpsc::Sender<Command>>,
#[pin]
receiver: mpsc::UnboundedReceiver<Bytes>,
_permit: OwnedSemaphorePermit,
_permit: ResizableSemaphorePermit,
}

impl Stream for TopicSubscription {
@@ -257,15 +257,15 @@ impl From<oneshot::Canceled> for CircuitRelayClientError {
#[must_use = "Node doesn't do anything if dropped"]
pub struct Node {
shared: Arc<Shared>,
kademlia_tasks_semaphore: Arc<Semaphore>,
regular_tasks_semaphore: Arc<Semaphore>,
kademlia_tasks_semaphore: ResizableSemaphore,
regular_tasks_semaphore: ResizableSemaphore,
}

impl Node {
pub(crate) fn new(
shared: Arc<Shared>,
kademlia_tasks_semaphore: Arc<Semaphore>,
regular_tasks_semaphore: Arc<Semaphore>,
kademlia_tasks_semaphore: ResizableSemaphore,
regular_tasks_semaphore: ResizableSemaphore,
) -> Self {
Self {
shared,
@@ -283,12 +283,7 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = Vec<u8>>, GetValueError> {
let permit = self
.kademlia_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

self.shared
@@ -310,12 +305,7 @@
key: Multihash,
value: Vec<u8>,
) -> Result<impl Stream<Item = ()>, PutValueError> {
let permit = self
.kademlia_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

self.shared
@@ -334,12 +324,7 @@
}

pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
let permit = self
.regular_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

self.shared
@@ -366,12 +351,7 @@
}

pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
let _permit = self
.regular_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let _permit = self.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

self.shared
@@ -396,14 +376,7 @@
where
Request: GenericRequest,
{
// TODO: Cancelling this method's future will drop the permit, but will not abort the
// request if it is already initiated
let _permit = self
.regular_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let _permit = self.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();
let command = Command::GenericRequest {
peer_id,
@@ -424,12 +397,7 @@
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
let permit = self
.kademlia_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.kademlia_tasks_semaphore.acquire().await;
trace!(?key, "Starting 'GetClosestPeers' request.");

let (result_sender, result_receiver) = mpsc::unbounded();
@@ -482,12 +450,7 @@
&self,
key: Multihash,
) -> Result<impl Stream<Item = ()>, AnnounceError> {
let permit = self
.kademlia_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

trace!(?key, "Starting 'start_announcing' request.");
@@ -529,12 +492,7 @@
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
let permit = self
.kademlia_tasks_semaphore
.clone()
.acquire_owned()
.await
.expect("We never close a semaphore; qed");
let permit = self.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

trace!(?key, "Starting 'get_providers' request.");
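Every change in node.rs above follows the same shape: the clone/`acquire_owned`/`expect` chain on a `tokio::sync::Semaphore` collapses to a single infallible `acquire()` on the `ResizableSemaphore` handle. A runnable illustration of the old pattern, with the new call spelled out in a comment (the variable name is illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Old pattern from node.rs: the semaphore lives behind an Arc, so acquiring
    // an owned permit means cloning the Arc and unwrapping a Result that can
    // only fail if the semaphore is closed, which this code never does.
    let kademlia_tasks_semaphore = Arc::new(Semaphore::new(30));
    let _permit = kademlia_tasks_semaphore
        .clone()
        .acquire_owned()
        .await
        .expect("We never close a semaphore; qed");

    // New pattern: `ResizableSemaphore` is itself cheaply cloneable and its
    // `acquire()` cannot fail, so the same call site becomes just
    // `let permit = self.kademlia_tasks_semaphore.acquire().await;`.
}
```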
(Diffs for the remaining 4 changed files are not shown.)
