From 2e98629290cb57c6e546702dcd3d6082cfc6bf49 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 11:16:43 +1000 Subject: [PATCH 01/12] Basic gateway RPC and run implementation --- Cargo.lock | 3 ++ crates/subspace-gateway/Cargo.toml | 3 ++ crates/subspace-gateway/src/commands/run.rs | 44 ++++++++++++++---- .../subspace-gateway/src/commands/run/dsn.rs | 6 +-- .../subspace-gateway/src/commands/run/rpc.rs | 46 +++++++++++++++++++ 5 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 crates/subspace-gateway/src/commands/run/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index e85c32b938..032ff2f2bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12589,6 +12589,9 @@ dependencies = [ "mimalloc", "parking_lot 0.12.3", "subspace-core-primitives", + "subspace-data-retrieval", + "subspace-erasure-coding", + "subspace-gateway-rpc", "subspace-networking", "subspace-rpc-primitives", "supports-color", diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index d94ecc3234..2592e10e3f 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -27,6 +27,9 @@ jsonrpsee = { version = "0.24.5", features = ["server"] } mimalloc = "0.1.43" parking_lot = "0.12.2" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } +subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } +subspace-gateway-rpc = { version = "0.1.0", path = "../subspace-gateway-rpc" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } supports-color = "3.0.1" diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 9d39d053a5..837a582b06 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -2,25 +2,34 @@ //! This is the primary command for the gateway. mod dsn; +mod rpc; use crate::commands::run::dsn::NetworkArgs; +use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; use crate::commands::shutdown_signal; +use anyhow::anyhow; use clap::Parser; use futures::{select, FutureExt}; +use std::env; +use std::num::NonZeroUsize; use std::pin::pin; -use std::{env, future}; +use subspace_core_primitives::pieces::Record; +use subspace_data_retrieval::object_fetcher::ObjectFetcher; +use subspace_erasure_coding::ErasureCoding; +use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; +use subspace_networking::utils::piece_provider::{NoPieceValidator, PieceProvider}; use tracing::info; /// Options for running a node #[derive(Debug, Parser)] -pub struct RunOptions { +pub(crate) struct RunOptions { #[clap(flatten)] gateway: GatewayOptions, } /// Options for running a gateway #[derive(Debug, Parser)] -pub(super) struct GatewayOptions { +pub(crate) struct GatewayOptions { /// Enable development mode. /// /// Implies following flags (unless customized): @@ -30,6 +39,11 @@ pub(super) struct GatewayOptions { #[clap(flatten)] dsn_options: NetworkArgs, + + /// Options for RPC + #[clap(flatten)] + rpc_options: RpcOptions, + // TODO: maximum object size } /// Default run command for gateway @@ -38,10 +52,12 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let signal = shutdown_signal(); let RunOptions { - gateway: GatewayOptions { - dev, - mut dsn_options, - }, + gateway: + GatewayOptions { + dev, + mut dsn_options, + rpc_options, + }, } = run_options; // Development mode handling is limited to this section @@ -55,12 +71,24 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { info!("✌️ version {}", env!("CARGO_PKG_VERSION")); info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + // TODO: move this service code into its own function, in a new library part of this crate #[expect(unused_variables, reason = "implementation is incomplete")] let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); - let rpc_fut = future::pending::<()>(); + // TODO: implement piece validation + let piece_provider = PieceProvider::new(dsn_node, NoPieceValidator); + let object_fetcher = ObjectFetcher::new(piece_provider, erasure_coding, None); + + let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); + let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; + let rpc_fut = rpc_handle.stopped(); // This defines order in which things are dropped let dsn_fut = dsn_fut; diff --git a/crates/subspace-gateway/src/commands/run/dsn.rs b/crates/subspace-gateway/src/commands/run/dsn.rs index 14deb3ca8c..4c3b46e40b 100644 --- a/crates/subspace-gateway/src/commands/run/dsn.rs +++ b/crates/subspace-gateway/src/commands/run/dsn.rs @@ -22,7 +22,7 @@ pub(crate) struct NetworkArgs { /// /// The default bootstrap nodes are fetched from the node RPC connection. #[arg(long)] - pub(crate) dsn_bootstrap_nodes: Vec, + dsn_bootstrap_nodes: Vec, /// Multiaddrs of DSN reserved nodes to maintain a connection to, multiple are supported. #[arg(long)] @@ -35,11 +35,11 @@ pub(crate) struct NetworkArgs { /// Maximum established outgoing swarm connection limit. #[arg(long, default_value_t = 100)] - pub(crate) out_connections: u32, + out_connections: u32, /// Maximum pending outgoing swarm connection limit. #[arg(long, default_value_t = 100)] - pub(crate) pending_out_connections: u32, + pending_out_connections: u32, } /// Create a DSN network client with the supplied configuration. diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs new file mode 100644 index 0000000000..60f5b4e7db --- /dev/null +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -0,0 +1,46 @@ +//! RPC service configuration and launch. + +use clap::Parser; +use jsonrpsee::server::{ServerBuilder, ServerHandle}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer}; + +/// The default gateway RPC port. +pub const RPC_DEFAULT_PORT: u16 = 9955; + +/// Options for the RPC server. +#[derive(Debug, Parser)] +pub(crate) struct RpcOptions { + /// IP and port (TCP) on which to listen for RPC requests. + /// + /// Note: not all RPC methods are safe to be exposed publicly. Use an RPC proxy server to filter out + /// dangerous methods. + /// More details: . + #[arg(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + DEFAULT_PORT, + ))] + rpc_listen_on: SocketAddr, + // TODO: + // - Disable unsafe methods on public IP addresses + // - Maximum number of connections + // - RPC rate limiting, IPs/range that get rate-limit exceptions, trust proxy headers + // - CORS + // - Message buffer capacity + // - Batch requests & max batch request length +} + +/// Launch the RPC server `api` with the provided `options`. +// TODO: +// - add an argument for a custom tokio runtime +pub async fn launch_rpc_server( + rpc_api: SubspaceGatewayRpc, + rpc_options: RpcOptions

