From f845f4840dfe3c9c71231a312eb871f6e558c292 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 18 Oct 2024 12:17:38 +0300 Subject: [PATCH 1/3] Implement `PieceCache::read_pieces()` in everything except `ClusterPieceGetter` --- Cargo.lock | 51 ++++++++++++++ crates/subspace-farmer/Cargo.toml | 1 + .../subspace-farmer/src/farmer_cache/tests.rs | 21 ++++++ .../src/farmer_piece_getter.rs | 67 ++++++++++++++++++- 4 files changed, 139 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index c9df164be4..27e8f31840 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -287,6 +287,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -7467,6 +7473,31 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ouroboros" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" +dependencies = [ + "aliasable", + "ouroboros_macro", + "static_assertions", +] + +[[package]] +name = "ouroboros_macro" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" +dependencies = [ + "heck 0.4.1", + "itertools 0.12.1", + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.79", +] + [[package]] name = "overload" version = "0.1.1" @@ -8662,6 +8693,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "version_check", + "yansi", +] + [[package]] name = "prometheus" version = "0.13.4" @@ -12507,6 +12551,7 @@ dependencies = [ "jsonrpsee", "mimalloc", "num_cpus", + "ouroboros", "parity-scale-codec", "parking_lot 0.12.3", "pin-project", @@ -15046,6 +15091,12 @@ dependencies = [ "web-time", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yasna" version = "0.5.2" diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index cfe35a8588..331c9731d6 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -39,6 +39,7 @@ hwlocality = { version = "1.0.0-alpha.6", features = ["vendored"], optional = tr jsonrpsee = { version = "0.24.5", features = ["ws-client"] } mimalloc = { version = "0.1.43", optional = true } num_cpus = "1.16.0" +ouroboros = "0.18.4" parity-scale-codec = "3.6.12" parking_lot = "0.12.2" pin-project = "1.1.5" diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs index d25ce1d4d8..f72b3e4d32 100644 --- a/crates/subspace-farmer/src/farmer_cache/tests.rs +++ b/crates/subspace-farmer/src/farmer_cache/tests.rs @@ -9,6 +9,7 @@ use crate::farmer_cache::{decode_piece_index_from_record_key, FarmerCache}; use crate::node_client::NodeClient; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; +use futures::stream::FuturesUnordered; use futures::{SinkExt, Stream, StreamExt}; use parking_lot::Mutex; use rand::prelude::*; @@ -160,6 +161,26 @@ impl PieceGetter for MockPieceGetter { .clone(), )) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + Ok(Box::new( + piece_indices + .into_iter() + .map(|piece_index| async move { + let result = self.get_piece(piece_index).await; + (piece_index, result) + }) + .collect::>(), + ) as Box<_>) + } } #[tokio::test(flavor = "multi_thread")] diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index e28ca167a8..f56b6dd4ba 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -15,9 +15,10 @@ use futures::{stream, FutureExt, Stream, StreamExt}; use std::fmt; use std::hash::Hash; use std::num::NonZeroUsize; +use std::pin::Pin; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; -use std::task::Poll; +use std::task::{Context, Poll}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_farmer_components::PieceGetter; use subspace_networking::utils::multihash::ToMultihash; @@ -438,6 +439,40 @@ impl Clone for WeakFarmerPieceGetter +where + FarmIndex: 'static, + CacheIndex: 'static, + PV: 'static, + NC: 'static, +{ + piece_getter: FarmerPieceGetter, + #[borrows(piece_getter)] + #[covariant] + stream: + Box>)> + Send + Unpin + 'this>, +} + +impl Stream for StreamWithPieceGetter +where + FarmIndex: 'static, + CacheIndex: 'static, + PV: 'static, + NC: 'static, +{ + type Item = (PieceIndex, anyhow::Result>); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut() + .with_stream_mut(|stream| stream.poll_next_unpin(cx)) + } +} + #[async_trait] impl PieceGetter for WeakFarmerPieceGetter @@ -458,6 +493,36 @@ where piece_getter.get_piece(piece_index).await } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + let Some(piece_getter) = self.upgrade() else { + debug!("Farmer piece getter upgrade didn't succeed"); + return Ok(Box::new(stream::iter( + piece_indices + .into_iter() + .map(|piece_index| (piece_index, Ok(None))), + ))); + }; + + // TODO: This is necessary due to more complex lifetimes not yet supported by ouroboros, see + // https://github.com/someguynamedjosh/ouroboros/issues/112 + let piece_indices = piece_indices.into_iter().collect::>(); + let stream_with_piece_getter = + StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| { + piece_getter.get_pieces(piece_indices) + }) + .await?; + + Ok(Box::new(stream_with_piece_getter)) + } } impl WeakFarmerPieceGetter { From 65a532540e2dd1893af4cba95caae2ed85f30250 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 18 Oct 2024 12:59:24 +0300 Subject: [PATCH 2/3] Small API changes and simplifications in NATS client --- crates/subspace-farmer/src/cluster/cache.rs | 4 ++-- crates/subspace-farmer/src/cluster/farmer.rs | 2 +- crates/subspace-farmer/src/cluster/nats_client.rs | 12 ++++-------- crates/subspace-farmer/src/cluster/plotter.rs | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 81733bc43f..4db7ba3312 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -132,7 +132,7 @@ impl PieceCache for ClusterPieceCache { > { Ok(Box::new( self.nats_client - .stream_request(ClusterCacheContentsRequest, Some(&self.cache_id_string)) + .stream_request(&ClusterCacheContentsRequest, Some(&self.cache_id_string)) .await? .map(|response| response.map_err(FarmError::from)), )) @@ -200,7 +200,7 @@ impl PieceCache for ClusterPieceCache { let mut stream = self .nats_client .stream_request( - ClusterCacheReadPiecesRequest { offsets }, + &ClusterCacheReadPiecesRequest { offsets }, Some(&self.cache_id_string), ) .await? diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 4c184c1b2f..d6afd67b74 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -134,7 +134,7 @@ impl PlottedSectors for ClusterPlottedSectors { Ok(Box::new( self.nats_client .stream_request( - ClusterFarmerPlottedSectorsRequest, + &ClusterFarmerPlottedSectorsRequest, Some(&self.farm_id_string), ) .await? diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index be2d53b508..a75e6aa85f 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -21,7 +21,6 @@ use async_nats::{ }; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; -use derive_more::{Deref, DerefMut}; use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{select, FutureExt, Stream, StreamExt}; @@ -30,6 +29,7 @@ use std::any::type_name; use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; +use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -146,12 +146,10 @@ pub enum StreamRequestError { /// Wrapper around subscription that transforms stream of wrapped response messages into a normal /// `Response` stream. -#[derive(Debug, Deref, DerefMut)] +#[derive(Debug)] #[pin_project::pin_project] pub struct StreamResponseSubscriber { #[pin] - #[deref] - #[deref_mut] subscriber: Subscriber, response_subject: String, buffered_responses: Option>, @@ -314,12 +312,10 @@ pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static } /// Subscriber wrapper that decodes messages automatically and skips messages that can't be decoded -#[derive(Debug, Deref, DerefMut)] +#[derive(Debug)] #[pin_project::pin_project] pub struct SubscriberWrapper { #[pin] - #[deref] - #[deref_mut] subscriber: Subscriber, _phantom: PhantomData, } @@ -624,7 +620,7 @@ impl NatsClient { /// Make request that expects stream response pub async fn stream_request( &self, - request: Request, + request: &Request, instance: Option<&str>, ) -> Result, StreamRequestError> where diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs index 15856d0a2e..0623cea496 100644 --- a/crates/subspace-farmer/src/cluster/plotter.rs +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -353,7 +353,7 @@ impl ClusterPlotter { let response_stream_result = nats_client .stream_request( - ClusterPlotterPlotSectorRequest { + &ClusterPlotterPlotSectorRequest { public_key, sector_index, farmer_protocol_info, From d9d2cc0809631cb56629ccb1cb5423b268354157 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 18 Oct 2024 14:25:54 +0300 Subject: [PATCH 3/3] Implement `PieceCache::read_pieces()` for `ClusterPieceGetter` --- crates/subspace-farmer-components/src/lib.rs | 14 +- .../commands/cluster/plotter.rs | 4 +- crates/subspace-farmer/src/cluster/cache.rs | 2 +- .../subspace-farmer/src/cluster/controller.rs | 283 +++++++++++++++++- crates/subspace-farmer/src/farmer_cache.rs | 35 ++- 5 files changed, 305 insertions(+), 33 deletions(-) diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index 8387e6e5d9..19afc04103 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -54,19 +54,7 @@ pub trait PieceGetter { Box>)> + Send + Unpin + 'a>, > where - PieceIndices: IntoIterator + Send + 'a, - { - // TODO: Remove default impl here - Ok(Box::new( - piece_indices - .into_iter() - .map(|piece_index| async move { - let result = self.get_piece(piece_index).await; - (piece_index, result) - }) - .collect::>(), - ) as Box<_>) - } + PieceIndices: IntoIterator + Send + 'a; } #[async_trait] diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index f10b5269dc..06ed90307a 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -125,8 +125,8 @@ struct RocmPlottingOptions { pub(super) struct PlotterArgs { /// Piece getter concurrency. /// - /// Increasing this value can cause NATS communication issues if too many messages arrive via NATS, but - /// are not processed quickly enough. + /// Increasing this value can cause NATS communication issues if too many messages arrive via + /// NATS, but are not processed quickly enough. #[arg(long, default_value = "32")] piece_getter_concurrency: NonZeroUsize, /// Plotting options only used by CPU plotter diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 4db7ba3312..daeed8a290 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -90,7 +90,7 @@ impl GenericStreamRequest for ClusterCacheReadPiecesRequest { type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>; } -/// Request plotted from farmer, request +/// Collect plotted pieces from farmer #[derive(Debug, Clone, Encode, Decode)] struct ClusterCacheContentsRequest; diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs index ec9acde162..6c57db0665 100644 --- a/crates/subspace-farmer/src/cluster/controller.rs +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -6,9 +6,11 @@ //! client implementations designed to work with cluster controller and a service function to drive //! the backend part of the controller. -use crate::cluster::cache::{ClusterCacheIndex, ClusterCacheReadPieceRequest}; +use crate::cluster::cache::{ + ClusterCacheIndex, ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest, +}; use crate::cluster::nats_client::{ - GenericBroadcast, GenericNotification, GenericRequest, NatsClient, + GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{PieceCacheId, PieceCacheOffset}; use crate::farmer_cache::FarmerCache; @@ -17,19 +19,24 @@ use anyhow::anyhow; use async_lock::Semaphore; use async_nats::HeaderValue; use async_trait::async_trait; -use futures::{select, FutureExt, Stream, StreamExt}; +use futures::channel::mpsc; +use futures::future::FusedFuture; +use futures::stream::FuturesUnordered; +use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; +use std::task::Poll; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex}; use subspace_farmer_components::PieceGetter; use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; /// Broadcast sent by controllers requesting farmers to identify themselves #[derive(Debug, Copy, Clone, Encode, Decode)] @@ -150,6 +157,18 @@ impl GenericRequest for ClusterControllerFindPieceInCacheRequest { type Response = Option<(PieceCacheId, PieceCacheOffset)>; } +/// Find pieces with specified indices in cache +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterControllerFindPiecesInCacheRequest { + piece_indices: Vec, +} + +impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest { + const SUBJECT: &'static str = "subspace.controller.find-pieces-in-cache"; + /// Only pieces that were found are returned + type Response = (PieceIndex, PieceCacheId, PieceCacheOffset); +} + /// Request piece with specified index #[derive(Debug, Clone, Encode, Decode)] struct ClusterControllerPieceRequest { @@ -161,6 +180,18 @@ impl GenericRequest for ClusterControllerPieceRequest { type Response = Option; } +/// Request pieces with specified index +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterControllerPiecesRequest { + piece_indices: Vec, +} + +impl GenericStreamRequest for ClusterControllerPiecesRequest { + const SUBJECT: &'static str = "subspace.controller.pieces"; + /// Only pieces that were found are returned + type Response = (PieceIndex, Piece); +} + /// Cluster piece getter #[derive(Debug, Clone)] pub struct ClusterPieceGetter { @@ -246,6 +277,172 @@ impl PieceGetter for ClusterPieceGetter { .request(&ClusterControllerPieceRequest { piece_index }, None) .await?) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + let (tx, mut rx) = mpsc::unbounded(); + + let piece_indices = piece_indices.into_iter().collect::>(); + let piece_indices_to_get = + Mutex::new(piece_indices.iter().copied().collect::>()); + + let mut cached_pieces_by_cache_id = HashMap::>::new(); + + { + let _guard = self.request_semaphore.acquire().await; + + let mut cached_pieces = self + .nats_client + .stream_request( + &ClusterControllerFindPiecesInCacheRequest { piece_indices }, + None, + ) + .await?; + + while let Some((_piece_index, piece_cache_id, piece_cache_offset)) = + cached_pieces.next().await + { + cached_pieces_by_cache_id + .entry(piece_cache_id) + .or_default() + .push(piece_cache_offset); + } + } + + let fut = async move { + let tx = &tx; + + let mut getting_from_piece_cache = cached_pieces_by_cache_id + .into_iter() + .map(|(piece_cache_id, offsets)| { + let piece_indices_to_download = &piece_indices_to_get; + + async move { + let _guard = self.request_semaphore.acquire().await; + + let mut pieces_stream = match self + .nats_client + .stream_request( + &ClusterCacheReadPiecesRequest { offsets }, + Some(&piece_cache_id.to_string()), + ) + .await + { + Ok(pieces) => pieces, + Err(error) => { + warn!( + %error, + %piece_cache_id, + "Failed to request pieces from cache" + ); + + return; + } + }; + + while let Some(piece_result) = pieces_stream.next().await { + let (piece_offset, maybe_piece) = match piece_result { + Ok(result) => result, + Err(error) => { + warn!(%error, "Failed to get piece from cache"); + continue; + } + }; + + if let Some((piece_index, piece)) = maybe_piece { + piece_indices_to_download.lock().remove(&piece_index); + + tx.unbounded_send((piece_index, Ok(Some(piece)))).expect( + "This future isn't polled after receiver is dropped; qed", + ); + } else { + warn!( + %piece_cache_id, + %piece_offset, + "Failed to get piece from cache, it was missing or already gone" + ); + } + } + } + }) + .collect::>(); + // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650 + // Simply drain everything + // .for_each(|()| async {}) + + // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved + while let Some(()) = getting_from_piece_cache.next().await { + // Simply drain everything + } + drop(getting_from_piece_cache); + + let mut piece_indices_to_get = piece_indices_to_get.into_inner(); + if piece_indices_to_get.is_empty() { + return; + } + + let _guard = self.request_semaphore.acquire().await; + + let mut pieces_from_controller = match self + .nats_client + .stream_request( + &ClusterControllerPiecesRequest { + piece_indices: piece_indices_to_get.iter().copied().collect(), + }, + None, + ) + .await + { + Ok(pieces_from_controller) => pieces_from_controller, + Err(error) => { + error!(%error, "Failed to get pieces from controller"); + + for piece_index in piece_indices_to_get { + tx.unbounded_send(( + piece_index, + Err(anyhow::anyhow!("Failed to get piece from controller")), + )) + .expect("This future isn't polled after receiver is dropped; qed"); + } + return; + } + }; + + while let Some((piece_index, piece)) = pieces_from_controller.next().await { + piece_indices_to_get.remove(&piece_index); + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + for piece_index in piece_indices_to_get { + tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece")))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + }; + let mut fut = Box::pin(fut.fuse()); + + // Drive above future and stream back any pieces that were downloaded so far + Ok(Box::new(stream::poll_fn(move |cx| { + if !fut.is_terminated() { + // Result doesn't matter, we'll need to poll stream below anyway + let _ = fut.poll_unpin(cx); + } + + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { + return Poll::Ready(maybe_result); + } + + // Exit will be done by the stream above + Poll::Pending + }))) + } } impl ClusterPieceGetter { @@ -466,9 +663,15 @@ where result = find_piece_responder(nats_client, farmer_cache).fuse() => { result }, + result = find_pieces_responder(nats_client, farmer_cache).fuse() => { + result + }, result = piece_responder(nats_client, piece_getter).fuse() => { result }, + result = pieces_responder(nats_client, piece_getter).fuse() => { + result + }, } } else { select! { @@ -481,9 +684,15 @@ where result = find_piece_responder(nats_client, farmer_cache).fuse() => { result }, + result = find_pieces_responder(nats_client, farmer_cache).fuse() => { + result + }, result = piece_responder(nats_client, piece_getter).fuse() => { result }, + result = pieces_responder(nats_client, piece_getter).fuse() => { + result + }, } } } @@ -703,12 +912,12 @@ where .request_responder( None, Some("subspace.controller".to_string()), - |request: ClusterControllerSegmentHeadersRequest| async move { + |ClusterControllerSegmentHeadersRequest { segment_indices }| async move { node_client - .segment_headers(request.segment_indices.clone()) + .segment_headers(segment_indices.clone()) .await .inspect_err(|error| { - warn!(%error, segment_indices = ?request.segment_indices, "Failed to get segment headers"); + warn!(%error, ?segment_indices, "Failed to get segment headers"); }) .ok() }, @@ -724,8 +933,23 @@ async fn find_piece_responder( .request_responder( None, Some("subspace.controller".to_string()), - |request: ClusterControllerFindPieceInCacheRequest| async move { - Some(farmer_cache.find_piece(request.piece_index).await) + |ClusterControllerFindPieceInCacheRequest { piece_index }| async move { + Some(farmer_cache.find_piece(piece_index).await) + }, + ) + .await +} + +async fn find_pieces_responder( + nats_client: &NatsClient, + farmer_cache: &FarmerCache, +) -> anyhow::Result<()> { + nats_client + .stream_request_responder( + None, + Some("subspace.controller".to_string()), + |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move { + Some(stream::iter(farmer_cache.find_pieces(piece_indices).await)) }, ) .await @@ -739,13 +963,44 @@ where .request_responder( None, Some("subspace.controller".to_string()), - |request: ClusterControllerPieceRequest| async move { + |ClusterControllerPieceRequest { piece_index }| async move { + piece_getter + .get_piece(piece_index) + .await + .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece")) + .ok() + }, + ) + .await +} + +async fn pieces_responder(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()> +where + PG: PieceGetter + Sync, +{ + nats_client + .stream_request_responder( + None, + Some("subspace.controller".to_string()), + |ClusterControllerPiecesRequest { piece_indices }| async move { piece_getter - .get_piece(request.piece_index) + .get_pieces(piece_indices) .await - .inspect_err( - |error| warn!(%error, piece_index = %request.piece_index, "Failed to get piece"), - ) + .map(|stream| { + Box::pin(stream.filter_map( + |(piece_index, maybe_piece_result)| async move { + match maybe_piece_result { + Ok(Some(piece)) => Some((piece_index, piece)), + Ok(None) => None, + Err(error) => { + warn!(%error, %piece_index, "Failed to get piece"); + None + } + } + }, + )) + }) + .inspect_err(|error| warn!(%error, "Failed to get pieces")) .ok() }, ) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index d340bfeee3..6ec7a1a139 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -1388,7 +1388,8 @@ where ); // Quick check in piece caches - if let Some(piece_caches) = self.piece_caches.try_read() { + { + let piece_caches = self.piece_caches.read().await; pieces_to_find.retain(|_piece_index, key| { let distance_key = KeyWithDistance::new(self.peer_id, key.clone()); !piece_caches.contains_stored_piece(&distance_key) @@ -1436,13 +1437,41 @@ where } /// Find piece in cache and return its retrieval details - pub(crate) async fn find_piece( + pub async fn find_piece( &self, piece_index: PieceIndex, ) -> Option<(PieceCacheId, PieceCacheOffset)> { - let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash()); + let caches = self.piece_caches.read().await; + + self.find_piece_internal(&caches, piece_index) + } + /// Find pieces in cache and return their retrieval details + pub async fn find_pieces( + &self, + piece_indices: PieceIndices, + ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)> + where + PieceIndices: IntoIterator, + { let caches = self.piece_caches.read().await; + + piece_indices + .into_iter() + .filter_map(|piece_index| { + self.find_piece_internal(&caches, piece_index) + .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset)) + }) + .collect() + } + + fn find_piece_internal( + &self, + caches: &PieceCachesState, + piece_index: PieceIndex, + ) -> Option<(PieceCacheId, PieceCacheOffset)> { + let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash()); + let Some(offset) = caches.get_stored_piece(&key) else { if let Some(metrics) = &self.metrics { metrics.cache_find_miss.inc();