Skip to content

Commit

Permalink
Merge pull request #3148 from autonomys/gateway-fetch
Browse files Browse the repository at this point in the history
Implement gateway RPC launch and piece getting
  • Loading branch information
nazar-pc authored Oct 22, 2024
2 parents bd5bbf1 + ff4f4e7 commit 6f4ae0f
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 37 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
error!(
%piece_index,
?error,
"Failed tor retrieve segment headers from node"
"Failed to retrieve segment headers from node"
);
return None;
}
Expand Down
5 changes: 5 additions & 0 deletions crates/subspace-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ jsonrpsee = { version = "0.24.5", features = ["server"] }
mimalloc = "0.1.43"
parking_lot = "0.12.2"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
subspace-gateway-rpc = { version = "0.1.0", path = "../subspace-gateway-rpc" }
subspace-kzg = { version = "0.1.0", path = "../../shared/subspace-kzg" }
subspace-networking = { version = "0.1.0", path = "../subspace-networking" }
subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" }
subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false }
supports-color = "3.0.1"
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] }
Expand Down
57 changes: 44 additions & 13 deletions crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
//! Gateway run command.
//! This is the primary command for the gateway.
mod dsn;
mod network;
mod rpc;

use crate::commands::run::dsn::NetworkArgs;
use crate::commands::run::network::{configure_network, NetworkArgs};
use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
use crate::commands::shutdown_signal;
use crate::piece_getter::DsnPieceGetter;
use crate::piece_validator::SegmentCommitmentPieceValidator;
use anyhow::anyhow;
use clap::Parser;
use futures::{select, FutureExt};
use std::env;
use std::num::NonZeroUsize;
use std::pin::pin;
use std::{env, future};
use subspace_core_primitives::pieces::Record;
use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_erasure_coding::ErasureCoding;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig};
use subspace_kzg::Kzg;
use tracing::info;

/// Options for running a node
#[derive(Debug, Parser)]
pub struct RunOptions {
pub(crate) struct RunOptions {
#[clap(flatten)]
gateway: GatewayOptions,
}

/// Options for running a gateway
#[derive(Debug, Parser)]
pub(super) struct GatewayOptions {
pub(crate) struct GatewayOptions {
/// Enable development mode.
///
/// Implies following flags (unless customized):
Expand All @@ -30,18 +41,24 @@ pub(super) struct GatewayOptions {

#[clap(flatten)]
dsn_options: NetworkArgs,

/// Options for RPC
#[clap(flatten)]
rpc_options: RpcOptions<RPC_DEFAULT_PORT>,
// TODO: maximum object size
}

/// Default run command for gateway
#[expect(clippy::redundant_locals, reason = "code is incomplete")]
pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

let RunOptions {
gateway: GatewayOptions {
dev,
mut dsn_options,
},
gateway:
GatewayOptions {
dev,
mut dsn_options,
rpc_options,
},
} = run_options;

// Development mode handling is limited to this section
Expand All @@ -55,12 +72,26 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
info!("✌️ version {}", env!("CARGO_PKG_VERSION"));
info!("❤️ by {}", env!("CARGO_PKG_AUTHORS"));

let kzg = Kzg::new();
let erasure_coding = ErasureCoding::new(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;

// TODO: move this service code into its own function, in a new library part of this crate
#[expect(unused_variables, reason = "implementation is incomplete")]
let (dsn_node, mut dsn_node_runner, node_client) = dsn::configure_network(dsn_options).await?;
let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?;
let dsn_fut = dsn_node_runner.run();

let rpc_fut = future::pending::<()>();
let piece_getter = DsnPieceGetter::new(
dsn_node.clone(),
SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg),
);
let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, None);

let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher });
let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?;
let rpc_fut = rpc_handle.stopped();

