Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Libp2p stream limits #1017

Merged
merged 14 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 126 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion crates/sp-lightclient/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ futures = "0.3.25"
rand = { version = "0.8.5", features = ["min_const_gen"] }
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving"}
subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" }
tokio = { version = "1.23.0", features = ["sync"] }

[features]
default = ["std"]
Expand Down
1 change: 0 additions & 1 deletion crates/sp-lightclient/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl Farmer {
&sector_codec,
Cursor::new(sector.as_mut_slice()),
Cursor::new(sector_metadata.as_mut_slice()),
&tokio::sync::Semaphore::new(tokio::sync::Semaphore::MAX_PERMITS),
))
.unwrap();

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ subspace-solving = { version = "0.1.0", path = "../subspace-solving" }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-verification = { version = "0.1.0", path = "../subspace-verification" }
thiserror = "1.0.32"
tokio = { version = "1.23.0", features = ["sync"] }
tracing = "0.1.37"

[dev-dependencies]
Expand Down
3 changes: 0 additions & 3 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use subspace_farmer_components::farming::audit_sector;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting::plot_sector;
use subspace_farmer_components::FarmerProtocolInfo;
use tokio::sync::Semaphore;
use utils::BenchPieceReceiver;

mod utils;
Expand Down Expand Up @@ -64,7 +63,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
};
let global_challenge = Blake2b256Hash::default();
let solution_range = SolutionRange::MAX;
let piece_receiver_semaphore = Semaphore::new(Semaphore::MAX_PERMITS);

let plotted_sector = {
let mut plotted_sector = vec![0u8; PLOT_SECTOR_SIZE as usize];
Expand All @@ -79,7 +77,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
&sector_codec,
plotted_sector.as_mut_slice(),
io::sink(),
&piece_receiver_semaphore,
))
.unwrap();

Expand Down
4 changes: 0 additions & 4 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use subspace_core_primitives::{
};
use subspace_farmer_components::plotting::plot_sector;
use subspace_farmer_components::FarmerProtocolInfo;
use tokio::sync::Semaphore;
use utils::BenchPieceReceiver;

mod utils;
Expand Down Expand Up @@ -54,7 +53,6 @@ fn criterion_benchmark(c: &mut Criterion) {
sector_expiration: 1,
};
let piece_receiver = BenchPieceReceiver::new(piece);
let piece_receiver_semaphore = Semaphore::new(Semaphore::MAX_PERMITS);

let mut group = c.benchmark_group("sector-plotting");
group.throughput(Throughput::Bytes(PLOT_SECTOR_SIZE));
Expand All @@ -70,7 +68,6 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(&sector_codec),
black_box(io::sink()),
black_box(io::sink()),
black_box(&piece_receiver_semaphore),
))
.unwrap();
})
Expand All @@ -94,7 +91,6 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(&sector_codec),
black_box(io::sink()),
black_box(io::sink()),
black_box(&piece_receiver_semaphore),
))
.unwrap();
});
Expand Down
3 changes: 0 additions & 3 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use subspace_farmer_components::farming::audit_sector;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting::plot_sector;
use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata};
use tokio::sync::Semaphore;
use utils::BenchPieceReceiver;

mod utils;
Expand Down Expand Up @@ -67,7 +66,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let global_challenge = Blake2b256Hash::default();
let solution_range = SolutionRange::MAX;
let reward_address = PublicKey::default();
let piece_receiver_semaphore = Semaphore::new(Semaphore::MAX_PERMITS);

let (plotted_sector, sector_metadata) = {
let mut plotted_sector = vec![0u8; PLOT_SECTOR_SIZE as usize];
Expand All @@ -83,7 +81,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
&sector_codec,
plotted_sector.as_mut_slice(),
sector_metadata.as_mut_slice(),
&piece_receiver_semaphore,
))
.unwrap();

Expand Down
23 changes: 6 additions & 17 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use subspace_core_primitives::{
Piece, PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE,
};
use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{debug, info};

#[async_trait]
Expand Down Expand Up @@ -84,7 +83,6 @@ pub async fn plot_sector<PR, S, SM>(
sector_codec: &SectorCodec,
mut sector_output: S,
mut sector_metadata_output: SM,
semaphore: &Semaphore,
) -> Result<PlottedSector, PlottingError>
where
PR: PieceReceiver,
Expand Down Expand Up @@ -120,7 +118,6 @@ where
piece_receiver,
&piece_indexes,
cancelled,
semaphore,
)
.await?;

