diff --git a/Cargo.lock b/Cargo.lock index 5b10a69d60..53b3c329e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12634,8 +12634,13 @@ dependencies = [ "mimalloc", "parking_lot 0.12.3", "subspace-core-primitives", + "subspace-data-retrieval", + "subspace-erasure-coding", + "subspace-gateway-rpc", + "subspace-kzg", "subspace-networking", "subspace-rpc-primitives", + "subspace-verification", "supports-color", "thiserror", "tokio", diff --git a/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs b/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs index 7427827955..8d45a7ffc1 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter/piece_validator.rs @@ -54,7 +54,7 @@ where error!( %piece_index, ?error, - "Failed tor retrieve segment headers from node" + "Failed to retrieve segment headers from node" ); return None; } diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 675a9968f2..e272074658 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -27,8 +27,13 @@ jsonrpsee = { version = "0.24.5", features = ["server"] } mimalloc = "0.1.43" parking_lot = "0.12.2" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } +subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } +subspace-gateway-rpc = { version = "0.1.0", path = "../subspace-gateway-rpc" } +subspace-kzg = { version = "0.1.0", path = "../../shared/subspace-kzg" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } +subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false } supports-color = "3.0.1" thiserror = "1.0.64" tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] } diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 9d39d053a5..fab26671e4 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -1,26 +1,37 @@ //! Gateway run command. //! This is the primary command for the gateway. -mod dsn; +mod network; +mod rpc; -use crate::commands::run::dsn::NetworkArgs; +use crate::commands::run::network::{configure_network, NetworkArgs}; +use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; use crate::commands::shutdown_signal; +use crate::piece_getter::DsnPieceGetter; +use crate::piece_validator::SegmentCommitmentPieceValidator; +use anyhow::anyhow; use clap::Parser; use futures::{select, FutureExt}; +use std::env; +use std::num::NonZeroUsize; use std::pin::pin; -use std::{env, future}; +use subspace_core_primitives::pieces::Record; +use subspace_data_retrieval::object_fetcher::ObjectFetcher; +use subspace_erasure_coding::ErasureCoding; +use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; +use subspace_kzg::Kzg; use tracing::info; /// Options for running a node #[derive(Debug, Parser)] -pub struct RunOptions { +pub(crate) struct RunOptions { #[clap(flatten)] gateway: GatewayOptions, } /// Options for running a gateway #[derive(Debug, Parser)] -pub(super) struct GatewayOptions { +pub(crate) struct GatewayOptions { /// Enable development mode. /// /// Implies following flags (unless customized): @@ -30,18 +41,24 @@ pub(super) struct GatewayOptions { #[clap(flatten)] dsn_options: NetworkArgs, + + /// Options for RPC + #[clap(flatten)] + rpc_options: RpcOptions, + // TODO: maximum object size } /// Default run command for gateway -#[expect(clippy::redundant_locals, reason = "code is incomplete")] pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let signal = shutdown_signal(); let RunOptions { - gateway: GatewayOptions { - dev, - mut dsn_options, - }, + gateway: + GatewayOptions { + dev, + mut dsn_options, + rpc_options, + }, } = run_options; // Development mode handling is limited to this section @@ -55,12 +72,26 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { info!("✌️ version {}", env!("CARGO_PKG_VERSION")); info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + let kzg = Kzg::new(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + // TODO: move this service code into its own function, in a new library part of this crate - #[expect(unused_variables, reason = "implementation is incomplete")] - let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?; + let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?; let dsn_fut = dsn_node_runner.run(); - let rpc_fut = future::pending::<()>(); + let piece_getter = DsnPieceGetter::new( + dsn_node.clone(), + SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), + ); + let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, None); + + let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); + let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; + let rpc_fut = rpc_handle.stopped(); // This defines order in which things are dropped let dsn_fut = dsn_fut; diff --git a/crates/subspace-gateway/src/commands/run/dsn.rs b/crates/subspace-gateway/src/commands/run/network.rs similarity index 94% rename from crates/subspace-gateway/src/commands/run/dsn.rs rename to crates/subspace-gateway/src/commands/run/network.rs index a036b5b43f..d61f01502b 100644 --- a/crates/subspace-gateway/src/commands/run/dsn.rs +++ b/crates/subspace-gateway/src/commands/run/network.rs @@ -57,7 +57,6 @@ pub async fn configure_network( }: NetworkArgs, ) -> anyhow::Result<(Node, NodeRunner<()>, RpcNodeClient)> { // TODO: - // - store keypair on disk and allow CLI override // - cache known peers on disk // - prometheus telemetry let default_config = Config::<()>::default(); @@ -71,12 +70,12 @@ pub async fn configure_network( let farmer_app_info = node_client .farmer_app_info() .await - .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + .map_err(|error| anyhow!("Failed to get gateway app info: {error}"))?; // Fall back to the node's bootstrap nodes. if bootstrap_nodes.is_empty() { debug!( - dsn_bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes, + bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes, "Setting DSN bootstrap nodes..." ); bootstrap_nodes.clone_from(&farmer_app_info.dsn_bootstrap_nodes); diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs new file mode 100644 index 0000000000..d5a65131cd --- /dev/null +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -0,0 +1,43 @@ +//! RPC service configuration and launch. + +use clap::Parser; +use jsonrpsee::server::{ServerBuilder, ServerHandle}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer}; +use tracing::info; + +/// The default gateway RPC port. +pub const RPC_DEFAULT_PORT: u16 = 9955; + +/// Options for the RPC server. +#[derive(Debug, Parser)] +pub(crate) struct RpcOptions { + /// IP and port (TCP) on which to listen for RPC requests. + /// + /// This RPC method is not safe to be exposed on a public IP address. + #[arg(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + DEFAULT_PORT, + ))] + rpc_listen_on: SocketAddr, +} + +/// Launch the RPC server `api` with the provided `options`. +// TODO: +// - add an argument for a custom tokio runtime +// - move this RPC code into a new library part of this crate +// - make a RPC config that is independent of clap +pub async fn launch_rpc_server( + rpc_api: SubspaceGatewayRpc, + rpc_options: RpcOptions