// This defines order in which things are dropped
let dsn_fut = dsn_fut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub async fn configure_network(
}: NetworkArgs,
) -> anyhow::Result<(Node, NodeRunner<()>, RpcNodeClient)> {
// TODO:
// - store keypair on disk and allow CLI override
// - cache known peers on disk
// - prometheus telemetry
let default_config = Config::<()>::default();
Expand All @@ -71,12 +70,12 @@ pub async fn configure_network(
let farmer_app_info = node_client
.farmer_app_info()
.await
.map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;
.map_err(|error| anyhow!("Failed to get gateway app info: {error}"))?;

// Fall back to the node's bootstrap nodes.
if bootstrap_nodes.is_empty() {
debug!(
dsn_bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes,
bootstrap_nodes = ?farmer_app_info.dsn_bootstrap_nodes,
"Setting DSN bootstrap nodes..."
);
bootstrap_nodes.clone_from(&farmer_app_info.dsn_bootstrap_nodes);
Expand Down
43 changes: 43 additions & 0 deletions crates/subspace-gateway/src/commands/run/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! RPC service configuration and launch.
use clap::Parser;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer};
use tracing::info;

/// The default gateway RPC port.
pub const RPC_DEFAULT_PORT: u16 = 9955;

/// Options for the RPC server.
#[derive(Debug, Parser)]
pub(crate) struct RpcOptions<const DEFAULT_PORT: u16> {
/// IP and port (TCP) on which to listen for RPC requests.
///
/// This RPC method is not safe to be exposed on a public IP address.
#[arg(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
DEFAULT_PORT,
))]
rpc_listen_on: SocketAddr,
}

/// Launch the RPC server `api` with the provided `options`.
// TODO:
// - add an argument for a custom tokio runtime
// - move this RPC code into a new library part of this crate
// - make a RPC config that is independent of clap
pub async fn launch_rpc_server<const P: u16>(
rpc_api: SubspaceGatewayRpc,
rpc_options: RpcOptions<P>,
) -> anyhow::Result<ServerHandle> {
let server = ServerBuilder::default()
.build(rpc_options.rpc_listen_on)
.await?;
let addr = server.local_addr()?;
let server_handle = server.start(rpc_api.into_rpc());

info!(?addr, "Running JSON-RPC server");

Ok(server_handle)
}
17 changes: 2 additions & 15 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,15 @@

mod commands;
mod node_client;
mod piece_getter;
mod piece_validator;

use crate::commands::{init_logger, raise_fd_limit, Command};
use clap::Parser;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

/// Subspace gateway error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Other kind of error.
#[error("Other: {0}")]
Other(String),
}

impl From<String> for Error {
#[inline]
fn from(s: String) -> Self {
Self::Other(s)
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_logger();
Expand Down
1 change: 0 additions & 1 deletion crates/subspace-gateway/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub trait NodeClient: fmt::Debug + Send + Sync + 'static {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo>;

/// Get segment headers for the segments
#[expect(dead_code, reason = "implementation is incomplete")]
async fn segment_headers(
&self,
segment_indices: Vec<SegmentIndex>,
Expand Down
77 changes: 77 additions & 0 deletions crates/subspace-gateway/src/piece_getter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! An object piece getter which uses the DSN to fetch pieces.
use async_trait::async_trait;
use futures::stream::StreamExt;
use std::fmt;
use std::ops::{Deref, DerefMut};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter};
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;

/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter.
pub struct DsnPieceGetter<PV: PieceValidator>(pub PieceProvider<PV>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("DsnPieceGetter")
.field(&format!("{:?}", self.0))
.finish()
}
}

impl<PV> Deref for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<PV> DerefMut for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

// TODO:
// - change ObjectPieceGetter trait to take a list of piece indexes
// - move this piece getter impl into a new library part of this crate
#[async_trait]
impl<PV> ObjectPieceGetter for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
async fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, BoxError> {
if let Some((got_piece_index, maybe_piece)) =
self.get_from_cache([piece_index]).await.next().await
{
assert_eq!(piece_index, got_piece_index);

if let Some(piece) = maybe_piece {
return Ok(Some(piece));
}
}

Ok(None)
}
}

impl<PV> DsnPieceGetter<PV>
where
PV: PieceValidator,
{
/// Creates new DSN piece getter.
pub fn new(node: Node, piece_validator: PV) -> Self {
Self(PieceProvider::new(node, piece_validator))
}
}
Loading

0 comments on commit 6f4ae0f

Please sign in to comment.