Libp2p stream limits #1017

Merged · 14 commits · Dec 28, 2022
Changes from 1 commit
12 changes: 9 additions & 3 deletions crates/subspace-networking/src/create.rs
@@ -1,3 +1,6 @@
+#[cfg(test)]
+mod tests;
+
 pub use crate::behavior::custom_record_store::ValueGetter;
 use crate::behavior::custom_record_store::{
     CustomRecordStore, MemoryProviderStorage, NoRecordStorage,
@@ -88,6 +91,7 @@ const SEMAPHORE_MAINTENANCE_INTERVAL: Duration = Duration::from_secs(5);

 async fn maintain_semaphore_permits_capacity(
@rahulksnv (Contributor) commented on Dec 12, 2022:

I guess semaphores are not the best fit for this scenario, where the max limit can grow/shrink as peers come and go; hence we need this complexity to hold excess permits, etc.

How do you feel about this:

  1. Maintain current_max_limit and current_usage. These two are protected by an async Mutex/CV.
  2. current_max_limit can be updated with the boost_per_peer from handle_swarm_event() during connection open/close.
  3. Senders: wait for permits to be freed, then add to current_usage.
  4. From drop() of the permit: decrement current_usage and signal any waiters. But any excess permits, if the max limit shrank, are not returned to the free pool. This would unfortunately require spawning an async task from the dtor, as Rust doesn't support await from drop yet (see the sketch below).

Happy to make a branch off your PR and make a draft, if that helps (also to get my feet wet).
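
For illustration, here is a minimal sketch of the numbered proposal above — an assumption-laden draft, not code from this PR. Since tokio has no CondVar (as noted further down in the thread), tokio::sync::Notify stands in for the condition variable, and the Limiter/acquire/release names are hypothetical:

```rust
use tokio::sync::{Mutex, Notify};

// Hypothetical sketch of the counter-based limiter proposed above.
struct Limiter {
    state: Mutex<LimiterState>,
    notify: Notify,
}

struct LimiterState {
    current_max_limit: usize,
    current_usage: usize,
}

impl Limiter {
    // Step 2: called from handle_swarm_event() on connection open/close.
    async fn set_max_limit(&self, new_limit: usize) {
        self.state.lock().await.current_max_limit = new_limit;
        // Wake all waiters so they can re-check against the new limit.
        self.notify.notify_waiters();
    }

    // Step 3: senders wait until usage drops below the current limit.
    async fn acquire(&self) {
        loop {
            // Register interest before checking the condition, to avoid a
            // lost wakeup between releasing the lock and awaiting.
            let notified = self.notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            {
                let mut state = self.state.lock().await;
                if state.current_usage < state.current_max_limit {
                    state.current_usage += 1;
                    return;
                }
            }

            notified.await;
        }
    }

    // Step 4: in real code this would run from the permit's Drop impl,
    // which would have to spawn a task since Drop cannot await.
    async fn release(&self) {
        self.state.lock().await.current_usage -= 1;
        self.notify.notify_one();
    }
}
```

The shrink case comes for free in this shape: if current_max_limit drops below current_usage, new acquirers simply block until enough releases happen, and no permits need to be stashed or returned.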

Member (Author) replied:

It is a bit hacky, I agree, but not too much (for my taste).

I found CVs counter-intuitive to read in most cases, and I feel like we'll end up implementing essentially the same mechanism (practically speaking). It might be an interesting exercise, but I'm not sure there'll actually be a lot of value in it.

Supporting what we need here (resizing, dropping of unfinished acquisitions) is probably quite a bit of effort.

Feel free to try unless you have anything more valuable to do.

Contributor replied:

OK, looks like tokio doesn't have a CondVar, so that won't work :-( But I found the semaphore has a forget() API: https://docs.rs/tokio/1.23.0/tokio/sync/struct.SemaphorePermit.html#method.forget

This can help avoid the reserved_permits stash, which would simplify this quite a bit:

if total_permits < expected_total_permits {
    semaphore.add_permits(expected_total_permits - total_permits);
} else {
    let num_excess = total_permits - expected_total_permits;
    // acquire()/forget() num_excess permits to shrink effective capacity
    if let Ok(permits) = semaphore.acquire_many(num_excess as u32).await {
        permits.forget();
    }
}

Not a big deal: it may take a while for the changes to take effect (in the shrink case), as the semaphore acquisition on this path doesn't get higher priority (unless it's done from the dtor, as in the other approach).

Member (Author) replied:

We can use forget(), but that'll mean we permanently increase the internal capacity of the semaphore every time we call forget(). And while we'll probably never reach https://docs.rs/tokio/1.23.0/tokio/sync/struct.Semaphore.html#associatedconstant.MAX_PERMITS on a 64-bit platform, we can on a 32-bit platform. It'll take a while, but when you hit it, it'll be hard to debug.
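
To make that failure mode concrete, a hypothetical guard along these lines (not in the PR) would turn a slow creep toward the cap into a loud failure instead of a hard-to-debug hang:

```rust
use tokio::sync::Semaphore;

// Hypothetical helper: track cumulative additions and fail fast instead of
// silently creeping toward the cap over many grow/forget cycles.
fn add_permits_checked(semaphore: &Semaphore, n: usize, total_added: &mut usize) {
    *total_added = total_added
        .checked_add(n)
        .expect("cumulative permit count overflowed usize");
    assert!(
        *total_added <= Semaphore::MAX_PERMITS,
        "cumulative permits would exceed Semaphore::MAX_PERMITS"
    );
    semaphore.add_permits(n);
}
```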

Contributor replied:

Yeah, you are right.

I still feel this path can be simplified (it took me a bit to figure out this shrinking business); let me try out the idea I mentioned above off your branch.

     semaphore: &Semaphore,
+    interval: Duration,
     connected_peers_count_weak: Weak<AtomicUsize>,
     boost_per_peer: usize,
 ) {

@@ -99,13 +103,13 @@ async fn maintain_semaphore_permits_capacity(
     let mut reserved_permits = Vec::new();
     loop {
         let connected_peers_count = match connected_peers_count_weak.upgrade() {
-            Some(connected_peers_count) => connected_peers_count,
+            Some(connected_peers_count) => connected_peers_count.load(Ordering::Relaxed),
             None => {
                 return;
             }
         };
         let expected_total_permits =
-            base_permits + connected_peers_count.load(Ordering::Relaxed) * boost_per_peer;
+            base_permits + connected_peers_count.saturating_sub(1) * boost_per_peer;

         // Release reserves to match expected number of permits if necessary
         while total_permits < expected_total_permits && !reserved_permits.is_empty() {

@@ -133,7 +137,7 @@ async fn maintain_semaphore_permits_capacity(
             total_permits = expected_total_permits;
         }

-        tokio::time::sleep(SEMAPHORE_MAINTENANCE_INTERVAL).await;
+        tokio::time::sleep(interval).await;
     }
 }

@@ -371,6 +375,7 @@ where
         async move {
             maintain_semaphore_permits_capacity(
                 &kademlia_tasks_semaphore,
+                SEMAPHORE_MAINTENANCE_INTERVAL,
                 connected_peers_count_weak,
                 KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER,
             )
@@ -384,6 +389,7 @@ where
         async move {
             maintain_semaphore_permits_capacity(
                 &regular_tasks_semaphore,
+                SEMAPHORE_MAINTENANCE_INTERVAL,
                 connected_peers_count_weak,
                 REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER,
             )
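
For readers who don't expand the folds above: the hidden part of the loop acquires excess permits into the reserved_permits stash when the target shrinks, which is what the review comments call "this shrinking business". Below is a simplified sketch of the overall shape, reconstructed from the visible fragments, with a hypothetical target() closure standing in for the real peer-count arithmetic — so not the exact PR code:

```rust
use std::time::Duration;
use tokio::sync::Semaphore;

// Simplified sketch, not the PR code: excess permits are parked in a stash
// instead of forget()-ed, so capacity can grow again later without ever
// inflating the semaphore's cumulative permit count.
async fn maintain_permits(semaphore: &Semaphore, interval: Duration, target: impl Fn() -> usize) {
    let mut total_permits = semaphore.available_permits();
    let mut reserved_permits = Vec::new();

    loop {
        let expected_total_permits = target();

        // Grow: release stashed permits first, then mint new ones.
        while total_permits < expected_total_permits && !reserved_permits.is_empty() {
            drop(reserved_permits.pop());
            total_permits += 1;
        }
        if total_permits < expected_total_permits {
            semaphore.add_permits(expected_total_permits - total_permits);
            total_permits = expected_total_permits;
        }

        // Shrink: park permits in the stash instead of forgetting them.
        while total_permits > expected_total_permits {
            match semaphore.acquire().await {
                Ok(permit) => {
                    reserved_permits.push(permit);
                    total_permits -= 1;
                }
                Err(_) => return, // semaphore closed
            }
        }

        tokio::time::sleep(interval).await;
    }
}
```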
119 changes: 119 additions & 0 deletions crates/subspace-networking/src/create/tests.rs
@@ -0,0 +1,119 @@
use futures::future::{select, Either};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::sleep;

#[tokio::test]
async fn maintain_semaphore_permits_capacity() {
    let base_tasks = 2;
    let boost_per_peer = 1;
    let interval = Duration::from_micros(1);
    let connected_peers_count = Arc::new(AtomicUsize::new(0));
    let tasks_semaphore = Arc::new(Semaphore::new(base_tasks));

    tokio::spawn({
        let tasks_semaphore = Arc::clone(&tasks_semaphore);
        let connected_peers_count_weak = Arc::downgrade(&connected_peers_count);

        async move {
            super::maintain_semaphore_permits_capacity(
                &tasks_semaphore,
                interval,
                connected_peers_count_weak,
                boost_per_peer,
            )
            .await;
        }
    });

    let timeout = Duration::from_millis(100);

    // Give the function above time to run at least one loop iteration
    sleep(timeout).await;

    let permit_1_result = select(
        Box::pin(tasks_semaphore.acquire()),
        Box::pin(sleep(timeout)),
    )
    .await;
    if !matches!(permit_1_result, Either::Left(_)) {
        panic!("Must be able to acquire the first permit");
    }

    let permit_2_result = select(
        Box::pin(tasks_semaphore.acquire()),
        Box::pin(sleep(timeout)),
    )
    .await;
    if !matches!(permit_2_result, Either::Left(_)) {
        panic!("Must be able to acquire the second permit");
    }

    {
        let permit_3_result = select(
            Box::pin(tasks_semaphore.acquire()),
            Box::pin(sleep(timeout)),
        )
        .await;
        if !matches!(permit_3_result, Either::Right(_)) {
            panic!("Must not be able to acquire the third permit due to capacity");
        }
    }

    // Connect the first peer; capacity stays the same due to saturating_sub(1)
    connected_peers_count.fetch_add(1, Ordering::SeqCst);

    {
        let permit_3_result = select(
            Box::pin(tasks_semaphore.acquire()),
            Box::pin(sleep(timeout)),
        )
        .await;
        if !matches!(permit_3_result, Either::Right(_)) {
            panic!("Must not be able to acquire the third permit due to capacity");
        }
    }

    // Connect a second peer, which increases capacity
    connected_peers_count.fetch_add(1, Ordering::SeqCst);

    let permit_3_result = select(
        Box::pin(tasks_semaphore.acquire()),
        Box::pin(sleep(timeout)),
    )
    .await;
    if !matches!(permit_3_result, Either::Left(_)) {
        panic!("Must be able to acquire the third permit due to increased capacity");
    }

    {
        let permit_4_result = select(
            Box::pin(tasks_semaphore.acquire()),
            Box::pin(sleep(timeout)),
        )
        .await;
        if !matches!(permit_4_result, Either::Right(_)) {
            panic!("Must not be able to acquire the fourth permit due to capacity");
        }
    }

    // Decrease capacity
    connected_peers_count.fetch_sub(1, Ordering::SeqCst);

    drop(permit_3_result);

    sleep(timeout).await;

    {
        let permit_3_result = select(
            Box::pin(tasks_semaphore.acquire()),
            Box::pin(sleep(timeout)),
        )
        .await;
        if !matches!(permit_3_result, Either::Right(_)) {
            panic!("Must not be able to acquire the third permit again due to decreased capacity");
        }
    }
}