, +) -> anyhow::Result { + let server = ServerBuilder::default() + .build(rpc_options.rpc_listen_on) + .await?; + let addr = server.local_addr()?; + let server_handle = server.start(rpc_api.into_rpc()); + + info!(?addr, "Running JSON-RPC server"); + + Ok(server_handle) +} diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 072b27399e..0bf8d9c907 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -8,6 +8,8 @@ mod commands; mod node_client; +mod piece_getter; +mod piece_validator; use crate::commands::{init_logger, raise_fd_limit, Command}; use clap::Parser; @@ -15,21 +17,6 @@ use clap::Parser; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -/// Subspace gateway error. -#[derive(thiserror::Error, Debug)] -pub enum Error { - /// Other kind of error. - #[error("Other: {0}")] - Other(String), -} - -impl From for Error { - #[inline] - fn from(s: String) -> Self { - Self::Other(s) - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { init_logger(); diff --git a/crates/subspace-gateway/src/node_client.rs b/crates/subspace-gateway/src/node_client.rs index ab2c5ea096..28f3fa45af 100644 --- a/crates/subspace-gateway/src/node_client.rs +++ b/crates/subspace-gateway/src/node_client.rs @@ -33,7 +33,6 @@ pub trait NodeClient: fmt::Debug + Send + Sync + 'static { async fn farmer_app_info(&self) -> anyhow::Result; /// Get segment headers for the segments - #[expect(dead_code, reason = "implementation is incomplete")] async fn segment_headers( &self, segment_indices: Vec, diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs new file mode 100644 index 0000000000..ddf6cf86ef --- /dev/null +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -0,0 +1,77 @@ +//! An object piece getter which uses the DSN to fetch pieces. + +use async_trait::async_trait; +use futures::stream::StreamExt; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use subspace_core_primitives::pieces::{Piece, PieceIndex}; +use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter}; +use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; +use subspace_networking::Node; + +/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter. +pub struct DsnPieceGetter(pub PieceProvider); + +impl fmt::Debug for DsnPieceGetter +where + PV: PieceValidator, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("DsnPieceGetter") + .field(&format!("{:?}", self.0)) + .finish() + } +} + +impl Deref for DsnPieceGetter +where + PV: PieceValidator, +{ + type Target = PieceProvider; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DsnPieceGetter +where + PV: PieceValidator, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +// TODO: +// - change ObjectPieceGetter trait to take a list of piece indexes +// - move this piece getter impl into a new library part of this crate +#[async_trait] +impl ObjectPieceGetter for DsnPieceGetter +where + PV: PieceValidator, +{ + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + if let Some((got_piece_index, maybe_piece)) = + self.get_from_cache([piece_index]).await.next().await + { + assert_eq!(piece_index, got_piece_index); + + if let Some(piece) = maybe_piece { + return Ok(Some(piece)); + } + } + + Ok(None) + } +} + +impl DsnPieceGetter +where + PV: PieceValidator, +{ + /// Creates new DSN piece getter. + pub fn new(node: Node, piece_validator: PV) -> Self { + Self(PieceProvider::new(node, piece_validator)) + } +} diff --git a/crates/subspace-gateway/src/piece_validator.rs b/crates/subspace-gateway/src/piece_validator.rs new file mode 100644 index 0000000000..b9944104e6 --- /dev/null +++ b/crates/subspace-gateway/src/piece_validator.rs @@ -0,0 +1,100 @@ +//! Gateway-specific validator for pieces retrieved from the network. + +use crate::node_client::NodeClient; +use async_trait::async_trait; +use subspace_core_primitives::pieces::{Piece, PieceIndex}; +use subspace_kzg::Kzg; +use subspace_networking::libp2p::PeerId; +use subspace_networking::utils::piece_provider::PieceValidator; +use subspace_networking::Node; +use subspace_verification::is_piece_valid; +use tracing::{error, warn}; + +/// Gateway-specific validator for pieces retrieved from the network. +/// +/// Implements [`PieceValidator`]. +#[derive(Debug, Clone)] +pub struct SegmentCommitmentPieceValidator { + dsn_node: Node, + node_client: NC, + kzg: Kzg, +} + +impl SegmentCommitmentPieceValidator { + /// Create new instance + pub fn new(dsn_node: Node, node_client: NC, kzg: Kzg) -> Self { + Self { + dsn_node, + node_client, + kzg, + } + } +} + +// TODO: unify with the farmer's SegmentCommitmentPieceValidator, based on a common NodeClient trait. +#[async_trait] +impl PieceValidator for SegmentCommitmentPieceValidator +where + NC: NodeClient, +{ + async fn validate_piece( + &self, + source_peer_id: PeerId, + piece_index: PieceIndex, + piece: Piece, + ) -> Option { + if source_peer_id == self.dsn_node.id() { + return Some(piece); + } + + let segment_index = piece_index.segment_index(); + + let segment_headers = match self.node_client.segment_headers(vec![segment_index]).await { + Ok(segment_headers) => segment_headers, + Err(error) => { + error!( + %piece_index, + ?error, + "Failed to retrieve segment headers from node" + ); + return None; + } + }; + + let segment_commitment = match segment_headers.into_iter().next().flatten() { + Some(segment_header) => segment_header.segment_commitment(), + None => { + error!( + %piece_index, + %segment_index, + "Segment commitment for segment index wasn't found on node" + ); + return None; + } + }; + + let is_valid_fut = tokio::task::spawn_blocking({ + let kzg = self.kzg.clone(); + + move || { + is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position()) + .then_some(piece) + } + }); + + match is_valid_fut.await.unwrap_or_default() { + Some(piece) => Some(piece), + None => { + warn!( + %piece_index, + %source_peer_id, + "Received invalid piece from peer" + ); + + // We don't care about the result here + let _ = self.dsn_node.ban_peer(source_peer_id).await; + None + } + } + } +} diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index bd20905d99..cab3a11728 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -20,6 +20,7 @@ use libp2p::swarm::NetworkBehaviour; use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; use rand::prelude::*; +use std::any::type_name; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::iter::Empty; @@ -63,7 +64,8 @@ pub struct PieceProvider { impl fmt::Debug for PieceProvider { #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PieceProvider").finish_non_exhaustive() + f.debug_struct(&format!("PieceProvider<{}>", type_name::())) + .finish_non_exhaustive() } } diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index ebfb575497..4968c86739 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -128,7 +128,7 @@ pub enum Error { /// Piece getter couldn't find the piece #[error("Piece {piece_index:?} was not found by piece getter")] - PieceGetterNotFound { piece_index: PieceIndex }, + PieceNotFound { piece_index: PieceIndex }, } /// Object fetcher for the Subspace DSN. @@ -596,7 +596,7 @@ impl ObjectFetcher { "Piece not found during object assembling" ); - Err(Error::PieceGetterNotFound { + Err(Error::PieceNotFound { piece_index: mapping_piece_index, })? } diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index 1984af93e0..14f6e823ae 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -57,7 +57,7 @@ where } Ok(None) => { trace!(?piece_index, "Piece not found"); - Err(Error::PieceGetterNotFound { + Err(Error::PieceNotFound { piece_index: *piece_index, } .into())