Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gateway RPC launch and piece getting #3148

Merged
merged 15 commits into from
Oct 22, 2024
Merged
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const SEGMENT_HEADERS_LIMIT: u32 = MAX_SEGMENT_HEADERS_PER_REQUEST as u32;
#[derive(Debug, Parser)]
pub(in super::super) struct NetworkArgs {
/// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported
#[arg(long)]
#[arg(long = "bootstrap-node")]
pub(in super::super) bootstrap_nodes: Vec<Multiaddr>,
/// Multiaddrs to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`,
/// multiple are supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
error!(
%piece_index,
?error,
"Failed tor retrieve segment headers from node"
"Failed to retrieve segment headers from node"
);
return None;
}
Expand Down
5 changes: 5 additions & 0 deletions crates/subspace-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ jsonrpsee = { version = "0.24.5", features = ["server"] }
mimalloc = "0.1.43"
parking_lot = "0.12.2"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
subspace-gateway-rpc = { version = "0.1.0", path = "../subspace-gateway-rpc" }
subspace-kzg = { version = "0.1.0", path = "../../shared/subspace-kzg" }
subspace-networking = { version = "0.1.0", path = "../subspace-networking" }
subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" }
subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false }
supports-color = "3.0.1"
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] }
Expand Down
57 changes: 44 additions & 13 deletions crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
//! Gateway run command.
//! This is the primary command for the gateway.

mod dsn;
mod network;
mod rpc;

use crate::commands::run::dsn::NetworkArgs;
use crate::commands::run::network::{configure_network, NetworkArgs};
use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
use crate::commands::shutdown_signal;
use crate::piece_getter::DsnPieceGetter;
use crate::piece_validator::SegmentCommitmentPieceValidator;
use anyhow::anyhow;
use clap::Parser;
use futures::{select, FutureExt};
use std::env;
use std::num::NonZeroUsize;
use std::pin::pin;
use std::{env, future};
use subspace_core_primitives::pieces::Record;
use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_erasure_coding::ErasureCoding;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig};
use subspace_kzg::Kzg;
use tracing::info;

/// Options for running a node
#[derive(Debug, Parser)]
pub struct RunOptions {
pub(crate) struct RunOptions {
#[clap(flatten)]
gateway: GatewayOptions,
}

/// Options for running a gateway
#[derive(Debug, Parser)]
pub(super) struct GatewayOptions {
pub(crate) struct GatewayOptions {
/// Enable development mode.
///
/// Implies following flags (unless customized):
Expand All @@ -30,18 +41,24 @@ pub(super) struct GatewayOptions {

#[clap(flatten)]
dsn_options: NetworkArgs,

/// Options for RPC
#[clap(flatten)]
rpc_options: RpcOptions<RPC_DEFAULT_PORT>,
// TODO: maximum object size
}

/// Default run command for gateway
#[expect(clippy::redundant_locals, reason = "code is incomplete")]
pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

let RunOptions {
gateway: GatewayOptions {
dev,
mut dsn_options,
},
gateway:
GatewayOptions {
dev,
mut dsn_options,
rpc_options,
},
} = run_options;

// Development mode handling is limited to this section
Expand All @@ -55,12 +72,26 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
info!("✌️ version {}", env!("CARGO_PKG_VERSION"));
info!("❤️ by {}", env!("CARGO_PKG_AUTHORS"));

let kzg = Kzg::new();
let erasure_coding = ErasureCoding::new(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;

// TODO: move this service code into its own function, in a new library part of this crate
#[expect(unused_variables, reason = "implementation is incomplete")]
let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?;
let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?;
let dsn_fut = dsn_node_runner.run();

let rpc_fut = future::pending::<()>();
let piece_getter = DsnPieceGetter::new(
dsn_node.clone(),
SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg),
);
let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, None);

let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher });
let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?;
let rpc_fut = rpc_handle.stopped();

// This defines order in which things are dropped
let dsn_fut = dsn_fut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ pub(crate) struct NetworkArgs {
/// Multiaddrs of DSN bootstrap nodes to connect to on startup, multiple are supported.
///
/// The default bootstrap nodes are fetched from the node RPC connection.
#[arg(long)]
pub(crate) dsn_bootstrap_nodes: Vec<Multiaddr>,
#[arg(long = "bootstrap-node")]
bootstrap_nodes: Vec<Multiaddr>,

/// Multiaddrs of DSN reserved nodes to maintain a connection to, multiple are supported.
#[arg(long)]
dsn_reserved_peers: Vec<Multiaddr>,
#[arg(long = "reserved-peer")]
reserved_peers: Vec<Multiaddr>,

/// Enable non-global (private, shared, loopback..) addresses in the Kademlia DHT.
/// By default these addresses are excluded from the DHT.
/// By default, these addresses are excluded from the DHT.
#[arg(long, default_value_t = false)]
pub(crate) allow_private_ips: bool,

/// Maximum established outgoing swarm connection limit.
#[arg(long, default_value_t = 100)]
pub(crate) out_connections: u32,
out_connections: u32,

/// Maximum pending outgoing swarm connection limit.
#[arg(long, default_value_t = 100)]
pub(crate) pending_out_connections: u32,
pending_out_connections: u32,
}

