Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch multiple pieces during object reconstruction #3158

Merged
merged 9 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 21 additions & 9 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::ops::{Deref, DerefMut};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use tracing::debug;

const SUBSPACE_ERROR: i32 = 9000;
Expand Down Expand Up @@ -99,33 +100,44 @@ pub trait SubspaceGatewayRpcApi {
#[method(name = "subspace_fetchObject")]
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error>;
}

/// Subspace Gateway RPC configuration
pub struct SubspaceGatewayRpcConfig {
pub struct SubspaceGatewayRpcConfig<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
pub object_fetcher: ObjectFetcher,
pub object_fetcher: ObjectFetcher<PG>,
}

/// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway.
pub struct SubspaceGatewayRpc {
pub struct SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
object_fetcher: ObjectFetcher,
object_fetcher: ObjectFetcher<PG>,
}

/// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN.
impl SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// Creates a new instance of the `SubspaceGatewayRpc` handler.
pub fn new(config: SubspaceGatewayRpcConfig) -> Self {
pub fn new(config: SubspaceGatewayRpcConfig<PG>) -> Self {
Self {
object_fetcher: config.object_fetcher,
}
}
}

#[async_trait]
impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpcApiServer for SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error> {
// TODO: deny unsafe RPC calls

let count = mappings.objects().len();
if count > MAX_OBJECTS_PER_REQUEST {
debug!(%count, %MAX_OBJECTS_PER_REQUEST, "Too many mappings in request");
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER),
);
let piece_getter = DsnPieceGetter::new(piece_provider);
let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size));
let object_fetcher = ObjectFetcher::new(piece_getter.into(), erasure_coding, Some(max_size));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This puzzled me for a moment; this is the first time I've seen `From` used instead of `Arc::new()` 🤔


let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher });
let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?;
Expand Down
12 changes: 8 additions & 4 deletions crates/subspace-gateway/src/commands/run/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use clap::Parser;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer};
use tracing::info;

Expand All @@ -27,10 +28,13 @@ pub(crate) struct RpcOptions<const DEFAULT_PORT: u16> {
// - add an argument for a custom tokio runtime
// - move this RPC code into a new library part of this crate
// - make a RPC config that is independent of clap
pub async fn launch_rpc_server<const P: u16>(
rpc_api: SubspaceGatewayRpc,
rpc_options: RpcOptions<P>,
) -> anyhow::Result<ServerHandle> {
pub async fn launch_rpc_server<PG, const DEFAULT_PORT: u16>(
rpc_api: SubspaceGatewayRpc<PG>,
rpc_options: RpcOptions<DEFAULT_PORT>,
) -> anyhow::Result<ServerHandle>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
let server = ServerBuilder::default()
.build(rpc_options.rpc_listen_on)
.await?;
Expand Down
62 changes: 48 additions & 14 deletions crates/subspace-gateway/src/piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::{FutureExt, Stream};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};

/// The maximum number of peer-to-peer walking rounds for L1 archival storage.
const MAX_RANDOM_WALK_ROUNDS: usize = 15;

/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter.
pub struct DsnPieceGetter<PV: PieceValidator>(pub PieceProvider<PV>);
pub struct DsnPieceGetter<PV: PieceValidator>(pub Arc<PieceProvider<PV>>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
Expand All @@ -25,35 +27,35 @@ where
}
}

impl<PV> Deref for DsnPieceGetter<PV>
impl<PV> Clone for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<PV> DerefMut for DsnPieceGetter<PV>
impl<PV> Deref for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

// TODO:
// - change ObjectPieceGetter trait to take a list of piece indexes
// - reconstruct segment if piece is missing
// - move this piece getter impl into a new library part of this crate
#[async_trait]
impl<PV> ObjectPieceGetter for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
async fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, BoxError> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
if let Some((got_piece_index, maybe_piece)) =
self.get_from_cache([piece_index]).await.next().await
{
Expand All @@ -68,6 +70,38 @@ where
.get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
.await)
Comment on lines 70 to 71
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nazar-pc is this (and the similar code below) still needed after your PR #3259?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, random walking is a cold storage fallback for cases when a piece is not cached. In that case it wanders around the network, hoping to stumble upon someone storing the piece in their plot rather than in their cache.

}

/// Returns a stream of `(piece_index, result)` pairs for the requested `piece_indices`.
///
/// Each index is first looked up via `get_from_cache`. When the cache yields no piece for an
/// index, the lookup falls back to `get_piece_from_archival_storage`, limited to
/// `MAX_RANDOM_WALK_ROUNDS` rounds.
///
/// In this implementation the outer result and every per-piece result are always `Ok`: a piece
/// that could not be obtained from either source is reported as `Ok(None)`.
async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
// The `then` closure below is `move`, so it needs its own owned handle to the getter.
let piece_getter = (*self).clone();

let stream = self
.get_from_cache(piece_indices)
.await
.then(move |(index, maybe_piece)| {
// The future is `async move`, so clone again: the closure keeps its handle for the
// remaining stream items.
let piece_getter = piece_getter.clone();
let fut = async move {
// Cache hit: pass the piece straight through.
if let Some(piece) = maybe_piece {
return (index, Ok(Some(piece)));
}

// Cache miss: fall back to archival storage and wrap whatever it returns in `Ok`.
piece_getter
.get_piece_from_archival_storage(index, MAX_RANDOM_WALK_ROUNDS)
.map(|piece| (index, Ok(piece)))
.await
};
// NOTE(review): boxing and pinning the future presumably keeps the resulting stream
// `Unpin`, which the return type requires — confirm.
Box::pin(fut)
});

