diff --git a/.github/workflows/domain-genesis-storage-snapshot-build.yml b/.github/workflows/domain-genesis-storage-snapshot-build.yml index c2c3787ce79..7a59fe20d7c 100644 --- a/.github/workflows/domain-genesis-storage-snapshot-build.yml +++ b/.github/workflows/domain-genesis-storage-snapshot-build.yml @@ -18,6 +18,9 @@ jobs: packages: write steps: + - name: Checkout + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + - name: Build node image id: build uses: docker/build-push-action@4f58ea79222b3b9dc2c8bbdd6debcef730109a75 # v6.9.0 diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f9d83c71572..bbf5507c27e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,13 +4,7 @@ on: push: branches: - main - paths-ignore: - - "**.md" - - ".github/CODEOWNERS" pull_request: - paths-ignore: - - "**.md" - - ".github/CODEOWNERS" workflow_dispatch: merge_group: diff --git a/.github/workflows/snapshot-build.yml b/.github/workflows/snapshot-build.yml index 7420f0fa7cc..dba04fe5b8f 100644 --- a/.github/workflows/snapshot-build.yml +++ b/.github/workflows/snapshot-build.yml @@ -33,6 +33,9 @@ jobs: - image: node base-artifact: subspace-node upload-executables: true + - image: gateway + base-artifact: subspace-gateway + upload-executables: false - image: bootstrap-node base-artifact: subspace-bootstrap-node upload-executables: false @@ -104,7 +107,6 @@ jobs: cd executables IMAGE="${{ fromJSON(steps.meta.outputs.json).tags[0] }}" ARTIFACT="${{ matrix.build.base-artifact }}" - docker run --rm --platform linux/amd64 --entrypoint /bin/cat $IMAGE /$ARTIFACT > $ARTIFACT-ubuntu-x86_64-skylake-${{ github.ref_name }} # TODO: Pull is a workaround for https://github.com/moby/moby/issues/48197#issuecomment-2472265028 docker pull --platform linux/amd64/v2 $IMAGE diff --git a/Cargo.lock b/Cargo.lock index 38d17f4c4cc..e75272425a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -760,15 +760,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener 2.5.3", -] - [[package]] name = "async-nats" version = "0.37.0" @@ -4236,7 +4227,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" dependencies = [ "futures-io", - "rustls 0.23.13", + "rustls 0.23.18", "rustls-pki-types", ] @@ -5282,7 +5273,7 @@ dependencies = [ "http 1.1.0", "jsonrpsee-core", "pin-project", - "rustls 0.23.13", + "rustls 0.23.18", "rustls-pki-types", "rustls-platform-verifier", "soketto", @@ -6090,7 +6081,7 @@ dependencies = [ "quinn 0.11.5", "rand", "ring 0.17.8", - "rustls 0.23.13", + "rustls 0.23.18", "socket2 0.5.7", "thiserror 1.0.64", "tokio", @@ -6287,7 +6278,7 @@ dependencies = [ "libp2p-identity", "rcgen 0.11.3", "ring 0.17.8", - "rustls 0.23.13", + "rustls 0.23.18", "rustls-webpki 0.101.7", "thiserror 1.0.64", "x509-parser 0.16.0", @@ -9000,7 +8991,7 @@ dependencies = [ "quinn-proto 0.11.8", "quinn-udp 0.5.5", "rustc-hash 2.0.0", - "rustls 0.23.13", + "rustls 0.23.18", "socket2 0.5.7", "thiserror 1.0.64", "tokio", @@ -9052,7 +9043,7 @@ dependencies = [ "rand", "ring 0.17.8", "rustc-hash 2.0.0", - "rustls 0.23.13", + "rustls 0.23.18", "slab", "thiserror 1.0.64", "tinyvec", @@ -9615,9 +9606,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.13" +version = 
"0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", @@ -9673,9 +9664,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-platform-verifier" @@ -9688,7 +9679,7 @@ dependencies = [ "jni", "log", "once_cell", - "rustls 0.23.13", + "rustls 0.23.18", "rustls-native-certs 0.7.3", "rustls-platform-verifier-android", "rustls-webpki 0.102.8", @@ -12676,6 +12667,7 @@ name = "subspace-gateway" version = "0.1.0" dependencies = [ "anyhow", + "async-lock 3.4.0", "async-trait", "clap", "fdlimit", @@ -12683,7 +12675,6 @@ dependencies = [ "hex", "jsonrpsee", "mimalloc", - "parking_lot 0.12.3", "subspace-core-primitives", "subspace-data-retrieval", "subspace-erasure-coding", @@ -12693,7 +12684,6 @@ dependencies = [ "subspace-rpc-primitives", "subspace-verification", "supports-color", - "thiserror 2.0.0", "tokio", "tracing", "tracing-subscriber", @@ -12809,13 +12799,12 @@ dependencies = [ name = "subspace-networking" version = "0.1.0" dependencies = [ - "async-mutex", + "async-lock 3.4.0", "async-trait", "backoff", "bytes", "clap", "derive_more 1.0.0", - "either", "event-listener-primitives", "fs2", "futures", @@ -13057,6 +13046,7 @@ version = "0.1.0" dependencies = [ "array-bytes", "async-channel 1.9.0", + "async-lock 3.4.0", "async-trait", "cross-domain-message-gossip", "domain-runtime-primitives", @@ -13084,9 +13074,7 @@ dependencies = [ "sc-executor", "sc-informant", "sc-network", - "sc-network-light", "sc-network-sync", - "sc-network-transactions", "sc-offchain", "sc-proof-of-time", "sc-rpc", @@ -13099,7 +13087,6 @@ dependencies = [ "sc-tracing", "sc-transaction-pool", "sc-transaction-pool-api", - "sc-utils", "schnellru", "schnorrkel", "sp-api", @@ -13746,7 +13733,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.13", + "rustls 0.23.18", "rustls-pki-types", "tokio", ] diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 8ffefd987f5..075fabc162d 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -630,18 +630,29 @@ where best_block_to_archive = best_block_number; } - // If the user chooses an object mapping start block we don't have the data for, we can't + // If the user chooses an object mapping start block we don't have data or state for, we can't // create mappings for it, so the node must exit with an error. let best_block_to_archive_hash = client .hash(best_block_to_archive.into())? .expect("just checked above; qed"); - if client.block(best_block_to_archive_hash)?.is_none() { + let Some(best_block_data) = client.block(best_block_to_archive_hash)? 
else { let error = format!( - "Missing data for mapping block {best_block_to_archive} hash {best_block_to_archive_hash},\ + "Missing data for mapping block {best_block_to_archive} hash {best_block_to_archive_hash}, \ try a higher block number, or wipe your node and restart with `--sync full`" ); return Err(sp_blockchain::Error::Application(error.into())); - } + }; + + // Similarly, state can be pruned, even if the data is present. + client + .runtime_api() + .extract_block_object_mapping(*best_block_data.block.header().parent_hash(), best_block_data.block.clone()) + .map_err(|error| { + sp_blockchain::Error::Application( + format!("Missing state for mapping block {best_block_to_archive} hash {best_block_to_archive_hash}: {error}, \ + try a higher block number, or wipe your node and restart with `--sync full`").into(), + ) + })?; let maybe_last_archived_block = find_last_archived_block( client, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 6fa95bc1195..095adc71a6f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -3,7 +3,7 @@ use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL; use crate::commands::shared::derive_libp2p_keypair; use crate::commands::shared::network::{configure_network, NetworkArgs}; use anyhow::anyhow; -use async_lock::RwLock as AsyncRwLock; +use async_lock::{RwLock as AsyncRwLock, Semaphore}; use backoff::ExponentialBackoff; use clap::{Parser, ValueHint}; use futures::stream::FuturesUnordered; @@ -38,6 +38,8 @@ const PIECE_GETTER_MAX_RETRIES: u16 = 7; const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(5); /// Defines max duration between get_piece calls. 
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40); +/// Multiplier on top of outgoing connections number for piece downloading purposes +const PIECE_PROVIDER_MULTIPLIER: usize = 10; /// Arguments for controller #[derive(Debug, Parser)] @@ -137,6 +139,7 @@ pub(super) async fn controller( .await .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; + let out_connections = network_args.out_connections; let (node, mut node_runner) = { if network_args.bootstrap_nodes.is_empty() { network_args @@ -161,6 +164,7 @@ pub(super) async fn controller( let piece_provider = PieceProvider::new( node.clone(), SegmentCommitmentPieceValidator::new(node.clone(), node_client.clone(), kzg.clone()), + Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER), ); let piece_getter = FarmerPieceGetter::new( diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index 506c8645830..e05ea9b51ed 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -2,7 +2,7 @@ use crate::commands::shared::DiskFarm; use anyhow::anyhow; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, Semaphore}; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::Parser; @@ -36,7 +36,7 @@ use subspace_farmer::utils::{ use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_kzg::Kzg; use subspace_proof_of_space::Table; -use tokio::sync::{Barrier, Semaphore}; +use tokio::sync::Barrier; use tracing::{error, info, info_span, warn, Instrument}; const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index 26927d8262d..d787afffd85 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -1,6 +1,6 @@ use crate::commands::shared::PlottingThreadPriority; use anyhow::anyhow; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, Semaphore}; use clap::Parser; use prometheus_client::registry::Registry; use std::future::Future; @@ -28,7 +28,6 @@ use subspace_farmer::utils::{ use subspace_farmer_components::PieceGetter; use subspace_kzg::Kzg; use subspace_proof_of_space::Table; -use tokio::sync::Semaphore; use tracing::info; const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 1379f420255..d8cb3ccf3df 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -2,7 +2,7 @@ use crate::commands::shared::network::{configure_network, NetworkArgs}; use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority}; use crate::utils::shutdown_signal; use anyhow::anyhow; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::{Parser, ValueHint}; @@ -54,7 +54,7 @@ use subspace_kzg::Kzg; use 
subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; -use tokio::sync::{Barrier, Semaphore}; +use tokio::sync::Barrier; use tracing::{error, info, info_span, warn, Instrument}; /// Get piece retry attempts number. @@ -68,6 +68,8 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40); const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024 * 1024; const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5); +/// Multiplier on top of outgoing connections number for piece downloading purposes +const PIECE_PROVIDER_MULTIPLIER: usize = 10; type FarmIndex = u8; @@ -431,6 +433,7 @@ where .await .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; + let out_connections = network_args.out_connections; let (node, mut node_runner) = { if network_args.bootstrap_nodes.is_empty() { network_args @@ -460,6 +463,7 @@ where let piece_provider = PieceProvider::new( node.clone(), SegmentCommitmentPieceValidator::new(node.clone(), node_client.clone(), kzg.clone()), + Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER), ); let piece_getter = FarmerPieceGetter::new( diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 07d1fd686dc..31143856c83 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -654,7 +654,7 @@ where downloading_pieces_stream // This allows to schedule new batch while previous batches partially completed, but // avoids excessive memory usage like when all futures are created upfront - .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2) + .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10) // Simply drain everything .for_each(|()| async {}) .await; diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index de534fdf0c9..57194d7efeb 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -6,7 +6,7 @@ use crate::plotter::cpu::metrics::CpuPlotterMetrics; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, Semaphore, SemaphoreGuardArc}; use async_trait::async_trait; use bytes::Bytes; use event_listener_primitives::{Bag, HandlerId}; @@ -35,7 +35,6 @@ use subspace_farmer_components::plotting::{ use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_kzg::Kzg; use subspace_proof_of_space::Table; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{warn, Instrument}; @@ -87,7 +86,7 @@ where PosTable: Table, { async fn has_free_capacity(&self) -> Result { - Ok(self.downloading_semaphore.available_permits() > 0) + Ok(self.downloading_semaphore.try_acquire().is_some()) } async fn plot_sector( @@ -97,39 +96,13 @@ where farmer_protocol_info: FarmerProtocolInfo, pieces_in_sector: u16, replotting: bool, - mut progress_sender: mpsc::Sender, + progress_sender: mpsc::Sender, ) { let start = Instant::now(); // Done outside the future below as a backpressure, ensuring that it is not possible to // schedule unbounded number of plotting tasks - let downloading_permit = match Arc::clone(&self.downloading_semaphore) - 
.acquire_owned() - .await - { - Ok(downloading_permit) => downloading_permit, - Err(error) => { - warn!(%error, "Failed to acquire downloading permit"); - - let progress_updater = ProgressUpdater { - public_key, - sector_index, - handlers: Arc::clone(&self.handlers), - metrics: self.metrics.clone(), - }; - - progress_updater - .update_progress_and_events( - &mut progress_sender, - SectorPlottingProgress::Error { - error: format!("Failed to acquire downloading permit: {error}"), - }, - ) - .await; - - return; - } - }; + let downloading_permit = self.downloading_semaphore.acquire_arc().await; self.plot_sector_internal( start, @@ -155,8 +128,7 @@ where ) -> bool { let start = Instant::now(); - let Ok(downloading_permit) = Arc::clone(&self.downloading_semaphore).try_acquire_owned() - else { + let Some(downloading_permit) = self.downloading_semaphore.try_acquire_arc() else { return false; }; @@ -259,7 +231,7 @@ where async fn plot_sector_internal( &self, start: Instant, - downloading_permit: OwnedSemaphorePermit, + downloading_permit: SemaphoreGuardArc, public_key: PublicKey, sector_index: SectorIndex, farmer_protocol_info: FarmerProtocolInfo, diff --git a/crates/subspace-farmer/src/plotter/gpu.rs b/crates/subspace-farmer/src/plotter/gpu.rs index 07c3d8c89ad..2814ac9e4b6 100644 --- a/crates/subspace-farmer/src/plotter/gpu.rs +++ b/crates/subspace-farmer/src/plotter/gpu.rs @@ -11,7 +11,7 @@ use crate::plotter::gpu::gpu_encoders_manager::GpuRecordsEncoderManager; use crate::plotter::gpu::metrics::GpuPlotterMetrics; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::utils::AsyncJoinOnDrop; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, Semaphore, SemaphoreGuardArc}; use async_trait::async_trait; use bytes::Bytes; use event_listener_primitives::{Bag, HandlerId}; @@ -37,7 +37,6 @@ use subspace_farmer_components::plotting::{ }; use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_kzg::Kzg; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{warn, Instrument}; @@ -97,7 +96,7 @@ where GRE: GpuRecordsEncoder + 'static, { async fn has_free_capacity(&self) -> Result { - Ok(self.downloading_semaphore.available_permits() > 0) + Ok(self.downloading_semaphore.try_acquire().is_some()) } async fn plot_sector( @@ -107,39 +106,13 @@ where farmer_protocol_info: FarmerProtocolInfo, pieces_in_sector: u16, _replotting: bool, - mut progress_sender: mpsc::Sender, + progress_sender: mpsc::Sender, ) { let start = Instant::now(); // Done outside the future below as a backpressure, ensuring that it is not possible to // schedule unbounded number of plotting tasks - let downloading_permit = match Arc::clone(&self.downloading_semaphore) - .acquire_owned() - .await - { - Ok(downloading_permit) => downloading_permit, - Err(error) => { - warn!(%error, "Failed to acquire downloading permit"); - - let progress_updater = ProgressUpdater { - public_key, - sector_index, - handlers: Arc::clone(&self.handlers), - metrics: self.metrics.clone(), - }; - - progress_updater - .update_progress_and_events( - &mut progress_sender, - SectorPlottingProgress::Error { - error: format!("Failed to acquire downloading permit: {error}"), - }, - ) - .await; - - return; - } - }; + let downloading_permit = self.downloading_semaphore.acquire_arc().await; self.plot_sector_internal( start, @@ -164,8 +137,7 @@ where ) -> bool { let start = Instant::now(); - let Ok(downloading_permit) = Arc::clone(&self.downloading_semaphore).try_acquire_owned() 
- else { + let Some(downloading_permit) = self.downloading_semaphore.try_acquire_arc() else { return false; }; @@ -266,7 +238,7 @@ where async fn plot_sector_internal( &self, start: Instant, - downloading_permit: OwnedSemaphorePermit, + downloading_permit: SemaphoreGuardArc, public_key: PublicKey, sector_index: SectorIndex, farmer_protocol_info: FarmerProtocolInfo, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 7e74e587f25..9c294f48861 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -40,7 +40,7 @@ use crate::single_disk_farm::plotting::{ use crate::single_disk_farm::reward_signing::reward_signing; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::{farm, KNOWN_PEERS_CACHE_SIZE}; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::{mpsc, oneshot}; @@ -82,7 +82,7 @@ use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::{broadcast, Barrier, Semaphore}; +use tokio::sync::{broadcast, Barrier}; use tokio::task; use tracing::{debug, error, info, trace, warn, Instrument, Span}; diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 27e3b74049d..c9dd44b992e 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -17,6 +17,7 @@ include = [ targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-lock = "3.4.0" anyhow = "1.0.89" async-trait = "0.1.83" clap = { version = "4.5.18", features = ["derive"] } @@ -25,7 +26,6 @@ futures = "0.3.31" hex = "0.4.3" jsonrpsee = { version = "0.24.5", features = ["server"] } mimalloc = "0.1.43" -parking_lot = "0.12.2" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } @@ -35,7 +35,6 @@ subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false } supports-color = "3.0.1" -thiserror = "2.0.0" tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 89763055f68..a562b127e1c 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -10,6 +10,7 @@ use crate::commands::shutdown_signal; use crate::piece_getter::DsnPieceGetter; use crate::piece_validator::SegmentCommitmentPieceValidator; use anyhow::anyhow; +use async_lock::Semaphore; use clap::Parser; use futures::{select, FutureExt}; use std::env; @@ -21,10 +22,13 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_erasure_coding::ErasureCoding; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; use subspace_kzg::Kzg; +use 
subspace_networking::utils::piece_provider::PieceProvider; use tracing::info; /// The default size limit, based on the maximum block size in some domains. pub const DEFAULT_MAX_SIZE: usize = 5 * 1024 * 1024; +/// Multiplier on top of outgoing connections number for piece downloading purposes +const PIECE_PROVIDER_MULTIPLIER: usize = 10; /// Options for running a node #[derive(Debug, Parser)] @@ -88,15 +92,18 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { ) .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + let out_connections = dsn_options.out_connections; // TODO: move this service code into its own function, in a new library part of this crate let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); - let piece_getter = DsnPieceGetter::new( + let piece_provider = PieceProvider::new( dsn_node.clone(), SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), + Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER), ); - let object_fetcher = ObjectFetcher::new(Arc::new(piece_getter), erasure_coding, Some(max_size)); + let piece_getter = DsnPieceGetter::new(piece_provider); + let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size)); let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; diff --git a/crates/subspace-gateway/src/commands/run/network.rs b/crates/subspace-gateway/src/commands/run/network.rs index f8688c00387..f985c59a5ac 100644 --- a/crates/subspace-gateway/src/commands/run/network.rs +++ b/crates/subspace-gateway/src/commands/run/network.rs @@ -37,7 +37,7 @@ pub(crate) struct NetworkArgs { /// Maximum established outgoing swarm connection limit. #[arg(long, default_value_t = 100)] - out_connections: u32, + pub(crate) out_connections: u32, /// Maximum pending outgoing swarm connection limit. #[arg(long, default_value_t = 100)] diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs index 0cbf132b612..45e6072460e 100644 --- a/crates/subspace-gateway/src/piece_getter.rs +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -9,7 +9,6 @@ use std::sync::Arc; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; -use subspace_networking::Node; /// The maximum number of peer-to-peer walking rounds for L1 archival storage. 
const MAX_RANDOM_WALK_ROUNDS: usize = 15; diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index f512556e9d7..06b5d2d39b6 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -16,13 +16,12 @@ include = [ ] [dependencies] -async-mutex = "1.4.0" +async-lock = "3.4.0" async-trait = "0.1.83" backoff = { version = "0.4.0", features = ["futures", "tokio"] } bytes = "1.7.2" clap = { version = "4.5.18", features = ["color", "derive"] } derive_more = { version = "1.0.0", features = ["full"] } -either = "1.13.0" event-listener-primitives = "2.0.1" # TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved fs2 = "0.4.3" diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs index 3f29fd1580d..8b7326dff74 100644 --- a/crates/subspace-networking/examples/benchmark.rs +++ b/crates/subspace-networking/examples/benchmark.rs @@ -1,3 +1,4 @@ +use async_lock::Semaphore; use backoff::future::retry; use backoff::ExponentialBackoff; use clap::Parser; @@ -17,7 +18,6 @@ use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_networking::protocols::request_response::handlers::piece_by_index::PieceByIndexRequestHandler; use subspace_networking::utils::piece_provider::{NoPieceValidator, PieceProvider, PieceValidator}; use subspace_networking::{Config, Node}; -use tokio::sync::Semaphore; use tracing::{debug, error, info, trace, warn, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; @@ -214,7 +214,7 @@ async fn simple_benchmark(node: Node, max_pieces: usize, start_with: usize, retr return; } - let piece_provider = PieceProvider::new(node, NoPieceValidator); + let piece_provider = PieceProvider::new(node, NoPieceValidator, Semaphore::new(100)); let mut total_duration = Duration::default(); for i in start_with..(start_with + max_pieces) { let piece_index = PieceIndex::from(i as u64); @@ -266,7 +266,11 @@ async fn parallel_benchmark( let semaphore = &Semaphore::new(parallelism_level.into()); - let piece_provider = &PieceProvider::new(node, NoPieceValidator); + let piece_provider = &PieceProvider::new( + node, + NoPieceValidator, + Semaphore::new(parallelism_level.into()), + ); let mut total_duration = Duration::default(); let mut pure_total_duration = Duration::default(); let mut pending_pieces = (start_with..(start_with + max_pieces)) @@ -277,10 +281,7 @@ async fn parallel_benchmark( async move { let start = Instant::now(); - let permit = semaphore - .acquire() - .await - .expect("Semaphore cannot be closed."); + let permit = semaphore.acquire().await; let semaphore_acquired = Instant::now(); let maybe_piece = get_piece_from_dsn_cache_with_retries( piece_provider, diff --git a/crates/subspace-networking/src/behavior.rs b/crates/subspace-networking/src/behavior.rs index 2666fe924d0..cb7ceac09e3 100644 --- a/crates/subspace-networking/src/behavior.rs +++ b/crates/subspace-networking/src/behavior.rs @@ -42,6 +42,9 @@ pub(crate) struct BehaviorConfig { pub(crate) record_store: RecordStore, /// The configuration for the [`RequestResponsesBehaviour`] protocol. pub(crate) request_response_protocols: Vec>, + /// The upper bound for the number of concurrent inbound + outbound streams for request/response + /// protocols. + pub(crate) request_response_max_concurrent_streams: usize, /// Connection limits for the swarm. 
pub(crate) connection_limits: ConnectionLimits, /// The configuration for the [`ReservedPeersBehaviour`]. @@ -97,6 +100,7 @@ where ping: Ping::default(), request_response: RequestResponseFactoryBehaviour::new( config.request_response_protocols, + config.request_response_max_concurrent_streams, ) //TODO: Convert to an error. .expect("RequestResponse protocols registration failed."), diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index 5defa205174..70b29f433f0 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -61,7 +61,8 @@ const SWARM_MAX_PENDING_INCOMING_CONNECTIONS: u32 = 80; /// The default maximum pending incoming connection number for the swarm. const SWARM_MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 80; const KADEMLIA_QUERY_TIMEOUT: Duration = Duration::from_secs(40); -const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: Option = Some(3); +const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: u32 = 3; +const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; // TODO: Consider moving this constant to configuration or removing `Toggle` wrapper when we find a // use-case for gossipsub protocol. const ENABLE_GOSSIP_PROTOCOL: bool = false; @@ -441,7 +442,7 @@ where ); let connection_limits = ConnectionLimits::default() - .with_max_established_per_peer(SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER) + .with_max_established_per_peer(Some(SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER)) .with_max_pending_incoming(Some(max_pending_incoming_connections)) .with_max_pending_outgoing(Some(max_pending_outgoing_connections)) .with_max_established_incoming(Some(max_established_incoming_connections)) @@ -469,6 +470,11 @@ where gossipsub, record_store: LocalOnlyRecordStore::new(local_records_provider), request_response_protocols, + request_response_max_concurrent_streams: { + let max_num_connections = max_established_incoming_connections as usize + + max_established_outgoing_connections as usize; + max_num_connections * MAX_CONCURRENT_STREAMS_PER_CONNECTION + }, connection_limits, reserved_peers: ReservedPeersConfig { reserved_peers: reserved_peers.clone(), diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 132d142ea74..b13554da37b 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -16,7 +16,7 @@ //! Networking functionality of Subspace Network, primarily used for DSN (Distributed Storage //! Network). -#![feature(impl_trait_in_assoc_type, ip, try_blocks)] +#![feature(exact_size_is_empty, impl_trait_in_assoc_type, ip, try_blocks)] #![warn(missing_docs)] mod behavior; diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index be429d5a738..8c622e60bd5 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -571,7 +571,7 @@ impl Node { pub async fn connected_peers(&self) -> Result, ConnectedPeersError> { let (result_sender, result_receiver) = oneshot::channel(); - trace!("Starting 'connected_peers' request."); + trace!("Starting `connected_peers` request"); self.shared .command_sender @@ -584,11 +584,28 @@ impl Node { .map_err(|_| ConnectedPeersError::ConnectedPeers) } + /// Returns a collection of currently connected servers (typically farmers). 
+ pub async fn connected_servers(&self) -> Result<Vec<PeerId>, ConnectedPeersError> { + let (result_sender, result_receiver) = oneshot::channel(); + + trace!("Starting `connected_servers` request"); + + self.shared + .command_sender + .clone() + .send(Command::ConnectedServers { result_sender }) + .await?; + + result_receiver + .await + .map_err(|_| ConnectedPeersError::ConnectedPeers) + } + /// Bootstraps Kademlia network pub async fn bootstrap(&self) -> Result<(), BootstrapError> { let (result_sender, mut result_receiver) = mpsc::unbounded(); - debug!("Starting 'bootstrap' request."); + debug!("Starting `bootstrap` request"); self.shared .command_sender diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 77bf983b74f..8c85e1e2f82 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -10,7 +10,7 @@ use crate::protocols::request_response::request_response_factory::{ }; use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics}; -use async_mutex::Mutex as AsyncMutex; +use async_lock::Mutex as AsyncMutex; use bytes::Bytes; use event_listener_primitives::HandlerId; use futures::channel::mpsc; @@ -108,6 +108,7 @@ where periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>, /// Manages the networking parameters like known peers and addresses known_peers_registry: Box<dyn KnownPeersRegistry>, + connected_servers: HashSet<PeerId>, /// Defines set of peers with a permanent connection (and reconnection if necessary). reserved_peers: HashMap<PeerId, Multiaddr>, /// Temporarily banned peers. @@ -211,6 +212,7 @@ where // We'll make the first dial right away and continue at the interval. periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()), known_peers_registry, + connected_servers: HashSet::new(), reserved_peers, temporary_bans, libp2p_metrics, @@ -569,6 +571,7 @@ where if num_established == 0 { self.peer_ip_addresses.remove(&peer_id); + self.connected_servers.remove(&peer_id); } let num_established_peer_connections = shared .num_established_peer_connections @@ -831,6 +834,8 @@ where kademlia.remove_address(&peer_id, &old_address); } + + self.connected_servers.insert(peer_id); } else { debug!( %local_peer_id, @@ -841,6 +846,7 @@ where ); kademlia.remove_peer(&peer_id); + self.connected_servers.remove(&peer_id); } } } @@ -1475,6 +1481,11 @@ where let _ = result_sender.send(connected_peers); } + Command::ConnectedServers { result_sender } => { + let connected_servers = self.connected_servers.iter().cloned().collect(); + + let _ = result_sender.send(connected_servers); + } Command::Bootstrap { result_sender } => { let kademlia = &mut self.swarm.behaviour_mut().kademlia; diff --git a/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs index 9c576158fac..cbac6eb043c 100644 --- a/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs @@ -323,6 +323,7 @@ impl RequestResponseFactoryBehaviour { /// the same protocol is passed twice.
pub fn new( list: impl IntoIterator<Item = Box<dyn RequestHandler>>, + max_concurrent_streams: usize, ) -> Result<Self, RegisterError> { let mut protocols = HashMap::new(); let mut request_handlers = Vec::new(); @@ -341,7 +342,9 @@ impl RequestResponseFactoryBehaviour { max_response_size: config.max_response_size, }, iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)), - RequestResponseConfig::default().with_request_timeout(config.request_timeout), + RequestResponseConfig::default() + .with_request_timeout(config.request_timeout) + .with_max_concurrent_streams(max_concurrent_streams), ); match protocols.entry(Cow::Borrowed(config.name)) { diff --git a/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs index 1c6a99de642..20916d93fee 100644 --- a/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs @@ -41,7 +41,7 @@ async fn build_swarm( .into_iter() .map(|config| Box::new(MockRunner(config)) as Box<dyn RequestHandler>) .collect::<Vec<_>>(); - let behaviour = RequestResponseFactoryBehaviour::new(configs).unwrap(); + let behaviour = RequestResponseFactoryBehaviour::new(configs, 100).unwrap(); let mut swarm = SwarmBuilder::with_new_identity() .with_tokio() diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 8aacbda5259..edde4d85867 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -108,6 +108,9 @@ pub(crate) enum Command { ConnectedPeers { result_sender: oneshot::Sender<Vec<PeerId>>, }, + ConnectedServers { + result_sender: oneshot::Sender<Vec<PeerId>>, + }, Bootstrap { // No result sender means background async bootstrapping result_sender: Option<mpsc::UnboundedSender<()>>, diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 5c5c877935d..8583ad58a15 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -8,6 +8,7 @@ use crate::protocols::request_response::handlers::piece_by_index::{ }; use crate::utils::multihash::ToMultihash; use crate::{Multihash, Node}; +use async_lock::{Semaphore, SemaphoreGuard}; use async_trait::async_trait; use futures::channel::mpsc; use futures::future::FusedFuture; @@ -18,15 +19,15 @@ use libp2p::kad::store::RecordStore; use libp2p::kad::{store, Behaviour as Kademlia, KBucketKey, ProviderRecord, Record, RecordKey}; use libp2p::swarm::NetworkBehaviour; use libp2p::{Multiaddr, PeerId}; -use parking_lot::Mutex; use rand::prelude::*; use std::any::type_name; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::iter::Empty; +use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{fmt, iter, mem}; +use std::{fmt, iter}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use tokio_stream::StreamMap; use tracing::{debug, trace, warn, Instrument}; @@ -59,6 +60,7 @@ impl PieceValidator for NoPieceValidator { pub struct PieceProvider<PV> { node: Node, piece_validator: PV, + piece_downloading_semaphore: Semaphore, } impl<PV> fmt::Debug for PieceProvider<PV> { @@ -74,10 +76,11 @@ where PV: PieceValidator, { /// Creates new piece provider.
- pub fn new(node: Node, piece_validator: PV) -> Self { + pub fn new(node: Node, piece_validator: PV, piece_downloading_semaphore: Semaphore) -> Self { Self { node, piece_validator, + piece_downloading_semaphore, } } @@ -91,14 +94,32 @@ where PieceIndices: IntoIterator<Item = PieceIndex> + 'a, { + let download_id = random::<u64>(); let (tx, mut rx) = mpsc::unbounded(); - let fut = get_from_cache_inner( - piece_indices.into_iter(), - &self.node, - &self.piece_validator, - tx, - ); - let mut fut = Box::pin(fut.fuse()); + let fut = async move { + let not_downloaded_pieces = download_cached_pieces( + piece_indices.into_iter(), + &self.node, + &self.piece_validator, + &tx, + &self.piece_downloading_semaphore, + ) + .await; + + if not_downloaded_pieces.is_empty() { + debug!("Done"); + return; + } + + for piece_index in not_downloaded_pieces { + tx.unbounded_send((piece_index, None)) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + debug!("Done #2"); + }; + + let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse()); // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { @@ -247,9 +268,9 @@ where // TODO: consider using retry policy for L1 lookups as well. trace!(%piece_index, "Getting piece from archival storage.."); - let connected_peers = { - let connected_peers = match self.node.connected_peers().await { - Ok(connected_peers) => connected_peers, + let connected_servers = { + let connected_servers = match self.node.connected_servers().await { + Ok(connected_servers) => connected_servers, Err(err) => { debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)"); return None; } }; - HashSet::<PeerId>::from_iter(connected_peers) + HashSet::<PeerId>::from_iter(connected_servers) }; - if connected_peers.is_empty() { + if connected_servers.is_empty() { debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)"); } else { - for peer_id in connected_peers.iter() { + for peer_id in connected_servers.iter() { let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await; if maybe_piece.is_some() { @@ -484,58 +505,15 @@ impl KademliaWrapper { } } -async fn get_from_cache_inner<PV, PieceIndices>( - piece_indices: PieceIndices, - node: &Node, - piece_validator: &PV, - results: mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>, -) where - PV: PieceValidator, - PieceIndices: Iterator<Item = PieceIndex>, -{ - let download_id = random::<u64>(); - - // TODO: It'd be nice to combine downloading from connected peers with downloading from closest - // peers concurrently - let fut = async move { - // Download from connected peers first - let pieces_to_download = download_cached_pieces_from_connected_peers( - piece_indices, - node, - piece_validator, - &results, - ) - .await; - - if pieces_to_download.is_empty() { - debug!("Done"); - return; - } - - // Download from iteratively closer peers according to Kademlia rules - download_cached_pieces_from_closest_peers( - pieces_to_download, - node, - piece_validator, - &results, - ) - .await; - - debug!("Done #2"); - }; - - fut.instrument(tracing::info_span!("", %download_id)).await; -} - /// Takes pieces to download as an input, sends results with pieces that were downloaded -/// successfully and returns those that were not downloaded from connected peer with addresses of -/// potential candidates -async fn download_cached_pieces_from_connected_peers<PV, PieceIndices>( +/// successfully and returns those that were not downloaded +async fn download_cached_pieces<PV, PieceIndices>( piece_indices: PieceIndices, node: &Node,
piece_validator: &PV, results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>, -) -> HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>> + semaphore: &Semaphore, +) -> impl ExactSizeIterator<Item = PieceIndex> where PV: PieceValidator, PieceIndices: Iterator<Item = PieceIndex>, { // At the end pieces that were not downloaded will remain with a collection of known closest // peers for them. let mut pieces_to_download = piece_indices - .map(|piece_index| (piece_index, HashMap::new())) - .collect::<HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>>>(); + .map(|piece_index| async move { + let mut kademlia = KademliaWrapper::new(node.id()); + let key = piece_index.to_multihash(); - debug!(num_pieces = %pieces_to_download.len(), "Starting"); + let local_closest_peers = node + .get_closest_local_peers(key, None) + .await + .unwrap_or_default(); - let mut checked_connected_peers = HashSet::new(); + // Seed with local closest peers + for (peer_id, addresses) in local_closest_peers { + kademlia.add_peer(&peer_id, addresses); + } - // The loop is in order to check peers that might be connected after the initial loop has - // started. - loop { - let Ok(connected_peers) = node.connected_peers().await else { - trace!("Connected peers error"); - break; - }; + (piece_index, kademlia) + }) + .collect::<FuturesUnordered<_>>() + .collect::<HashMap<_, _>>() + .await; - debug!( - connected_peers = %connected_peers.len(), - pieces_to_download = %pieces_to_download.len(), - "Loop" - ); - if connected_peers.is_empty() || pieces_to_download.is_empty() { - break; - } + let num_pieces = pieces_to_download.len(); + debug!(%num_pieces, "Starting"); - let num_pieces = pieces_to_download.len(); - let step = num_pieces / connected_peers.len().min(num_pieces); + let mut checked_peers = HashSet::new(); - // Dispatch initial set of requests to peers - let mut downloading_stream = connected_peers - .into_iter() - .take(num_pieces) - .enumerate() - .filter_map(|(peer_index, peer_id)| { - if !checked_connected_peers.insert(peer_id) { - return None; - } + let Ok(connected_servers) = node.connected_servers().await else { + trace!("Connected servers error"); + return pieces_to_download.into_keys(); + }; - // Take unique first piece index for each connected peer and the rest just to check - // cached pieces up to recommended limit - let mut peer_piece_indices = pieces_to_download - .keys() - .cycle() - .skip(step * peer_index) - .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT)) - .copied() - .collect::<Vec<_>>(); - // Pick first piece index as the piece we want to download - let piece_index = peer_piece_indices.swap_remove(0); - - let fut = download_cached_piece_from_peer( + let num_connected_servers = connected_servers.len(); + debug!( + %num_connected_servers, + %num_pieces, + "Starting downloading" + ); + + // Dispatch initial set of requests to peers with checked pieces distributed uniformly + let mut downloading_stream = connected_servers + .into_iter() + .take(num_pieces) + .enumerate() + .map(|(peer_index, peer_id)| { + checked_peers.insert(peer_id); + + // Inside to avoid division by zero in case there are no connected servers or pieces + let step = num_pieces / num_connected_servers.min(num_pieces); + + // Take unique first piece index for each connected peer and the rest just to check + // cached pieces up to recommended limit + let mut check_cached_pieces = pieces_to_download + .keys() + .cycle() + .skip(step * peer_index) + // + 1 because one index is removed just below + .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1)) + .copied() + .collect::<Vec<_>>(); + // Pick first piece index as the piece we want to download + let
piece_index = check_cached_pieces.swap_remove(0); + + trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer"); + + let permit = semaphore.try_acquire(); + + let fut = async move { + let permit = match permit { + Some(permit) => permit, + None => semaphore.acquire().await, + }; + + download_cached_piece_from_peer( node, piece_validator, peer_id, Vec::new(), - Arc::new(peer_piece_indices), + Arc::new(check_cached_pieces), piece_index, HashSet::new(), HashSet::new(), - ); + permit, + ) + .await + }; - Some((piece_index, Box::pin(fut.into_stream()))) - }) - .collect::>(); + (piece_index, Box::pin(fut.into_stream()) as _) + }) + .collect::>(); - // Process every response and potentially schedule follow-up request to the same peer - while let Some((piece_index, result)) = downloading_stream.next().await { - let DownloadedPieceFromPeer { - peer_id, - result, - mut cached_pieces, - not_cached_pieces, - } = result; - trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response"); + loop { + // Process up to 50% of the pieces concurrently + let mut additional_pieces_to_download = + (num_pieces / 2).saturating_sub(downloading_stream.len()); + if additional_pieces_to_download > 0 { + trace!( + %additional_pieces_to_download, + num_pieces, + currently_downloading = %downloading_stream.len(), + "Downloading additional pieces from closest peers" + ); + // Pick up any newly connected peers (if any) + 'outer: for peer_id in node + .connected_servers() + .await + .unwrap_or_default() + .into_iter() + .filter(|peer_id| checked_peers.insert(*peer_id)) + .take(additional_pieces_to_download) + { + let permit = if downloading_stream.is_empty() { + semaphore.acquire().await + } else if let Some(permit) = semaphore.try_acquire() { + permit + } else { + break; + }; - let Some(result) = result else { - // Downloading failed, ignore peer - continue; - }; + for &piece_index in pieces_to_download.keys() { + if downloading_stream.contains_key(&piece_index) { + continue; + } - match result { - PieceResult::Piece(piece) => { - trace!(%piece_index, %peer_id, "Got piece"); + trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer"); - // Downloaded successfully - pieces_to_download.remove(&piece_index); + let check_cached_pieces = sample_cached_piece_indices( + pieces_to_download.keys(), + &HashSet::new(), + &HashSet::new(), + piece_index, + ); + let fut = download_cached_piece_from_peer( + node, + piece_validator, + peer_id, + Vec::new(), + Arc::new(check_cached_pieces), + piece_index, + HashSet::new(), + HashSet::new(), + permit, + ); - results - .unbounded_send((piece_index, Some(piece))) - .expect("This future isn't polled after receiver is dropped; qed"); + downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _); + additional_pieces_to_download -= 1; - if pieces_to_download.is_empty() { - return HashMap::new(); - } + continue 'outer; } - PieceResult::ClosestPeers(closest_peers) => { - trace!(%piece_index, %peer_id, "Got closest peers"); - // Store closer peers in case piece index was not downloaded yet - if let Some(peers) = pieces_to_download.get_mut(&piece_index) { - peers.extend(Vec::from(closest_peers)); - } + break; + } - // No need to ask this peer again if they didn't have the piece we expected, or - // they claimed to have earlier + // Pick up more pieces to download from the closest peers + // Ideally we'd not allocate here, but it is hard to explain to the compiler that + // entries are not removed otherwise + let 
pieces_indices_to_download = pieces_to_download.keys().copied().collect::>(); + for piece_index in pieces_indices_to_download { + if additional_pieces_to_download == 0 { + break; + } + if downloading_stream.contains_key(&piece_index) { continue; } - } + let permit = if downloading_stream.is_empty() { + semaphore.acquire().await + } else if let Some(permit) = semaphore.try_acquire() { + permit + } else { + break; + }; - let mut maybe_piece_index_to_download_next = None; - // Clear useless entries in cached pieces and find something to download next - cached_pieces.retain(|piece_index| { - // Clear downloaded pieces - if !pieces_to_download.contains_key(piece_index) { - return false; - } + let kbucket_key = KBucketKey::from(piece_index.to_multihash()); + let closest_peers_to_check = pieces_to_download + .get_mut(&piece_index) + .expect("Entries are not removed here; qed") + .closest_peers(&kbucket_key); + for (peer_id, addresses) in closest_peers_to_check { + if !checked_peers.insert(peer_id) { + continue; + } - // Try to pick a piece to download that is not being downloaded already - if maybe_piece_index_to_download_next.is_none() - && !downloading_stream.contains_key(piece_index) - { - maybe_piece_index_to_download_next.replace(*piece_index); - // We'll not need to download it after this attempt - return false; - } + trace!(%peer_id, %piece_index, "Downloading piece from closest peer"); - // Retain everything else - true - }); + let check_cached_pieces = sample_cached_piece_indices( + pieces_to_download.keys(), + &HashSet::new(), + &HashSet::new(), + piece_index, + ); + let fut = download_cached_piece_from_peer( + node, + piece_validator, + peer_id, + addresses, + Arc::new(check_cached_pieces), + piece_index, + HashSet::new(), + HashSet::new(), + permit, + ); - let piece_index_to_download_next = - if let Some(piece_index) = maybe_piece_index_to_download_next { - trace!(%piece_index, %peer_id, "Next piece to download from peer"); - piece_index - } else { - trace!(%peer_id, "Peer doesn't have anything else"); - // Nothing left to do with this peer - continue; - }; + downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _); + additional_pieces_to_download -= 1; + break; + } + } - let fut = download_cached_piece_from_peer( - node, - piece_validator, - peer_id, - Vec::new(), - // Sample more random cached piece indices for connected peer, algorithm can be - // improved, but has to be something simple and this should do it for now - Arc::new( - pieces_to_download - .keys() - // Do a bit of work to filter-out piece indices we already know remote peer - // has or doesn't to decrease burden on them - .filter_map(|piece_index| { - if piece_index == &piece_index_to_download_next - || cached_pieces.contains(piece_index) - || not_cached_pieces.contains(piece_index) - { - None - } else { - Some(*piece_index) - } - }) - .choose_multiple( - &mut thread_rng(), - CachedPieceByIndexRequest::RECOMMENDED_LIMIT, - ), - ), - piece_index_to_download_next, - cached_pieces, - not_cached_pieces, + trace!( + pieces_left = %additional_pieces_to_download, + "Initiated downloading additional pieces from closest peers" ); - downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream())); } - if pieces_to_download.len() == num_pieces { - debug!(%num_pieces, "Finished downloading from connected peers"); - // Nothing was downloaded, we're done here + let Some((piece_index, result)) = downloading_stream.next().await else { + if !pieces_to_download.is_empty() { + debug!( + %num_pieces, + 
not_downloaded = %pieces_to_download.len(), "Finished downloading early" ); // Not everything was downloaded, but there is nothing else left to try break; } break; }; process_downloading_result( piece_index, result, &mut pieces_to_download, &mut downloading_stream, node, piece_validator, results, ); if pieces_to_download.is_empty() { break; } } pieces_to_download.into_keys() } -/// Takes pieces to download with potential peer candidates as an input, sends results with pieces -/// that were downloaded successfully and returns those that were not downloaded -async fn download_cached_pieces_from_closest_peers<PV>( - maybe_pieces_to_download: HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>>, - node: &Node, - piece_validator: &PV, - results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>, +fn process_downloading_result<'a, 'b, PV>( + piece_index: PieceIndex, + result: DownloadedPieceFromPeer<'a>, + pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>, + downloading_stream: &'b mut StreamMap< + PieceIndex, + Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>, + >, + node: &'a Node, + piece_validator: &'a PV, + results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>, ) where PV: PieceValidator, { - let kademlia = &Mutex::new(KademliaWrapper::new(node.id())); - // Collection of pieces to download and already connected peers that claim to have them - let connected_peers_with_piece = &Mutex::new( - maybe_pieces_to_download - .keys() - .map(|&piece_index| (piece_index, HashSet::<PeerId>::new())) - .collect::<HashMap<_, _>>(), - ); - - let mut downloaded_pieces = maybe_pieces_to_download - .into_iter() - .map(|(piece_index, collected_peers)| async move { - let key = piece_index.to_multihash(); - let kbucket_key = KBucketKey::from(key); - let mut checked_closest_peers = HashSet::<PeerId>::new(); + let DownloadedPieceFromPeer { + peer_id, + result, + mut cached_pieces, + not_cached_pieces, + permit, + } = result; + trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response"); + + let Some(result) = result else { + // Downloading failed, ignore peer + return; + }; { - let local_closest_peers = node - .get_closest_local_peers(key, None) - .await - .unwrap_or_default(); - let mut kademlia = kademlia.lock(); + match result { + PieceResult::Piece(piece) => { + trace!(%piece_index, %peer_id, "Got piece"); - for (peer_id, addresses) in collected_peers { - kademlia.add_peer(&peer_id, addresses); - } - for (peer_id, addresses) in local_closest_peers { - kademlia.add_peer(&peer_id, addresses); - } - } + // Downloaded successfully + pieces_to_download.remove(&piece_index); - loop { - // Collect pieces that still need to be downloaded and connected peers that claim to - // have them - let (pieces_to_download, connected_peers) = { - let mut connected_peers_with_piece = connected_peers_with_piece.lock(); - - ( - Arc::new( - connected_peers_with_piece - .keys() - .filter(|&candidate| candidate != &piece_index) - .take(CachedPieceByIndexRequest::RECOMMENDED_LIMIT) - .copied() - .collect::<Vec<_>>(), - ), - connected_peers_with_piece - .get_mut(&piece_index) - .map(mem::take) - .unwrap_or_default(), - ) - }; + results + .unbounded_send((piece_index, Some(piece))) + .expect("This future isn't polled after receiver is dropped; qed"); - // Check connected peers that claim to have the piece index first - for peer_id in connected_peers { - let fut = download_cached_piece_from_peer( - node, - piece_validator, - peer_id, - Vec::new(), - Arc::default(), - piece_index, - HashSet::new(), - HashSet::new(), - ); + if pieces_to_download.is_empty() { + return; + } - match fut.await.result {
Some(PieceResult::Piece(piece)) => { - return (piece_index, Some(piece)); - } - Some(PieceResult::ClosestPeers(closest_peers)) => { - let mut kademlia = kademlia.lock(); + cached_pieces.remove(&piece_index); + } + PieceResult::ClosestPeers(closest_peers) => { + trace!(%piece_index, %peer_id, "Got closest peers"); - // Store additional closest peers reported by the peer - for (peer_id, addresses) in Vec::from(closest_peers) { - kademlia.add_peer(&peer_id, addresses); - } - } - None => { - checked_closest_peers.insert(peer_id); - } - } + // Store closer peers in case piece index was not downloaded yet + if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) { + for (peer_id, addresses) in Vec::from(closest_peers) { + kademlia.add_peer(&peer_id, addresses); } + } - // Find the closest peers that were not queried yet - let closest_peers_to_check = kademlia.lock().closest_peers(&kbucket_key); - let closest_peers_to_check = closest_peers_to_check - .filter(|(peer_id, _addresses)| checked_closest_peers.insert(*peer_id)) - .collect::>(); + // No need to ask this peer again if they claimed to have this piece index earlier + if cached_pieces.remove(&piece_index) { + return; + } + } + } - if closest_peers_to_check.is_empty() { - // No new closest peers found, nothing left to do here - break; - } + let mut maybe_piece_index_to_download_next = None; + // Clear useless entries in cached pieces and find something to download next + cached_pieces.retain(|piece_index| { + // Clear downloaded pieces + if !pieces_to_download.contains_key(piece_index) { + return false; + } - for (peer_id, addresses) in closest_peers_to_check { - let fut = download_cached_piece_from_peer( - node, - piece_validator, - peer_id, - addresses, - Arc::clone(&pieces_to_download), - piece_index, - HashSet::new(), - HashSet::new(), - ); + // Try to pick a piece to download that is not being downloaded already + if maybe_piece_index_to_download_next.is_none() + && !downloading_stream.contains_key(piece_index) + { + maybe_piece_index_to_download_next.replace(*piece_index); + // We'll check it later when receiving response + return true; + } - let DownloadedPieceFromPeer { - peer_id: _, - result, - cached_pieces, - not_cached_pieces: _, - } = fut.await; - - if !cached_pieces.is_empty() { - let mut connected_peers_with_piece = connected_peers_with_piece.lock(); - - // Remember that this peer has some pieces that need to be downloaded here - for cached_piece_index in cached_pieces { - if let Some(peers) = - connected_peers_with_piece.get_mut(&cached_piece_index) - { - peers.insert(peer_id); - } - } - } + // Retain everything else + true + }); - match result { - Some(PieceResult::Piece(piece)) => { - return (piece_index, Some(piece)); - } - Some(PieceResult::ClosestPeers(closest_peers)) => { - let mut kademlia = kademlia.lock(); + let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next + { + trace!(%piece_index, %peer_id, "Next piece to download from peer"); + piece_index + } else { + trace!(%peer_id, "Peer doesn't have anything else"); + // Nothing left to do with this peer + return; + }; - // Store additional closest peers - for (peer_id, addresses) in Vec::from(closest_peers) { - kademlia.add_peer(&peer_id, addresses); - } - } - None => { - checked_closest_peers.insert(peer_id); - } - } - } - } + let fut = download_cached_piece_from_peer( + node, + piece_validator, + peer_id, + Vec::new(), + // Sample more random cached piece indices for connected peer, algorithm can be + // improved, but 
diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml
index 1cb4b260eb2..63008b8ef6b 100644
--- a/crates/subspace-service/Cargo.toml
+++ b/crates/subspace-service/Cargo.toml
@@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"]
 [dependencies]
 array-bytes = "6.2.3"
 async-channel = "1.8.0"
+async-lock = "3.4.0"
 async-trait = "0.1.83"
 cross-domain-message-gossip = { version = "0.1.0", path = "../../domains/client/cross-domain-message-gossip" }
 domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" }
@@ -43,9 +44,7 @@ sc-domains = { version = "0.1.0", path = "../sc-domains" }
 sc-executor = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-informant = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
-sc-network-light = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-network-sync = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
-sc-network-transactions = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-offchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-proof-of-time = { version = "0.1.0", path = "../sc-proof-of-time" }
 sc-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
@@ -57,7 +56,6 @@ sc-telemetry = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-tracing = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-transaction-pool = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
-sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
 schnellru = "0.2.1"
 schnorrkel = "0.11.4"
 sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "94a1a8143a89bbe9f938c1939ff68abc1519a305" }
diff --git a/crates/subspace-service/src/config.rs b/crates/subspace-service/src/config.rs
index 6f6e895f3ff..d9539860429 100644
--- a/crates/subspace-service/src/config.rs
+++ b/crates/subspace-service/src/config.rs
@@ -287,6 +287,8 @@ pub enum SubspaceNetworking {
         node: Node,
         /// Bootstrap nodes used (which can also be sent to the farmer over RPC)
         bootstrap_nodes: Vec<Multiaddr>,
+        /// Sum of incoming and outgoing connection limits
+        max_connections: u32,
     },
     /// Networking must be instantiated internally
     Create {
diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs
index f31b8f4be41..e6c77cea76a 100644
--- a/crates/subspace-service/src/lib.rs
+++ b/crates/subspace-service/src/lib.rs
@@ -43,6 +43,7 @@ pub use crate::mmr::sync::mmr_sync;
 use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
 use crate::sync_from_dsn::snap_sync::snap_sync;
 use crate::transaction_pool::FullPool;
+use async_lock::Semaphore;
 use core::sync::atomic::{AtomicU32, Ordering};
 use cross_domain_message_gossip::xdm_gossip_peers_set_config;
 use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
@@ -148,6 +149,8 @@ const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u32>());
 /// too large to handle
 const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
 const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
+/// Multiplier on top of the number of outgoing connections, used for piece downloading purposes
+const PIECE_PROVIDER_MULTIPLIER: usize = 10;
 
 /// Error type for Subspace service.
 #[derive(thiserror::Error, Debug)]
@@ -775,11 +778,12 @@ where
     } = other;
 
     let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
-    let (node, bootstrap_nodes) = match config.subspace_networking {
+    let (node, bootstrap_nodes, max_connections) = match config.subspace_networking {
         SubspaceNetworking::Reuse {
             node,
             bootstrap_nodes,
-        } => (node, bootstrap_nodes),
+            max_connections,
+        } => (node, bootstrap_nodes, max_connections),
         SubspaceNetworking::Create { config: dsn_config } => {
             let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
 
@@ -789,6 +793,7 @@ where
                 "Setting DSN protocol version..."
             );
 
+            let out_connections = dsn_config.max_out_connections;
             let (node, mut node_runner) = create_dsn_instance(
                 dsn_protocol_version,
                 dsn_config.clone(),
@@ -822,7 +827,7 @@ where
                 ),
             );
 
-            (node, dsn_config.bootstrap_nodes)
+            (node, dsn_config.bootstrap_nodes, out_connections)
         }
     };
@@ -1057,6 +1062,7 @@ where
             subspace_link.kzg().clone(),
             segment_headers_store.clone(),
         ),
+        Semaphore::new(max_connections as usize * PIECE_PROVIDER_MULTIPLIER),
     ))
 });
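With a connection count now flowing out of both `SubspaceNetworking` variants, the piece provider's semaphore is sized as `max_connections * PIECE_PROVIDER_MULTIPLIER`. A minimal sketch of that sizing under the constants above; `piece_download_semaphore` is a hypothetical helper, not a function from this diff:

use async_lock::Semaphore;

const PIECE_PROVIDER_MULTIPLIER: usize = 10;

// Hypothetical helper: `max_connections` is the new `SubspaceNetworking::Reuse` field
// (sum of incoming and outgoing connection limits) or, on the `Create` path, the
// DSN's configured number of outgoing connections
fn piece_download_semaphore(max_connections: u32) -> Semaphore {
    // Allows several concurrent piece requests per connection without letting
    // piece downloads grow unbounded
    Semaphore::new(max_connections as usize * PIECE_PROVIDER_MULTIPLIER)
}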
diff --git a/docker/gateway.Dockerfile b/docker/gateway.Dockerfile
new file mode 100644
index 00000000000..85f4c62f1c1
--- /dev/null
+++ b/docker/gateway.Dockerfile
@@ -0,0 +1,109 @@
+# This Dockerfile supports both native building and cross-compilation to x86-64, aarch64 and riscv64
+FROM --platform=$BUILDPLATFORM ubuntu:22.04
+
+ARG RUSTC_VERSION=nightly-2024-10-22
+ARG PROFILE=production
+ARG RUSTFLAGS
+# Incremental compilation here isn't helpful
+ENV CARGO_INCREMENTAL=0
+ENV PKG_CONFIG_ALLOW_CROSS=true
+
+ARG BUILDARCH
+ARG TARGETARCH
+
+WORKDIR /code
+
+RUN \
+    apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
+        ca-certificates \
+        protobuf-compiler \
+        curl \
+        git \
+        llvm \
+        clang \
+        automake \
+        libtool \
+        pkg-config \
+        make
+
+RUN \
+    if [ $BUILDARCH != "arm64" ] && [ $TARGETARCH = "arm64" ]; then \
+      DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
+        g++-aarch64-linux-gnu \
+        gcc-aarch64-linux-gnu \
+        libc6-dev-arm64-cross \
+    ; fi
+
+RUN \
+    if [ $BUILDARCH != "riscv64" ] && [ $TARGETARCH = "riscv64" ]; then \
+      DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
+        g++-riscv64-linux-gnu \
+        gcc-riscv64-linux-gnu \
+        libc6-dev-riscv64-cross \
+    ; fi
+
+RUN \
+    if [ $BUILDARCH != "amd64" ] && [ $TARGETARCH = "amd64" ]; then \
+      DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
+        g++-x86-64-linux-gnu \
+        gcc-x86-64-linux-gnu \
+        libc6-dev-amd64-cross \
+    ; fi
+
+RUN \
+    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION && \
+    /root/.cargo/bin/rustup target add wasm32-unknown-unknown
+
+COPY Cargo.lock /code/Cargo.lock
+COPY Cargo.toml /code/Cargo.toml
+COPY rust-toolchain.toml /code/rust-toolchain.toml
+
+COPY crates /code/crates
+COPY domains /code/domains
+COPY shared /code/shared
+COPY test /code/test
+
+# Up until this line all Rust images in this repo should be the same to share the same layers
+
+ARG TARGETVARIANT
+
+RUN \
+    if [ $BUILDARCH != "arm64" ] && [ $TARGETARCH = "arm64" ]; then \
+      export RUSTFLAGS="$RUSTFLAGS -C linker=aarch64-linux-gnu-gcc" \
+    ; fi && \
+    if [ $BUILDARCH != "riscv64" ] && [ $TARGETARCH = "riscv64" ]; then \
+      export RUSTFLAGS="$RUSTFLAGS -C linker=riscv64-linux-gnu-gcc" \
+    ; fi && \
+    if [ $TARGETARCH = "amd64" ] && [ "$RUSTFLAGS" = "" ]; then \
+      case "$TARGETVARIANT" in \
+        # x86-64-v2 with AES-NI
+        "v2") export RUSTFLAGS="-C target-cpu=x86-64-v2" ;; \
+        # x86-64-v3 with AES-NI
+        "v3") export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+aes" ;; \
+        # v4 is compiled for Zen 4+
+        "v4") export RUSTFLAGS="-C target-cpu=znver4" ;; \
+        # Default build is for Skylake
+        *) export RUSTFLAGS="-C target-cpu=skylake" ;; \
+      esac \
+    ; fi && \
+    if [ $BUILDARCH != "amd64" ] && [ $TARGETARCH = "amd64" ]; then \
+      export RUSTFLAGS="$RUSTFLAGS -C linker=x86_64-linux-gnu-gcc" \
+    ; fi && \
+    RUSTC_TARGET_ARCH=$(echo $TARGETARCH | sed "s/amd64/x86_64/g" | sed "s/arm64/aarch64/g" | sed "s/riscv64/riscv64gc/g") && \
+    /root/.cargo/bin/cargo -Zgitoxide -Zgit build \
+        --locked \
+        -Z build-std \
+        --profile $PROFILE \
+        --bin subspace-gateway \
+        --target $RUSTC_TARGET_ARCH-unknown-linux-gnu && \
+    mv target/*/*/subspace-gateway subspace-gateway && \
+    rm -rf target
+
+FROM ubuntu:22.04
+
+COPY --from=0 /code/subspace-gateway /subspace-gateway
+
+USER nobody:nogroup
+
+ENTRYPOINT ["/subspace-gateway"]
diff --git a/docker/gateway.Dockerfile.dockerignore b/docker/gateway.Dockerfile.dockerignore
new file mode 120000
index 00000000000..5a3c9b7d379
--- /dev/null
+++ b/docker/gateway.Dockerfile.dockerignore
@@ -0,0 +1 @@
+.dockerignore
\ No newline at end of file
diff --git a/domains/pallets/messenger/src/lib.rs b/domains/pallets/messenger/src/lib.rs
index 4c01990382a..183109e87d0 100644
--- a/domains/pallets/messenger/src/lib.rs
+++ b/domains/pallets/messenger/src/lib.rs
@@ -47,9 +47,9 @@ use sp_runtime::traits::{Extrinsic, Hash};
 use sp_runtime::DispatchError;
 
 pub(crate) mod verification_errors {
-    pub(crate) const INVALID_CHANNEL: u8 = 100;
-    pub(crate) const INVALID_NONCE: u8 = 101;
-    pub(crate) const NONCE_OVERFLOW: u8 = 102;
+    pub(crate) const INVALID_CHANNEL: u8 = 200;
+    pub(crate) const INVALID_NONCE: u8 = 201;
+    pub(crate) const NONCE_OVERFLOW: u8 = 202;
 }
 
 #[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo, Copy)]
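The renumbering from 100–102 to 200–202 changes only the `u8` discriminants, which matters because codes like these are commonly surfaced through `InvalidTransaction::Custom` and must stay unique across the runtime to be distinguishable to callers. A hedged sketch of that usage; the mapping shown here is an assumption for illustration, not code from this diff:

use sp_runtime::transaction_validity::{InvalidTransaction, TransactionValidityError};

const INVALID_CHANNEL: u8 = 200;

// Assumed usage: a `u8` code such as `INVALID_CHANNEL` becomes a custom
// invalid-transaction error, so two pallets reusing the same code would be
// indistinguishable when the error reaches the caller
fn reject_invalid_channel() -> TransactionValidityError {
    TransactionValidityError::Invalid(InvalidTransaction::Custom(INVALID_CHANNEL))
}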