/// Create a DSN network client with the supplied configuration.
Expand All @@ -49,15 +49,14 @@ pub(crate) struct NetworkArgs {
pub async fn configure_network(
NetworkArgs {
node_rpc_url,
mut dsn_bootstrap_nodes,
dsn_reserved_peers,
mut bootstrap_nodes,
reserved_peers,
allow_private_ips,
out_connections,
pending_out_connections,
}: NetworkArgs,
) -> anyhow::Result<(Node, NodeRunner<()>, RpcNodeClient)> {
// TODO:
// - store keypair on disk and allow CLI override
// - cache known peers on disk
// - prometheus telemetry
let default_config = Config::<()>::default();
Expand All @@ -71,21 +70,24 @@ pub async fn configure_network(
let farmer_app_info = node_client
.farmer_app_info()
.await
.map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;
.map_err(|error| anyhow!("Failed to get gateway app info: {error}"))?;

// Fall back to the node's bootstrap nodes.
if dsn_bootstrap_nodes.is_empty() {
debug!(dsn_bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes, "Setting DSN bootstrap nodes...");
dsn_bootstrap_nodes.clone_from(&farmer_app_info.dsn_bootstrap_nodes);
if bootstrap_nodes.is_empty() {
debug!(
bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes,
"Setting DSN bootstrap nodes..."
);
bootstrap_nodes.clone_from(&farmer_app_info.dsn_bootstrap_nodes);
}

let dsn_protocol_version = hex::encode(farmer_app_info.genesis_hash);
debug!(?dsn_protocol_version, "Setting DSN protocol version...");

let config = Config {
protocol_version: dsn_protocol_version,
bootstrap_addresses: dsn_bootstrap_nodes,
reserved_peers: dsn_reserved_peers,
bootstrap_addresses: bootstrap_nodes,
reserved_peers,
allow_non_global_addresses_in_dht: allow_private_ips,
max_established_outgoing_connections: out_connections,
max_pending_outgoing_connections: pending_out_connections,
Expand Down
43 changes: 43 additions & 0 deletions crates/subspace-gateway/src/commands/run/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! RPC service configuration and launch.

use clap::Parser;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer};
use tracing::info;

/// The default gateway RPC port.
pub const RPC_DEFAULT_PORT: u16 = 9955;

/// Options for the RPC server.
#[derive(Debug, Parser)]
pub(crate) struct RpcOptions<const DEFAULT_PORT: u16> {
/// IP and port (TCP) on which to listen for RPC requests.
///
/// This RPC method is not safe to be exposed on a public IP address.
#[arg(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
DEFAULT_PORT,
))]
rpc_listen_on: SocketAddr,
}

/// Launch the RPC server `api` with the provided `options`.
// TODO:
// - add an argument for a custom tokio runtime
// - move this RPC code into a new library part of this crate
// - make a RPC config that is independent of clap
pub async fn launch_rpc_server<const P: u16>(
rpc_api: SubspaceGatewayRpc,
rpc_options: RpcOptions<P>,
) -> anyhow::Result<ServerHandle> {
let server = ServerBuilder::default()
.build(rpc_options.rpc_listen_on)
.await?;
let addr = server.local_addr()?;
let server_handle = server.start(rpc_api.into_rpc());

info!(?addr, "Running JSON-RPC server");

Ok(server_handle)
}
17 changes: 2 additions & 15 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,15 @@

mod commands;
mod node_client;
mod piece_getter;
mod piece_validator;

use crate::commands::{init_logger, raise_fd_limit, Command};
use clap::Parser;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

/// Subspace gateway error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Other kind of error.
#[error("Other: {0}")]
Other(String),
}

impl From<String> for Error {
#[inline]
fn from(s: String) -> Self {
Self::Other(s)
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_logger();
Expand Down
1 change: 0 additions & 1 deletion crates/subspace-gateway/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub trait NodeClient: fmt::Debug + Send + Sync + 'static {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo>;

/// Get segment headers for the segments
#[expect(dead_code, reason = "implementation is incomplete")]
async fn segment_headers(
&self,
segment_indices: Vec<SegmentIndex>,
Expand Down
77 changes: 77 additions & 0 deletions crates/subspace-gateway/src/piece_getter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! An object piece getter which uses the DSN to fetch pieces.

use async_trait::async_trait;
use futures::stream::StreamExt;
use std::fmt;
use std::ops::{Deref, DerefMut};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter};
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;

/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter.
pub struct DsnPieceGetter<PV: PieceValidator>(pub PieceProvider<PV>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("DsnPieceGetter")
.field(&format!("{:?}", self.0))
.finish()
}
}

impl<PV> Deref for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<PV> DerefMut for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

// TODO:
// - change ObjectPieceGetter trait to take a list of piece indexes
// - move this piece getter impl into a new library part of this crate
#[async_trait]
impl<PV> ObjectPieceGetter for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
async fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, BoxError> {
if let Some((got_piece_index, maybe_piece)) =
self.get_from_cache([piece_index]).await.next().await
{
assert_eq!(piece_index, got_piece_index);

if let Some(piece) = maybe_piece {
return Ok(Some(piece));
}
}

Ok(None)
}
}

impl<PV> DsnPieceGetter<PV>
where
PV: PieceValidator,
{
/// Creates new DSN piece getter.
pub fn new(node: Node, piece_validator: PV) -> Self {
Self(PieceProvider::new(node, piece_validator))
}
}
Loading
Loading