Expand Down Expand Up @@ -183,23 +180,15 @@ async fn plot_pieces_in_batches_non_blocking<PR: PieceReceiver>(
piece_receiver: &PR,
piece_indexes: &[PieceIndex],
cancelled: &AtomicBool,
semaphore: &Semaphore,
) -> Result<(), PlottingError> {
let mut pieces_receiving_futures = piece_indexes
.iter()
.map(|piece_index| {
Box::pin(async {
let _permit = semaphore
.acquire()
.await
.expect("Should be valid on non-closed semaphore");

let piece_result = match check_cancellation(cancelled, sector_index) {
Ok(()) => piece_receiver.get_piece(*piece_index).await,
Err(error) => Err(error.into()),
};
(*piece_index, piece_result)
})
.map(|piece_index| async {
let piece_result = match check_cancellation(cancelled, sector_index) {
Ok(()) => piece_receiver.get_piece(*piece_index).await,
Err(error) => Err(error.into()),
};
(*piece_index, piece_result)
})
.collect::<FuturesOrdered<_>>();

Expand Down
20 changes: 3 additions & 17 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,13 @@ pub(crate) async fn farm_multi_disk(
disk_concurrency,
disable_farming,
mut dsn,
piece_receiver_batch_size,
piece_publisher_batch_size,
max_concurrent_plots,
} = farming_args;

let readers_and_pieces = Arc::new(Mutex::new(None));

info!("Connecting to node RPC at {}", node_rpc_url);
let rpc_client = NodeRpcClient::new(&node_rpc_url).await?;
let piece_publisher_semaphore = Arc::new(tokio::sync::Semaphore::new(
farming_args.piece_receiver_batch_size.get(),
));
let piece_receiver_semaphore = Arc::new(tokio::sync::Semaphore::new(
farming_args.piece_publisher_batch_size.get(),
));
let concurrent_plotting_semaphore = Arc::new(tokio::sync::Semaphore::new(
farming_args.max_concurrent_plots.get(),
));
Expand Down Expand Up @@ -129,8 +121,6 @@ pub(crate) async fn farm_multi_disk(
rpc_client,
reward_address,
dsn_node: node.clone(),
piece_receiver_semaphore: Arc::clone(&piece_receiver_semaphore),
piece_publisher_semaphore: Arc::clone(&piece_publisher_semaphore),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
});

Expand Down Expand Up @@ -239,9 +229,7 @@ pub(crate) async fn farm_multi_disk(

futures::select!(
// Signal future
_ = Box::pin(async move {
signal.await;
}).fuse() => {},
_ = signal.fuse() => {},

// Plotting future
result = Box::pin(async move {
Expand All @@ -256,11 +244,9 @@ pub(crate) async fn farm_multi_disk(
},

// Node runner future
_ = Box::pin(async move {
node_runner.run().await;

_ = node_runner.run().fuse() => {
info!("Node runner exited.")
}).fuse() => {},
},
);

anyhow::Ok(())
Expand Down
28 changes: 0 additions & 28 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

// Defines a maximum constraint for the piece publisher batch.
const MAX_PIECE_PUBLISHER_BATCH_SIZE: usize = 30;

// Defines a maximum constraint for the piece receiver batch.
const MAX_PIECE_RECEIVER_BATCH_SIZE: usize = 30;

#[cfg(all(
target_arch = "x86_64",
target_vendor = "unknown",
Expand Down Expand Up @@ -59,12 +53,6 @@ struct FarmingArgs {
/// DSN parameters
#[clap(flatten)]
dsn: DsnArgs,
/// Defines size for the pieces batch of the piece receiving process.
#[arg(long, default_value = "12")]
piece_receiver_batch_size: NonZeroUsize,
/// Defines size for the pieces batch of the piece publishing process.
#[arg(long, default_value = "12")]
piece_publisher_batch_size: NonZeroUsize,
/// Number of plots that can be plotted concurrently, impacts RAM usage.
#[arg(long, default_value = "10")]
max_concurrent_plots: NonZeroUsize,
Expand Down Expand Up @@ -299,22 +287,6 @@ async fn main() -> Result<()> {
command.farm
};

if farming_args.piece_publisher_batch_size.get() > MAX_PIECE_PUBLISHER_BATCH_SIZE {
return Err(anyhow::anyhow!(
"Incorrect piece publisher batch size: {}. Should be 1-{}",
farming_args.piece_publisher_batch_size,
MAX_PIECE_PUBLISHER_BATCH_SIZE
));
}

if farming_args.piece_receiver_batch_size.get() > MAX_PIECE_RECEIVER_BATCH_SIZE {
return Err(anyhow::anyhow!(
"Incorrect piece receiver batch size: {}. Should be 1-{}",
farming_args.piece_receiver_batch_size,
MAX_PIECE_RECEIVER_BATCH_SIZE
));
}

commands::farm_multi_disk(base_path, disk_farms, farming_args).await?;
}
Subcommand::Info => {
Expand Down
14 changes: 2 additions & 12 deletions crates/subspace-farmer/src/single_disk_plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,6 @@ pub struct SingleDiskPlotOptions<RC> {
pub reward_address: PublicKey,
/// Optional DSN Node.
pub dsn_node: Node,
/// Semaphore to limit concurrency of piece receiving process.
pub piece_receiver_semaphore: Arc<tokio::sync::Semaphore>,
/// Semaphore to limit concurrency of piece publishing process.
pub piece_publisher_semaphore: Arc<tokio::sync::Semaphore>,
/// Semaphore to limit concurrency of plotting process.
pub concurrent_plotting_semaphore: Arc<tokio::sync::Semaphore>,
}
Expand Down Expand Up @@ -472,8 +468,6 @@ impl SingleDiskPlot {
rpc_client,
reward_address,
dsn_node,
piece_publisher_semaphore,
piece_receiver_semaphore,
concurrent_plotting_semaphore,
} = options;

Expand Down Expand Up @@ -651,11 +645,8 @@ impl SingleDiskPlot {
let shutting_down = Arc::clone(&shutting_down);
let rpc_client = rpc_client.clone();
let error_sender = Arc::clone(&error_sender);
let piece_publisher = PieceSectorPublisher::new(
dsn_node.clone(),
shutting_down.clone(),
piece_publisher_semaphore,
);
let piece_publisher =
PieceSectorPublisher::new(dsn_node.clone(), shutting_down.clone());

move || {
let _tokio_handle_guard = handle.enter();
Expand Down Expand Up @@ -732,7 +723,6 @@ impl SingleDiskPlot {
&sector_codec,
sector,
sector_metadata,
&piece_receiver_semaphore,
);
let plotted_sector = match plot_sector_fut.await {
Ok(plotted_sector) => {
Expand Down
25 changes: 4 additions & 21 deletions crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,29 @@ use std::time::Duration;
use subspace_core_primitives::{PieceIndex, PieceIndexHash};
use subspace_networking::utils::multihash::MultihashCode;
use subspace_networking::{Node, ToMultihash};
use tokio::sync::Semaphore;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tracing::{debug, error, info, trace};

/// Max time allocated for putting piece from DSN before attempt is considered to fail
const PUT_PIECE_TIMEOUT: Duration = Duration::from_secs(5);
const PUT_PIECE_TIMEOUT: Duration = Duration::from_secs(120);
/// Defines initial duration between put_piece calls.
const PUT_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
/// Defines max duration between put_piece calls.
const PUT_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
const PUT_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(30);

// Piece-by-sector DSN publishing helper.
#[derive(Clone)]
pub(crate) struct PieceSectorPublisher {
dsn_node: Node,
cancelled: Arc<AtomicBool>,
semaphore: Arc<Semaphore>,
}

impl PieceSectorPublisher {
pub(crate) fn new(
dsn_node: Node,
cancelled: Arc<AtomicBool>,
semaphore: Arc<Semaphore>,
) -> Self {
pub(crate) fn new(dsn_node: Node, cancelled: Arc<AtomicBool>) -> Self {
Self {
dsn_node,
cancelled,
semaphore,
}
}

Expand All @@ -61,17 +54,7 @@ impl PieceSectorPublisher {
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let mut pieces_receiving_futures = piece_indexes
.iter()
.map(|piece_index| {
Box::pin(async {
let _permit = self
.semaphore
.acquire()
.await
.expect("Should be valid on non-closed semaphore");

self.publish_single_piece_with_backoff(*piece_index).await
})
})
.map(|piece_index| self.publish_single_piece_with_backoff(*piece_index))
.collect::<FuturesUnordered<_>>();

while pieces_receiving_futures.next().await.is_some() {
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ tracing-subscriber = "0.3.16"
unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] }

[dependencies.libp2p]
# TODO: change when https://github.com/libp2p/rust-libp2p/pull/3178 is released
git = "https://github.com/libp2p/rust-libp2p"
rev = "be0b62a78fe9d72811b9eda742137cc8ddc4da35"
# TODO: change to upstream release when https://github.com/libp2p/rust-libp2p/pull/3287 is released
git = "https://github.com/subspace/rust-libp2p"
rev = "b700d0c9a12f984936b44f634e79c9f3ee5e342d"
default-features = false
features = [
"dns",
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) struct Behavior<RecordStore> {

impl<RecordStore> Behavior<RecordStore>
where
RecordStore: Send + Sync + for<'a> libp2p::kad::store::RecordStore<'a> + 'static,
RecordStore: Send + Sync + libp2p::kad::store::RecordStore + 'static,
{
pub(crate) fn new(config: BehaviorConfig<RecordStore>) -> Self {
let kademlia = Kademlia::<RecordStore>::with_config(
Expand Down
Loading