Ok(Box::new(stream))
}
}

impl<PV> DsnPieceGetter<PV>
Expand All @@ -76,6 +110,6 @@ where
{
/// Creates new DSN piece getter.
pub fn new(piece_provider: PieceProvider<PV>) -> Self {
Self(piece_provider)
Self(Arc::new(piece_provider))
}
}
1 change: 1 addition & 0 deletions shared/subspace-data-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ include = [
]

[dependencies]
anyhow = "1.0.89"
async-lock = "3.4.0"
async-trait = "0.1.83"
futures = "0.3.31"
Expand Down
27 changes: 15 additions & 12 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! Fetching objects stored in the archived history of Subspace Network.

use crate::piece_fetcher::download_pieces;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use crate::piece_getter::ObjectPieceGetter;
use crate::segment_fetcher::{download_segment, SegmentGetterError};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use std::sync::Arc;
Expand Down Expand Up @@ -123,7 +123,7 @@ pub enum Error {
#[error("Getting piece caused an error: {source:?}")]
PieceGetterError {
#[from]
source: BoxError,
source: anyhow::Error,
},

/// Piece getter couldn't find the piece
Expand All @@ -132,9 +132,12 @@ pub enum Error {
}

/// Object fetcher for the Subspace DSN.
pub struct ObjectFetcher {
pub struct ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// The piece getter used to fetch pieces.
piece_getter: Arc<dyn ObjectPieceGetter + Send + Sync + 'static>,
piece_getter: Arc<PG>,

/// The erasure coding configuration of those pieces.
erasure_coding: ErasureCoding,
Expand All @@ -143,21 +146,21 @@ pub struct ObjectFetcher {
max_object_len: usize,
}

impl ObjectFetcher {
impl<PG> ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// Create a new object fetcher with the given configuration.
///
/// `max_object_len` is the amount of data bytes we'll read for a single object before giving
/// up and returning an error, or `None` for no limit (`usize::MAX`).
pub fn new<PG>(
piece_getter: PG,
pub fn new(
piece_getter: Arc<PG>,
erasure_coding: ErasureCoding,
max_object_len: Option<usize>,
) -> Self
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
) -> Self {
Self {
piece_getter: Arc::new(piece_getter),
piece_getter,
erasure_coding,
max_object_len: max_object_len.unwrap_or(usize::MAX),
}
Expand Down
56 changes: 20 additions & 36 deletions shared/subspace-data-retrieval/src/piece_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
//! Fetching pieces of the archived history of Subspace Network.

use crate::object_fetcher::Error;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use crate::piece_getter::ObjectPieceGetter;
use futures::StreamExt;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};

Expand All @@ -31,7 +30,7 @@ use tracing::{debug, trace};
pub async fn download_pieces<PG>(
piece_indexes: &[PieceIndex],
piece_getter: &PG,
) -> Result<Vec<Piece>, BoxError>
) -> anyhow::Result<Vec<Piece>>
where
PG: ObjectPieceGetter,
{
Expand All @@ -42,46 +41,31 @@ where
);

// TODO:
// - consider using a semaphore to limit the number of concurrent requests, like
// download_segment_pieces()
// - if we're close to the number of pieces in a segment, use segment downloading and piece
// - if we're close to the number of pieces in a segment, or we can't find a piece, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let received_pieces = piece_indexes
.iter()
.map(|piece_index| async move {
match piece_getter.get_piece(*piece_index).await {
Ok(Some(piece)) => {
trace!(?piece_index, "Piece request succeeded",);
Ok(piece)
}
Ok(None) => {
trace!(?piece_index, "Piece not found");
Err(Error::PieceNotFound {
piece_index: *piece_index,
}
.into())
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request caused an error",
);
Err(error)
}
}
})
.collect::<FuturesOrdered<_>>();
let mut received_pieces = piece_getter
.get_pieces(piece_indexes.iter().copied())
.await?;

// We want exact pieces, so any errors are fatal.
let received_pieces: Vec<Piece> = received_pieces.try_collect().await?;
let mut pieces = Vec::new();
pieces.resize(piece_indexes.len(), Piece::default());
Comment on lines +51 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will work, but it is a bit unfortunate that we'll be allocating so much upfront (each piece) and then simply throwing those allocations away.

I'd probably push all results into HashMap<PieceIndex, Piece> and then extract them at the end by piece index (pieces have copy-on-write behavior, so it'll be efficient, we don't need to remove individual pieces, can just drop the whole hashmap at the end at once).


while let Some((piece_index, maybe_piece)) = received_pieces.next().await {
// We want exact pieces, so any errors are fatal.
let piece = maybe_piece?.ok_or(Error::PieceNotFound { piece_index })?;
let index_position = piece_indexes
.iter()
.position(|i| *i == piece_index)
.expect("get_pieces only returns indexes it was supplied; qed");
pieces[index_position] = piece;
}

trace!(
count = piece_indexes.len(),
?piece_indexes,
"Successfully retrieved exact pieces"
);

Ok(received_pieces)
Ok(pieces)
}
Loading
Loading