, +) -> anyhow::Result { + let server = ServerBuilder::default() + .build(rpc_options.rpc_listen_on) + .await?; + let server_handle = server.start(rpc_api.into_rpc()); + + Ok(server_handle) +} From 86c68290c2047b1b8460071d768ef8b668d866b2 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 16:41:25 +1000 Subject: [PATCH 02/12] Add TODOs for a library part of this crate --- crates/subspace-gateway/src/commands/run/rpc.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs index 60f5b4e7db..277027d84c 100644 --- a/crates/subspace-gateway/src/commands/run/rpc.rs +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -33,6 +33,8 @@ pub(crate) struct RpcOptions { /// Launch the RPC server `api` with the provided `options`. // TODO: // - add an argument for a custom tokio runtime +// - move this RPC code into a new library part of this crate +// - make a RPC config that is independent of clap pub async fn launch_rpc_server( rpc_api: SubspaceGatewayRpc, rpc_options: RpcOptions

, From 561834c951ae03ac073c6aa6bd93df8290afcec8 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 17:08:10 +1000 Subject: [PATCH 03/12] Implement DSN ObjectPieceGetter --- crates/subspace-gateway/src/commands/run.rs | 8 +-- crates/subspace-gateway/src/main.rs | 1 + crates/subspace-gateway/src/piece_getter.rs | 77 +++++++++++++++++++++ 3 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 crates/subspace-gateway/src/piece_getter.rs diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 837a582b06..3c5b9f243a 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -7,6 +7,7 @@ mod rpc; use crate::commands::run::dsn::NetworkArgs; use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; use crate::commands::shutdown_signal; +use crate::piece_getter::DsnPieceGetter; use anyhow::anyhow; use clap::Parser; use futures::{select, FutureExt}; @@ -17,7 +18,7 @@ use subspace_core_primitives::pieces::Record; use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_erasure_coding::ErasureCoding; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; -use subspace_networking::utils::piece_provider::{NoPieceValidator, PieceProvider}; +use subspace_networking::utils::piece_provider::NoPieceValidator; use tracing::info; /// Options for running a node @@ -47,7 +48,6 @@ pub(crate) struct GatewayOptions { } /// Default run command for gateway -#[expect(clippy::redundant_locals, reason = "code is incomplete")] pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let signal = shutdown_signal(); @@ -83,8 +83,8 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let dsn_fut = dsn_node_runner.run(); // TODO: implement piece validation - let piece_provider = PieceProvider::new(dsn_node, NoPieceValidator); - let object_fetcher = ObjectFetcher::new(piece_provider, erasure_coding, None); + let piece_getter = DsnPieceGetter::new(dsn_node, NoPieceValidator); + let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, None); let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 072b27399e..bbefea523e 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -8,6 +8,7 @@ mod commands; mod node_client; +mod piece_getter; use crate::commands::{init_logger, raise_fd_limit, Command}; use clap::Parser; diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs new file mode 100644 index 0000000000..ddf6cf86ef --- /dev/null +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -0,0 +1,77 @@ +//! An object piece getter which uses the DSN to fetch pieces. + +use async_trait::async_trait; +use futures::stream::StreamExt; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use subspace_core_primitives::pieces::{Piece, PieceIndex}; +use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter}; +use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; +use subspace_networking::Node; + +/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter. +pub struct DsnPieceGetter(pub PieceProvider); + +impl fmt::Debug for DsnPieceGetter +where + PV: PieceValidator, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("DsnPieceGetter") + .field(&format!("{:?}", self.0)) + .finish() + } +} + +impl Deref for DsnPieceGetter +where + PV: PieceValidator, +{ + type Target = PieceProvider; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DsnPieceGetter +where + PV: PieceValidator, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +// TODO: +// - change ObjectPieceGetter trait to take a list of piece indexes +// - move this piece getter impl into a new library part of this crate +#[async_trait] +impl ObjectPieceGetter for DsnPieceGetter +where + PV: PieceValidator, +{ + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + if let Some((got_piece_index, maybe_piece)) = + self.get_from_cache([piece_index]).await.next().await + { + assert_eq!(piece_index, got_piece_index); + + if let Some(piece) = maybe_piece { + return Ok(Some(piece)); + } + } + + Ok(None) + } +} + +impl DsnPieceGetter +where + PV: PieceValidator, +{ + /// Creates new DSN piece getter. + pub fn new(node: Node, piece_validator: PV) -> Self { + Self(PieceProvider::new(node, piece_validator)) + } +} From fa38758f4e9171c1cde1dd1d07f80f6f7d80f56f Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 17:08:46 +1000 Subject: [PATCH 04/12] Improve PieceValidator debugging --- crates/subspace-networking/src/utils/piece_provider.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 76e134d982..c4508ba9e8 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -20,6 +20,7 @@ use libp2p::swarm::NetworkBehaviour; use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; use rand::prelude::*; +use std::any::type_name; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::iter::Empty; @@ -63,7 +64,8 @@ pub struct PieceProvider { impl fmt::Debug for PieceProvider { #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PieceProvider").finish_non_exhaustive() + f.debug_struct(&format!("PieceProvider<{}>", type_name::())) + .finish_non_exhaustive() } } From 423d07d15f1bb6b2cd3c1026a1308aa5a9641bf3 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 17:29:58 +1000 Subject: [PATCH 05/12] Log RPC server address --- crates/subspace-gateway/src/commands/run/rpc.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs index 277027d84c..075bd0ac51 100644 --- a/crates/subspace-gateway/src/commands/run/rpc.rs +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -4,6 +4,7 @@ use clap::Parser; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer}; +use tracing::info; /// The default gateway RPC port. pub const RPC_DEFAULT_PORT: u16 = 9955; @@ -42,7 +43,10 @@ pub async fn launch_rpc_server( let server = ServerBuilder::default() .build(rpc_options.rpc_listen_on) .await?; + let addr = server.local_addr()?; let server_handle = server.start(rpc_api.into_rpc()); + info!(?addr, "Running JSON-RPC server"); + Ok(server_handle) } From 3b3cdbe584d57601883d5890f5515be00c43a9e3 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Oct 2024 17:40:26 +1000 Subject: [PATCH 06/12] Fix confusing piece getter error name --- shared/subspace-data-retrieval/src/object_fetcher.rs | 4 ++-- shared/subspace-data-retrieval/src/piece_fetcher.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index ebfb575497..4968c86739 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -128,7 +128,7 @@ pub enum Error { /// Piece getter couldn't find the piece #[error("Piece {piece_index:?} was not found by piece getter")] - PieceGetterNotFound { piece_index: PieceIndex }, + PieceNotFound { piece_index: PieceIndex }, } /// Object fetcher for the Subspace DSN. @@ -596,7 +596,7 @@ impl ObjectFetcher { "Piece not found during object assembling" ); - Err(Error::PieceGetterNotFound { + Err(Error::PieceNotFound { piece_index: mapping_piece_index, })? } diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index 1984af93e0..14f6e823ae 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -57,7 +57,7 @@ where } Ok(None) => { trace!(?piece_index, "Piece not found"); - Err(Error::PieceGetterNotFound { + Err(Error::PieceNotFound { piece_index: *piece_index, } .into()) From 754530c85bb136258f2dcea249846b9dbf9bbdd5 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 21 Oct 2024 10:27:33 +1000 Subject: [PATCH 07/12] Implement piece validation --- Cargo.lock | 2 + crates/subspace-gateway/Cargo.toml | 2 + crates/subspace-gateway/src/commands/run.rs | 10 +- crates/subspace-gateway/src/main.rs | 1 + crates/subspace-gateway/src/node_client.rs | 1 - .../subspace-gateway/src/piece_validator.rs | 100 ++++++++++++++++++ 6 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 crates/subspace-gateway/src/piece_validator.rs diff --git a/Cargo.lock b/Cargo.lock index 032ff2f2bd..903dd7a19f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12592,8 +12592,10 @@ dependencies = [ "subspace-data-retrieval", "subspace-erasure-coding", "subspace-gateway-rpc", + "subspace-kzg", "subspace-networking", "subspace-rpc-primitives", + "subspace-verification", "supports-color", "thiserror", "tokio", diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 2592e10e3f..c186a0c439 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -30,8 +30,10 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } subspace-gateway-rpc = { version = "0.1.0", path = "../subspace-gateway-rpc" } +subspace-kzg = { version = "0.1.0", path = "../../shared/subspace-kzg" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } +subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false } supports-color = "3.0.1" thiserror = "1.0.64" tokio = { version = "1.40.0", features = ["macros"] } diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 3c5b9f243a..36146bae8d 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -8,6 +8,7 @@ use crate::commands::run::dsn::NetworkArgs; use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; use crate::commands::shutdown_signal; use crate::piece_getter::DsnPieceGetter; +use crate::piece_validator::SegmentCommitmentPieceValidator; use anyhow::anyhow; use clap::Parser; use futures::{select, FutureExt}; @@ -18,7 +19,7 @@ use subspace_core_primitives::pieces::Record; use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_erasure_coding::ErasureCoding; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; -use subspace_networking::utils::piece_provider::NoPieceValidator; +use subspace_kzg::Kzg; use tracing::info; /// Options for running a node @@ -71,6 +72,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { info!("✌️ version {}", env!("CARGO_PKG_VERSION")); info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + let kzg = Kzg::new(); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), @@ -78,12 +80,14 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; // TODO: move this service code into its own function, in a new library part of this crate - #[expect(unused_variables, reason = "implementation is incomplete")] let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); // TODO: implement piece validation - let piece_getter = DsnPieceGetter::new(dsn_node, NoPieceValidator); + let piece_getter = DsnPieceGetter::new( + dsn_node.clone(), + SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), + ); let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, None); let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index bbefea523e..2b7d4f4d22 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -9,6 +9,7 @@ mod commands; mod node_client; mod piece_getter; +mod piece_validator; use crate::commands::{init_logger, raise_fd_limit, Command}; use clap::Parser; diff --git a/crates/subspace-gateway/src/node_client.rs b/crates/subspace-gateway/src/node_client.rs index ab2c5ea096..28f3fa45af 100644 --- a/crates/subspace-gateway/src/node_client.rs +++ b/crates/subspace-gateway/src/node_client.rs @@ -33,7 +33,6 @@ pub trait NodeClient: fmt::Debug + Send + Sync + 'static { async fn farmer_app_info(&self) -> anyhow::Result; /// Get segment headers for the segments - #[expect(dead_code, reason = "implementation is incomplete")] async fn segment_headers( &self, segment_indices: Vec, diff --git a/crates/subspace-gateway/src/piece_validator.rs b/crates/subspace-gateway/src/piece_validator.rs new file mode 100644 index 0000000000..b9944104e6 --- /dev/null +++ b/crates/subspace-gateway/src/piece_validator.rs @@ -0,0 +1,100 @@ +//! Gateway-specific validator for pieces retrieved from the network. + +use crate::node_client::NodeClient; +use async_trait::async_trait; +use subspace_core_primitives::pieces::{Piece, PieceIndex}; +use subspace_kzg::Kzg; +use subspace_networking::libp2p::PeerId; +use subspace_networking::utils::piece_provider::PieceValidator; +use subspace_networking::Node; +use subspace_verification::is_piece_valid; +use tracing::{error, warn}; + +/// Gateway-specific validator for pieces retrieved from the network. +/// +/// Implements [`PieceValidator`]. +#[derive(Debug, Clone)] +pub struct SegmentCommitmentPieceValidator { + dsn_node: Node, + node_client: NC, + kzg: Kzg, +} + +impl SegmentCommitmentPieceValidator { + /// Create new instance + pub fn new(dsn_node: Node, node_client: NC, kzg: Kzg) -> Self { + Self { + dsn_node, + node_client, + kzg, + } + } +} + +// TODO: unify with the farmer's SegmentCommitmentPieceValidator, based on a common NodeClient trait. +#[async_trait] +impl PieceValidator for SegmentCommitmentPieceValidator +where + NC: NodeClient, +{ + async fn validate_piece( + &self, + source_peer_id: PeerId, + piece_index: PieceIndex, + piece: Piece, + ) -> Option { + if source_peer_id == self.dsn_node.id() { + return Some(piece); + } + + let segment_index = piece_index.segment_index(); + + let segment_headers = match self.node_client.segment_headers(vec![segment_index]).await { + Ok(segment_headers) => segment_headers, + Err(error) => { + error!( + %piece_index, + ?error, + "Failed to retrieve segment headers from node" + ); + return None; + } + }; + + let segment_commitment = match segment_headers.into_iter().next().flatten() { + Some(segment_header) => segment_header.segment_commitment(), + None => { + error!( + %piece_index, + %segment_index, + "Segment commitment for segment index wasn't found on node" + ); + return None; + } + }; + + let is_valid_fut = tokio::task::spawn_blocking({ + let kzg = self.kzg.clone(); + + move || { + is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position()) + .then_some(piece) + } + }); + + match is_valid_fut.await.unwrap_or_default() { + Some(piece) => Some(piece), + None => { + warn!( + %piece_index, + %source_peer_id, + "Received invalid piece from peer" + ); + + // We don't care about the result here + let _ = self.dsn_node.ban_peer(source_peer_id).await; + None + } + } + } +} From a58d52c295a02f77162366dc60ba73f811f68432 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 21 Oct 2024 10:27:50 +1000 Subject: [PATCH 08/12] Fix a typo in farmer piece validation --- .../subspace-farmer/src/farmer_piece_getter/piece_validator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs b/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs index 7427827955..8d45a7ffc1 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs @@ -54,7 +54,7 @@ where error!( %piece_index, ?error, - "Failed tor retrieve segment headers from node" + "Failed to retrieve segment headers from node" ); return None; } From 769fcaaf663f0781da30feffa0b2a5c2624af33e Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 07:21:41 +1000 Subject: [PATCH 09/12] Remove unnecessary TODOs --- crates/subspace-gateway/src/commands/run.rs | 1 - crates/subspace-gateway/src/commands/run/dsn.rs | 1 - crates/subspace-gateway/src/commands/run/rpc.rs | 11 +---------- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 36146bae8d..bae44098a3 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -83,7 +83,6 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); - // TODO: implement piece validation let piece_getter = DsnPieceGetter::new( dsn_node.clone(), SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), diff --git a/crates/subspace-gateway/src/commands/run/dsn.rs b/crates/subspace-gateway/src/commands/run/dsn.rs index a036b5b43f..e46b43ad13 100644 --- a/crates/subspace-gateway/src/commands/run/dsn.rs +++ b/crates/subspace-gateway/src/commands/run/dsn.rs @@ -57,7 +57,6 @@ pub async fn configure_network( }: NetworkArgs, ) -> anyhow::Result<(Node, NodeRunner<()>, RpcNodeClient)> { // TODO: - // - store keypair on disk and allow CLI override // - cache known peers on disk // - prometheus telemetry let default_config = Config::<()>::default(); diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs index 075bd0ac51..d5a65131cd 100644 --- a/crates/subspace-gateway/src/commands/run/rpc.rs +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -14,21 +14,12 @@ pub const RPC_DEFAULT_PORT: u16 = 9955; pub(crate) struct RpcOptions { /// IP and port (TCP) on which to listen for RPC requests. /// - /// Note: not all RPC methods are safe to be exposed publicly. Use an RPC proxy server to filter out - /// dangerous methods. - /// More details: . + /// This RPC method is not safe to be exposed on a public IP address. #[arg(long, default_value_t = SocketAddr::new( IpAddr::V4(Ipv4Addr::LOCALHOST), DEFAULT_PORT, ))] rpc_listen_on: SocketAddr, - // TODO: - // - Disable unsafe methods on public IP addresses - // - Maximum number of connections - // - RPC rate limiting, IPs/range that get rate-limit exceptions, trust proxy headers - // - CORS - // - Message buffer capacity - // - Batch requests & max batch request length } /// Launch the RPC server `api` with the provided `options`. From 4f264f077ff7d369a34520f3d1da721893023611 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 07:25:30 +1000 Subject: [PATCH 10/12] Remove unused Error type --- crates/subspace-gateway/src/main.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 2b7d4f4d22..0bf8d9c907 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -17,21 +17,6 @@ use clap::Parser; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -/// Subspace gateway error. -#[derive(thiserror::Error, Debug)] -pub enum Error { - /// Other kind of error. - #[error("Other: {0}")] - Other(String), -} - -impl From for Error { - #[inline] - fn from(s: String) -> Self { - Self::Other(s) - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { init_logger(); From 5b1f33063773a71113b65b9a6682e1e28085bffd Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 07:27:46 +1000 Subject: [PATCH 11/12] Tweak log wording --- crates/subspace-gateway/src/commands/run/dsn.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-gateway/src/commands/run/dsn.rs b/crates/subspace-gateway/src/commands/run/dsn.rs index e46b43ad13..d61f01502b 100644 --- a/crates/subspace-gateway/src/commands/run/dsn.rs +++ b/crates/subspace-gateway/src/commands/run/dsn.rs @@ -70,12 +70,12 @@ pub async fn configure_network( let farmer_app_info = node_client .farmer_app_info() .await - .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + .map_err(|error| anyhow!("Failed to get gateway app info: {error}"))?; // Fall back to the node's bootstrap nodes. if bootstrap_nodes.is_empty() { debug!( - dsn_bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes, + bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes, "Setting DSN bootstrap nodes..." ); bootstrap_nodes.clone_from(&farmer_app_info.dsn_bootstrap_nodes); From ff4f4e7757c23dc3b542b9a048d3c86d7929fac9 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 07:29:10 +1000 Subject: [PATCH 12/12] Rename dsn module to network --- crates/subspace-gateway/src/commands/run.rs | 6 +++--- .../src/commands/run/{dsn.rs => network.rs} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename crates/subspace-gateway/src/commands/run/{dsn.rs => network.rs} (100%) diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index bae44098a3..fab26671e4 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -1,10 +1,10 @@ //! Gateway run command. //! This is the primary command for the gateway. -mod dsn; +mod network; mod rpc; -use crate::commands::run::dsn::NetworkArgs; +use crate::commands::run::network::{configure_network, NetworkArgs}; use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; use crate::commands::shutdown_signal; use crate::piece_getter::DsnPieceGetter; @@ -80,7 +80,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; // TODO: move this service code into its own function, in a new library part of this crate - let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?; + let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); let piece_getter = DsnPieceGetter::new( diff --git a/crates/subspace-gateway/src/commands/run/dsn.rs b/crates/subspace-gateway/src/commands/run/network.rs similarity index 100% rename from crates/subspace-gateway/src/commands/run/dsn.rs rename to crates/subspace-gateway/src/commands/run/network.rs