diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs
index 17a1109f2c..a57a71e6e6 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs
@@ -15,7 +15,9 @@ use std::future::{ready, Future};
 use std::pin::{pin, Pin};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
-use subspace_farmer::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache};
+use subspace_farmer::cluster::cache::{
+    ClusterCacheIdentifyBroadcast, ClusterCacheIndex, ClusterPieceCache,
+};
 use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
 use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::farm::{PieceCache, PieceCacheId};
@@ -86,7 +88,7 @@ impl KnownCaches {
 pub(super) async fn maintain_caches(
     cache_group: &str,
     nats_client: &NatsClient,
-    farmer_cache: FarmerCache,
+    farmer_cache: FarmerCache<ClusterCacheIndex>,
 ) -> anyhow::Result<()> {
     let mut known_caches = KnownCaches::default();
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index c2c70d7f55..c3d53a6687 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -58,6 +58,8 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
 const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024 * 1024;
 const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
 
+type CacheIndex = u8;
+
 /// Arguments for farmer
 #[derive(Debug, Parser)]
 pub(crate) struct FarmingArgs {
@@ -342,7 +344,7 @@ where
     let should_start_prometheus_server = !prometheus_listen_on.is_empty();
 
     let (farmer_cache, farmer_cache_worker) =
-        FarmerCache::new(node_client.clone(), peer_id, Some(&mut registry));
+        FarmerCache::<CacheIndex>::new(node_client.clone(), peer_id, Some(&mut registry));
 
     let node_client = CachingProxyNodeClient::new(node_client)
         .await
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
index 29b5129c4e..3328956b78 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
@@ -70,7 +70,7 @@ pub(in super::super) struct NetworkArgs {
 }
 
 #[allow(clippy::too_many_arguments)]
-pub(in super::super) fn configure_network<FarmIndex, NC>(
+pub(in super::super) fn configure_network<FarmIndex, CacheIndex, NC>(
     protocol_prefix: String,
     base_path: &Path,
     keypair: Keypair,
@@ -87,12 +87,15 @@ pub(in super::super) fn configure_network<FarmIndex, NC>(
     }: NetworkArgs,
     weak_plotted_pieces: Weak<RwLock<PlottedPieces<FarmIndex>>>,
     node_client: NC,
-    farmer_cache: FarmerCache,
+    farmer_cache: FarmerCache<CacheIndex>,
     prometheus_metrics_registry: Option<&mut Registry>,
-) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error>
+) -> Result<(Node, NodeRunner<FarmerCache<CacheIndex>>), anyhow::Error>
 where
     FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
     usize: From<FarmIndex>,
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
     NC: NodeClientExt + Clone,
 {
     let known_peers_registry = KnownPeersManager::new(KnownPeersManagerConfig {
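The `CacheIndex` bounds added to `configure_network` above (and repeated throughout this diff) amount to: a small copyable key type that widens infallibly into `usize` and narrows fallibly back from it. A minimal standalone sketch of the same bound set, checked against the `u8`/`u16` index types that `farm.rs` and `cluster/cache.rs` pin later in this diff (the function name `checked_cache_index` is hypothetical, not from the codebase):

```rust
use std::fmt;
use std::hash::Hash;

// Mirrors the CacheIndex bounds introduced in this diff: infallible widening into
// `usize` and fallible narrowing back from it.
fn checked_cache_index<CacheIndex>(raw: usize) -> Option<CacheIndex>
where
    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
    usize: From<CacheIndex>,
    CacheIndex: TryFrom<usize>,
{
    CacheIndex::try_from(raw).ok()
}

fn main() {
    // `u8` (single farmer) and `u16` (cluster) both satisfy the bounds.
    assert_eq!(checked_cache_index::<u8>(3), Some(3u8));
    // Out of range for a u16 cache index, so the conversion is rejected.
    assert_eq!(checked_cache_index::<u16>(70_000), None);
}
```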
diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs
index 5d8e423ad7..edd624de5a 100644
--- a/crates/subspace-farmer/src/cluster/cache.rs
+++ b/crates/subspace-farmer/src/cluster/cache.rs
@@ -26,6 +26,9 @@ use tracing::{debug, error, info, trace, warn};
 
 const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
 
+/// Type alias for cache index used by cluster.
+pub type ClusterCacheIndex = u16;
+
 /// Broadcast with identification details by caches
 #[derive(Debug, Clone, Encode, Decode)]
 pub struct ClusterCacheIdentifyBroadcast {
diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs
index 55d0fc51c9..6aea3a0cac 100644
--- a/crates/subspace-farmer/src/cluster/controller.rs
+++ b/crates/subspace-farmer/src/cluster/controller.rs
@@ -6,7 +6,7 @@
 //! client implementations designed to work with cluster controller and a service function to drive
 //! the backend part of the controller.
 
-use crate::cluster::cache::ClusterCacheReadPieceRequest;
+use crate::cluster::cache::{ClusterCacheIndex, ClusterCacheReadPieceRequest};
 use crate::cluster::nats_client::{
     GenericBroadcast, GenericNotification, GenericRequest, NatsClient,
 };
@@ -435,7 +435,7 @@ pub async fn controller_service<NC, PG>(
     nats_client: &NatsClient,
     node_client: &NC,
     piece_getter: &PG,
-    farmer_cache: &FarmerCache,
+    farmer_cache: &FarmerCache<ClusterCacheIndex>,
     instance: &str,
     primary_instance: bool,
 ) -> anyhow::Result<()>
@@ -721,7 +721,7 @@ where
 async fn find_piece_responder(
     nats_client: &NatsClient,
-    farmer_cache: &FarmerCache,
+    farmer_cache: &FarmerCache<ClusterCacheIndex>,
 ) -> anyhow::Result<()> {
     nats_client
         .request_responder(
diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index b0f7af23df..fac97939d0 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -14,10 +14,11 @@ use crate::utils::run_future_in_dedicated_thread;
 use async_lock::RwLock as AsyncRwLock;
 use event_listener_primitives::{Bag, HandlerId};
 use futures::stream::{FuturesOrdered, FuturesUnordered};
-use futures::{select, stream, FutureExt, StreamExt};
+use futures::{select, FutureExt, StreamExt};
 use prometheus_client::registry::Registry;
 use rayon::prelude::*;
 use std::collections::{HashMap, VecDeque};
+use std::hash::Hash;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
@@ -51,11 +52,124 @@ struct Handlers {
     progress: Handler<f32>,
 }
 
+#[derive(Debug, Clone, Copy)]
+struct FarmerCacheOffset<CacheIndex> {
+    cache_index: CacheIndex,
+    piece_offset: PieceCacheOffset,
+}
+
+impl<CacheIndex> FarmerCacheOffset<CacheIndex>
+where
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    CacheIndex: TryFrom<usize>,
+{
+    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
+        Self {
+            cache_index,
+            piece_offset,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
-struct PieceCacheState {
-    stored_pieces: HashMap<RecordKey, PieceCacheOffset>,
-    free_offsets: VecDeque<PieceCacheOffset>,
+struct CacheBackend {
     backend: Arc<dyn PieceCache>,
+    used_capacity: u32,
+    total_capacity: u32,
+}
+
+impl std::ops::Deref for CacheBackend {
+    type Target = Arc<dyn PieceCache>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.backend
+    }
+}
+
+impl CacheBackend {
+    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
+        Self {
+            backend,
+            used_capacity: 0,
+            total_capacity,
+        }
+    }
+
+    fn next_free(&mut self) -> Option<PieceCacheOffset> {
+        let offset = self.used_capacity;
+        if offset < self.total_capacity {
+            self.used_capacity += 1;
+            Some(PieceCacheOffset(offset))
+        } else {
+            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
backend"); + None + } + } + + fn free_size(&self) -> u32 { + self.total_capacity - self.used_capacity + } +} + +#[derive(Debug, Clone)] +struct PieceCachesState { + stored_pieces: HashMap>, + dangling_free_offsets: VecDeque>, + backends: Vec, +} + +impl PieceCachesState +where + CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static, + usize: From, + CacheIndex: TryFrom, +{ + fn pop_free_offset(&mut self) -> Option> { + match self.dangling_free_offsets.pop_front() { + Some(free_offset) => { + debug!(?free_offset, "Popped dangling free offset"); + Some(free_offset) + } + None => { + // Sort piece caches by number of stored pieces to fill those that are less + // populated first + let mut sorted_backends = self + .backends + .iter_mut() + .enumerate() + .filter_map(|(cache_index, backend)| { + Some((CacheIndex::try_from(cache_index).ok()?, backend)) + }) + .collect::>(); + sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size()); + sorted_backends + .into_iter() + .rev() + .find_map(|(cache_index, backend)| { + backend + .next_free() + .map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset)) + }) + } + } + } +} + +impl Default for PieceCachesState { + fn default() -> Self { + Self { + stored_pieces: HashMap::default(), + dangling_free_offsets: VecDeque::default(), + backends: Vec::default(), + } + } +} + +#[derive(Debug)] +struct CacheState { + cache_stored_pieces: HashMap>, + cache_free_offsets: Vec>, + backend: CacheBackend, } #[derive(Debug)] @@ -77,22 +191,25 @@ struct CacheWorkerState { /// Farmer cache worker used to drive the farmer cache backend #[derive(Debug)] #[must_use = "Farmer cache will not work unless its worker is running"] -pub struct FarmerCacheWorker +pub struct FarmerCacheWorker where NC: fmt::Debug, { peer_id: PeerId, node_client: NC, - piece_caches: Arc>>, + piece_caches: Arc>>, plot_caches: Arc, handlers: Arc, worker_receiver: Option>, metrics: Option>, } -impl FarmerCacheWorker +impl FarmerCacheWorker where NC: NodeClient, + CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static, + usize: From, + CacheIndex: TryFrom, { /// Run the cache worker with provided piece getter. 
 
 #[derive(Debug)]
@@ -77,22 +191,25 @@ struct CacheWorkerState {
 
 /// Farmer cache worker used to drive the farmer cache backend
 #[derive(Debug)]
 #[must_use = "Farmer cache will not work unless its worker is running"]
-pub struct FarmerCacheWorker<NC>
+pub struct FarmerCacheWorker<NC, CacheIndex>
 where
     NC: fmt::Debug,
 {
     peer_id: PeerId,
     node_client: NC,
-    piece_caches: Arc<AsyncRwLock<Vec<PieceCacheState>>>,
+    piece_caches: Arc<AsyncRwLock<PieceCachesState<CacheIndex>>>,
     plot_caches: Arc<PlotCaches>,
     handlers: Arc<Handlers>,
     worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
     metrics: Option<Arc<FarmerCacheMetrics>>,
 }
 
-impl<NC> FarmerCacheWorker<NC>
+impl<NC, CacheIndex> FarmerCacheWorker<NC, CacheIndex>
 where
     NC: NodeClient,
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
 {
     /// Run the cache worker with provided piece getter.
     ///
@@ -176,38 +293,39 @@ where
             // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
             WorkerCommand::ForgetKey { key } => {
                 let mut caches = self.piece_caches.write().await;
+                let Some(offset) = caches.stored_pieces.remove(&key) else {
+                    // Key not exist
+                    return;
+                };
+                let cache_index = usize::from(offset.cache_index);
+                let piece_offset = offset.piece_offset;
+                let Some(backend) = caches.backends.get(cache_index).cloned() else {
+                    // Cache backend not exist
+                    return;
+                };
 
-                for (cache_index, cache) in caches.iter_mut().enumerate() {
-                    let Some(offset) = cache.stored_pieces.remove(&key) else {
-                        // Not this disk farm
-                        continue;
-                    };
-
-                    // Making offset as unoccupied and remove corresponding key from heap
-                    cache.free_offsets.push_front(offset);
-                    match cache.backend.read_piece_index(offset).await {
-                        Ok(Some(piece_index)) => {
-                            worker_state.heap.remove(KeyWrapper(piece_index));
-                        }
-                        Ok(None) => {
-                            warn!(
-                                %cache_index,
-                                %offset,
-                                "Piece index out of range, this is likely an implementation bug, \
-                                not freeing heap element"
-                            );
-                        }
-                        Err(error) => {
-                            error!(
-                                %error,
-                                %cache_index,
-                                ?key,
-                                %offset,
-                                "Error while reading piece from cache, might be a disk corruption"
-                            );
-                        }
+                caches.dangling_free_offsets.push_front(offset);
+                match backend.read_piece_index(piece_offset).await {
+                    Ok(Some(piece_index)) => {
+                        worker_state.heap.remove(KeyWrapper(piece_index));
+                    }
+                    Ok(None) => {
+                        warn!(
+                            %cache_index,
+                            %piece_offset,
+                            "Piece index out of range, this is likely an implementation bug, \
+                            not freeing heap element"
+                        );
+                    }
+                    Err(error) => {
+                        error!(
+                            %error,
+                            %cache_index,
+                            ?key,
+                            %piece_offset,
+                            "Error while reading piece from cache, might be a disk corruption"
+                        );
                     }
-                    return;
                 }
             }
         }
@@ -222,18 +340,15 @@ where
         PG: PieceGetter,
     {
         info!("Initializing piece cache");
+
         // Pull old cache state since it will be replaced with a new one and reuse its allocations
-        let cache_state = mem::take(&mut *self.piece_caches.write().await);
-        let mut stored_pieces = Vec::with_capacity(new_piece_caches.len());
-        let mut free_offsets = Vec::with_capacity(new_piece_caches.len());
-        for mut state in cache_state {
-            state.stored_pieces.clear();
-            stored_pieces.push(state.stored_pieces);
-            state.free_offsets.clear();
-            free_offsets.push(state.free_offsets);
-        }
-        stored_pieces.resize(new_piece_caches.len(), HashMap::default());
-        free_offsets.resize(new_piece_caches.len(), VecDeque::default());
+        let PieceCachesState {
+            mut stored_pieces,
+            mut dangling_free_offsets,
+            backends: _,
+        } = mem::take(&mut *self.piece_caches.write().await);
+        stored_pieces.clear();
+        dangling_free_offsets.clear();
 
         debug!("Collecting pieces that were in the cache before");
@@ -241,106 +356,128 @@ where
             metrics.piece_cache_capacity_total.set(0);
             metrics.piece_cache_capacity_used.set(0);
         }
+
         // Build cache state of all backends
-        let maybe_caches_futures = stored_pieces
+        let piece_caches_number = new_piece_caches.len();
+        let maybe_caches_futures = new_piece_caches
             .into_iter()
-            .zip(free_offsets)
-            .zip(new_piece_caches)
             .enumerate()
-            .map(
-                |(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
-                    if let Some(metrics) = &self.metrics {
-                        metrics
-                            .piece_cache_capacity_total
-                            .inc_by(new_cache.max_num_elements() as i64);
-                    }
-                    run_future_in_dedicated_thread(
-                        move || async move {
-                            // Hack with first collecting into `Option` with `Option::take()` call
-                            // later is to satisfy compiler that gets confused about ownership
-                            // otherwise
-                            let mut maybe_contents = match new_cache.contents().await {
-                                Ok(contents) => Some(contents),
-                                Err(error) => {
-                                    warn!(%index, %error, "Failed to get cache contents");
-
-                                    None
-                                }
-                            };
-                            let Some(mut contents) = maybe_contents.take() else {
-                                drop(maybe_contents);
-
-                                return PieceCacheState {
-                                    stored_pieces,
-                                    free_offsets,
-                                    backend: new_cache,
-                                };
-                            };
-
-                            stored_pieces.reserve(new_cache.max_num_elements() as usize);
-
-                            while let Some(maybe_element_details) = contents.next().await {
-                                let (offset, maybe_piece_index) = match maybe_element_details {
-                                    Ok(element_details) => element_details,
-                                    Err(error) => {
-                                        warn!(
-                                            %index,
-                                            %error,
-                                            "Failed to get cache contents element details"
-                                        );
-                                        break;
-                                    }
-                                };
-                                match maybe_piece_index {
-                                    Some(piece_index) => {
-                                        stored_pieces.insert(
-                                            RecordKey::from(piece_index.to_multihash()),
-                                            offset,
-                                        );
-                                    }
-                                    None => {
-                                        free_offsets.push_back(offset);
-                                    }
-                                }
-
-                                // Allow for task to be aborted
-                                yield_now().await;
-                            }
+            .filter_map(|(cache_index, new_cache)| {
+                let total_capacity = new_cache.max_num_elements();
+                let mut backend = CacheBackend::new(new_cache, total_capacity);
+                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
+                    warn!(
+                        ?piece_caches_number,
+                        "Too many piece caches provided, {cache_index} cache will be ignored",
+                    );
+                    return None;
+                };
+
+                if let Some(metrics) = &self.metrics {
+                    metrics
+                        .piece_cache_capacity_total
+                        .inc_by(total_capacity as i64);
+                }
+
+                let init_fut = async move {
+                    let used_capacity = &mut backend.used_capacity;
+
+                    // Hack with first collecting into `Option` with `Option::take()` call
+                    // later is to satisfy compiler that gets confused about ownership
+                    // otherwise
+                    let mut maybe_contents = match backend.backend.contents().await {
+                        Ok(contents) => Some(contents),
+                        Err(error) => {
+                            warn!(%cache_index, %error, "Failed to get cache contents");
 
-                            drop(maybe_contents);
-                            drop(contents);
+                            None
+                        }
+                    };
+
+                    #[allow(clippy::mutable_key_type)]
+                    let mut cache_stored_pieces = HashMap::new();
+                    let mut cache_free_offsets = Vec::new();
+
+                    let Some(mut contents) = maybe_contents.take() else {
+                        drop(maybe_contents);
+
+                        return CacheState {
+                            cache_stored_pieces,
+                            cache_free_offsets,
+                            backend,
+                        };
+                    };
 
-                            PieceCacheState {
-                                stored_pieces,
-                                free_offsets,
-                                backend: new_cache,
+                    while let Some(maybe_element_details) = contents.next().await {
+                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
+                            Ok(element_details) => element_details,
+                            Err(error) => {
+                                warn!(
+                                    %cache_index,
+                                    %error,
+                                    "Failed to get cache contents element details"
+                                );
+                                break;
                             }
-                        },
-                        format!("piece-cache.{index}"),
-                    )
-                },
-            )
+                        };
+                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
+                        match maybe_piece_index {
+                            Some(piece_index) => {
+                                *used_capacity = piece_offset.0 + 1;
+                                cache_stored_pieces
+                                    .insert(RecordKey::from(piece_index.to_multihash()), offset);
+                            }
+                            None => {
+                                // TODO: Optimize to not store all free offsets, only dangling
+                                // offsets are actually necessary
+                                cache_free_offsets.push(offset);
+                            }
+                        }
+
+                        // Allow for task to be aborted
+                        yield_now().await;
+                    }
+
+                    drop(maybe_contents);
+                    drop(contents);
+
+                    CacheState {
+                        cache_stored_pieces,
+                        cache_free_offsets,
+                        backend,
+                    }
+                };
+
+                Some(run_future_in_dedicated_thread(
+                    move || init_fut,
+                    format!("piece-cache.{cache_index}"),
+                ))
+            })
             .collect::<Result<Vec<_>, _>>();
 
         let caches_futures = match maybe_caches_futures {
            Ok(caches_futures) => caches_futures,
            Err(error) => {
-                error!(
-                    %error,
-                    "Failed to spawn piece cache reading thread"
-                );
+                error!(%error, "Failed to spawn piece cache reading thread");
                 return;
             }
         };
 
-        let mut caches = Vec::with_capacity(caches_futures.len());
+        let mut backends = Vec::with_capacity(caches_futures.len());
         let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
 
         while let Some(maybe_cache) = caches_futures.next().await {
             match maybe_cache {
                 Ok(cache) => {
-                    caches.push(cache);
+                    let backend = cache.backend;
+                    stored_pieces.extend(cache.cache_stored_pieces.into_iter());
+                    dangling_free_offsets.extend(
+                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
+                            free_offset.piece_offset.0 < backend.used_capacity
+                        }),
+                    );
+                    backends.push(backend);
                 }
                 Err(_cancelled) => {
                     error!("Piece cache reading thread panicked");
@@ -350,6 +487,12 @@ where
             };
         }
 
+        let mut caches = PieceCachesState {
+            stored_pieces,
+            dangling_free_offsets,
+            backends,
+        };
+
         info!("Synchronizing piece cache");
 
         let last_segment_index = loop {
@@ -386,14 +529,13 @@ where
         debug!(%last_segment_index, "Identified last segment index");
 
+        let limit = caches
+            .backends
+            .iter()
+            .fold(0usize, |acc, backend| acc + backend.total_capacity as usize);
         worker_state.heap.clear();
         // Change limit to number of pieces
-        worker_state.heap.set_limit(
-            caches
-                .iter()
-                .map(|state| state.stored_pieces.len() + state.free_offsets.len())
-                .sum::<usize>(),
-        );
+        worker_state.heap.set_limit(limit);
 
         for segment_index in SegmentIndex::ZERO..=last_segment_index {
             for piece_index in segment_index.segment_piece_indexes() {
@@ -412,23 +554,28 @@ where
             })
             .collect::<HashMap<_, _>>();
 
-        caches.iter_mut().for_each(|state| {
-            // Filter-out piece indices that are stored, but should not be as well as clean
-            // `inserted_piece_indices` from already stored piece indices, leaving just those that are
-            // still missing in cache
-            state
-                .stored_pieces
-                .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
-                .for_each(|(_piece_index, offset)| {
-                    state.free_offsets.push_front(offset);
-                });
-        });
+        let mut piece_caches_capacity_used = vec![0u32; caches.backends.len()];
+        // Filter-out piece indices that are stored, but should not be as well as clean
+        // `inserted_piece_indices` from already stored piece indices, leaving just those that are
+        // still missing in cache
+        caches
+            .stored_pieces
+            .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
+            .for_each(|(_piece_index, offset)| {
+                // There is no need to adjust the `last_stored_offset` of the `backend` here,
+                // as the free_offset will be preferentially taken from the dangling free offsets
+                caches.dangling_free_offsets.push_front(offset);
+            });
 
         if let Some(metrics) = &self.metrics {
-            for cache in &mut caches {
+            for offset in caches.stored_pieces.values() {
+                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
+            }
+
+            for cache_used in piece_caches_capacity_used {
                 metrics
                     .piece_cache_capacity_used
-                    .inc_by(cache.stored_pieces.len() as i64);
+                    .inc_by(i64::from(cache_used));
             }
         }
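Side note (commentary, not part of the diff): `used_capacity` ends up as a high-water mark after initialization, so a free offset reported by a backend only matters if it sits below that mark, i.e. it is a hole inside already-used space. That is exactly the `free_offset.piece_offset.0 < backend.used_capacity` filter above; a toy illustration with plain integers (the helper name is hypothetical):

```rust
// Keep only "dangling" free offsets: holes below the high-water mark.
// Offsets at or above `used_capacity` will be handed out by `next_free` anyway.
fn dangling_offsets(free_offsets: &[u32], used_capacity: u32) -> Vec<u32> {
    free_offsets
        .iter()
        .copied()
        .filter(|&offset| offset < used_capacity)
        .collect()
}

fn main() {
    // Pieces were found at offsets 0, 1, 3 and 4, so used_capacity is 5;
    // only the hole at offset 2 is dangling.
    assert_eq!(dangling_offsets(&[2, 5, 6, 7], 5), vec![2]);
}
```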
@@ -488,42 +635,32 @@ where
             };
 
             // Find plot in which there is a place for new piece to be stored
-            let mut sorted_caches = caches.iter_mut().enumerate().collect::<Vec<_>>();
-            // Sort piece caches by number of stored pieces to fill those that are less
-            // populated first
-            sorted_caches.sort_by_key(|(_, cache)| cache.stored_pieces.len());
-            if !stream::iter(sorted_caches)
-                .any(|(cache_index, cache)| async move {
-                    let Some(offset) = cache.free_offsets.pop_front() else {
-                        return false;
-                    };
-                    if let Err(error) = cache.backend.write_piece(offset, *piece_index, piece).await
-                    {
-                        error!(
-                            %error,
-                            %cache_index,
-                            %piece_index,
-                            %offset,
-                            "Failed to write piece into cache"
-                        );
-                        return false;
-                    }
-                    if let Some(metrics) = &self.metrics {
-                        metrics.piece_cache_capacity_used.inc();
-                    }
-                    cache
-                        .stored_pieces
-                        .insert(RecordKey::from(piece_index.to_multihash()), offset);
-                    true
-                })
-                .await
+            let Some(offset) = caches.pop_free_offset() else {
+                error!(
+                    %piece_index,
+                    "Failed to store piece in cache, there was no space"
+                );
+                break;
+            };
+
+            let cache_index = usize::from(offset.cache_index);
+            let piece_offset = offset.piece_offset;
+            if let Some(backend) = caches.backends.get(cache_index)
+                && let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await
             {
+                // TODO: Will likely need to cache problematic backend indices to avoid hitting it
+                //  over and over again repeatedly
                 error!(
+                    %error,
+                    %cache_index,
                     %piece_index,
-                    "Failed to store piece in cache, there was no space"
+                    %piece_offset,
+                    "Failed to write piece into cache"
                 );
+                continue;
             }
+            caches
+                .stored_pieces
+                .insert(RecordKey::from(piece_index.to_multihash()), offset);
             downloaded_pieces_count += 1;
 
             // Do not print anything or send progress notification after last piece until piece
@@ -758,83 +895,91 @@ where
         match worker_state.heap.insert(heap_key) {
             // Entry is already occupied, we need to find and replace old piece with new one
             Some(KeyWrapper(old_piece_index)) => {
-                for (cache_index, cache) in caches.iter_mut().enumerate() {
-                    let old_record_key = RecordKey::from(old_piece_index.to_multihash());
-                    let Some(offset) = cache.stored_pieces.remove(&old_record_key) else {
-                        // Not this disk farm
-                        continue;
-                    };
-
-                    if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece).await
-                    {
-                        error!(
-                            %error,
-                            %cache_index,
-                            %piece_index,
-                            %offset,
-                            "Failed to write piece into cache"
-                        );
-                    } else {
-                        trace!(
-                            %cache_index,
-                            %old_piece_index,
-                            %piece_index,
-                            %offset,
-                            "Successfully replaced old cached piece"
-                        );
-                        cache.stored_pieces.insert(record_key, offset);
-                    }
+                let old_record_key = RecordKey::from(old_piece_index.to_multihash());
+                let Some(offset) = caches.stored_pieces.remove(&old_record_key) else {
+                    // Not this disk farm
+                    warn!(
+                        %old_piece_index,
+                        %piece_index,
+                        "Should have replaced cached piece, but it didn't happen, this is an \
+                        implementation bug"
+                    );
+                    return;
+                };
+                let cache_index = usize::from(offset.cache_index);
+                let piece_offset = offset.piece_offset;
+                let Some(backend) = caches.backends.get(cache_index) else {
+                    // Cache backend not exist
+                    warn!(
+                        %cache_index,
+                        %piece_index,
+                        "Should have a cached backend, but it didn't exist, this is an \
+                        implementation bug"
+                    );
                     return;
+                };
+                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
+                    error!(
+                        %error,
+                        %cache_index,
+                        %piece_index,
+                        %piece_offset,
+                        "Failed to write piece into cache"
+                    );
+                } else {
+                    trace!(
+                        %cache_index,
+                        %old_piece_index,
+                        %piece_index,
+                        %piece_offset,
+                        "Successfully replaced old cached piece"
+                    );
+                    caches.stored_pieces.insert(record_key, offset);
                 }
-
-                warn!(
-                    %old_piece_index,
-                    %piece_index,
-                    "Should have replaced cached piece, but it didn't happen, this is an \
-                    implementation bug"
-                );
             }
             // There is free space in cache, need to find a free spot and place piece there
             None => {
-                let mut sorted_caches = caches.iter_mut().enumerate().collect::<Vec<_>>();
-                // Sort piece caches by number of stored pieces to fill those that are less
-                // populated first
-                sorted_caches.sort_by_key(|(_, cache)| cache.stored_pieces.len());
-                for (cache_index, cache) in sorted_caches {
-                    let Some(offset) = cache.free_offsets.pop_front() else {
-                        // Not this disk farm
-                        continue;
-                    };
+                let Some(offset) = caches.pop_free_offset() else {
+                    warn!(
+                        %piece_index,
+                        "Should have inserted piece into cache, but it didn't happen, this is an \
+                        implementation bug"
+                    );
+                    return;
+                };
+                let cache_index = usize::from(offset.cache_index);
+                let piece_offset = offset.piece_offset;
+                let Some(backend) = caches.backends.get(cache_index) else {
+                    // Cache backend not exist
+                    warn!(
+                        %cache_index,
+                        %piece_index,
+                        "Should have a cached backend, but it didn't exist, this is an \
+                        implementation bug"
+                    );
+                    return;
+                };
 
-                    if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece).await
-                    {
-                        error!(
-                            %error,
-                            %cache_index,
-                            %piece_index,
-                            %offset,
-                            "Failed to write piece into cache"
-                        );
-                    } else {
-                        trace!(
-                            %cache_index,
-                            %piece_index,
-                            %offset,
-                            "Successfully stored piece in cache"
-                        );
-                        if let Some(metrics) = &self.metrics {
-                            metrics.piece_cache_capacity_used.inc();
-                        }
-                        cache.stored_pieces.insert(record_key, offset);
+                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
+                    error!(
+                        %error,
+                        %cache_index,
+                        %piece_index,
+                        %piece_offset,
+                        "Failed to write piece into cache"
+                    );
+                } else {
+                    trace!(
+                        %cache_index,
+                        %piece_index,
+                        %piece_offset,
+                        "Successfully stored piece in cache"
+                    );
+                    if let Some(metrics) = &self.metrics {
+                        metrics.piece_cache_capacity_used.inc();
                     }
-                    return;
+                    caches.stored_pieces.insert(record_key, offset);
                 }
-
-                warn!(
-                    %piece_index,
-                    "Should have inserted piece into cache, but it didn't happen, this is an \
-                    implementation bug"
-                );
             }
         };
     }
@@ -923,10 +1068,10 @@ impl PlotCaches {
 /// where piece cache is not enough to store all the pieces on the network, while there is a lot of
 /// space in the plot that is not used by sectors yet and can be leverage as extra caching space.
 #[derive(Debug, Clone)]
-pub struct FarmerCache {
+pub struct FarmerCache<CacheIndex> {
     peer_id: PeerId,
     /// Individual dedicated piece caches
-    piece_caches: Arc<AsyncRwLock<Vec<PieceCacheState>>>,
+    piece_caches: Arc<AsyncRwLock<PieceCachesState<CacheIndex>>>,
     /// Additional piece caches
     plot_caches: Arc<PlotCaches>,
     handlers: Arc<Handlers>,
@@ -935,7 +1080,12 @@ pub struct FarmerCache {
     metrics: Option<Arc<FarmerCacheMetrics>>,
 }
 
-impl FarmerCache {
+impl<CacheIndex> FarmerCache<CacheIndex>
+where
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
+{
     /// Create new piece cache instance and corresponding worker.
     ///
     /// NOTE: Returned future is async, but does blocking operations and should be running in
@@ -944,7 +1094,7 @@ impl FarmerCache {
         node_client: NC,
         peer_id: PeerId,
         registry: Option<&mut Registry>,
-    ) -> (Self, FarmerCacheWorker<NC>)
+    ) -> (Self, FarmerCacheWorker<NC, CacheIndex>)
     where
         NC: NodeClient,
     {
@@ -981,11 +1131,23 @@ impl FarmerCache {
     /// Get piece from cache
     pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
-        for (cache_index, cache) in self.piece_caches.read().await.iter().enumerate() {
-            let Some(&offset) = cache.stored_pieces.get(&key) else {
-                continue;
-            };
-            match cache.backend.read_piece(offset).await {
+        let maybe_piece_found = {
+            let caches = self.piece_caches.read().await;
+
+            caches.stored_pieces.get(&key).and_then(|offset| {
+                let cache_index = usize::from(offset.cache_index);
+                let piece_offset = offset.piece_offset;
+
+                Some((
+                    piece_offset,
+                    cache_index,
+                    caches.backends.get(cache_index)?.clone(),
+                ))
+            })
+        };
+
+        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
+            match backend.read_piece(piece_offset).await {
                 Ok(maybe_piece) => {
                     return match maybe_piece {
                         Some((_piece_index, piece)) => {
@@ -1007,7 +1169,7 @@ impl FarmerCache {
                         %error,
                         %cache_index,
                         ?key,
-                        %offset,
+                        %piece_offset,
                         "Error while reading piece from cache, might be a disk corruption"
                     );
@@ -1049,14 +1211,22 @@ impl FarmerCache {
     ) -> Option<(PieceCacheId, PieceCacheOffset)> {
         let key = RecordKey::from(piece_index.to_multihash());
 
-        for cache in self.piece_caches.read().await.iter() {
-            let Some(&offset) = cache.stored_pieces.get(&key) else {
-                continue;
-            };
+        let caches = self.piece_caches.read().await;
+        let Some(offset) = caches.stored_pieces.get(&key) else {
+            if let Some(metrics) = &self.metrics {
+                metrics.cache_find_miss.inc();
+            }
+
+            return None;
+        };
+        let cache_index = usize::from(offset.cache_index);
+        let piece_offset = offset.piece_offset;
+
+        if let Some(backend) = caches.backends.get(cache_index) {
             if let Some(metrics) = &self.metrics {
                 metrics.cache_find_hit.inc();
             }
-            return Some((*cache.backend.id(), offset));
+            return Some((*backend.id(), piece_offset));
         }
 
         if let Some(metrics) = &self.metrics {
@@ -1103,21 +1273,29 @@ impl FarmerCache {
     }
 }
 
-impl LocalRecordProvider for FarmerCache {
+impl<CacheIndex> LocalRecordProvider for FarmerCache<CacheIndex>
+where
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
+{
     fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
-        for piece_cache in self.piece_caches.try_read()?.iter() {
-            if piece_cache.stored_pieces.contains_key(key) {
-                // Note: We store our own provider records locally without local addresses
-                // to avoid redundant storage and outdated addresses. Instead, these are
-                // acquired on demand when returning a `ProviderRecord` for the local node.
-                return Some(ProviderRecord {
-                    key: key.clone(),
-                    provider: self.peer_id,
-                    expires: None,
-                    addresses: Vec::new(),
-                });
-            };
-        }
+        if self
+            .piece_caches
+            .try_read()?
+            .stored_pieces
+            .contains_key(key)
+        {
+            // Note: We store our own provider records locally without local addresses
+            // to avoid redundant storage and outdated addresses. Instead, these are
+            // acquired on demand when returning a `ProviderRecord` for the local node.
+            return Some(ProviderRecord {
+                key: key.clone(),
+                provider: self.peer_id,
+                expires: None,
+                addresses: Vec::new(),
+            });
+        };
 
         let found_fut = self
             .plot_caches
diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs
index 2ffd343d27..289191d960 100644
--- a/crates/subspace-farmer/src/farmer_cache/tests.rs
+++ b/crates/subspace-farmer/src/farmer_cache/tests.rs
@@ -24,6 +24,8 @@ use subspace_rpc_primitives::{
 };
 use tempfile::tempdir;
 
+type TestCacheIndex = u8;
+
 #[derive(Debug, Clone)]
 struct MockNodeClient {
     current_segment_index: Arc<AtomicU64>,
@@ -185,7 +187,7 @@ async fn basic() {
     {
         let (farmer_cache, farmer_cache_worker) =
-            FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);
+            FarmerCache::<TestCacheIndex>::new(node_client.clone(), public_key.to_peer_id(), None);
 
         let farmer_cache_worker_exited =
             tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));
@@ -385,7 +387,7 @@ async fn basic() {
         pieces.lock().clear();
 
         let (farmer_cache, farmer_cache_worker) =
-            FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);
+            FarmerCache::<TestCacheIndex>::new(node_client.clone(), public_key.to_peer_id(), None);
 
         let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));
diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs
index 8df41d865a..67d4878389 100644
--- a/crates/subspace-farmer/src/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/farmer_piece_getter.rs
@@ -103,9 +103,9 @@ pub struct DsnCacheRetryPolicy {
     pub backoff: ExponentialBackoff,
 }
 
-struct Inner<FarmIndex, PV, NC> {
+struct Inner<FarmIndex, CacheIndex, PV, NC> {
     piece_provider: PieceProvider<PV>,
-    farmer_cache: FarmerCache,
+    farmer_cache: FarmerCache<CacheIndex>,
     node_client: NC,
     plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
     dsn_cache_retry_policy: DsnCacheRetryPolicy,
@@ -116,18 +118,20 @@ struct Inner<FarmIndex, PV, NC> {
 /// Farmer-specific piece getter.
 ///
 /// Implements [`PieceGetter`] for plotting purposes, but useful outside of that as well.
-pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
-    inner: Arc<Inner<FarmIndex, PV, NC>>,
+pub struct FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
+    inner: Arc<Inner<FarmIndex, CacheIndex, PV, NC>>,
 }
 
-impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
+impl<FarmIndex, CacheIndex, PV, NC> fmt::Debug
+    for FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
+{
     #[inline]
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
     }
 }
 
-impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
+impl<FarmIndex, CacheIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
     #[inline]
     fn clone(&self) -> Self {
         Self {
@@ -136,17 +138,20 @@ impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
     }
 }
 
-impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
+impl<FarmIndex, CacheIndex, PV, NC> FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
 where
     FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
     usize: From<FarmIndex>,
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
     /// Create new instance
     pub fn new(
         piece_provider: PieceProvider<PV>,
-        farmer_cache: FarmerCache,
+        farmer_cache: FarmerCache<CacheIndex>,
         node_client: NC,
         plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
         dsn_cache_retry_policy: DsnCacheRetryPolicy,
@@ -359,7 +364,7 @@ where
 
     /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
     /// used [`Arc`]
-    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
+    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
         WeakFarmerPieceGetter {
             inner: Arc::downgrade(&self.inner),
         }
@@ -367,10 +372,13 @@ where
 }
 
 #[async_trait]
-impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
+impl<FarmIndex, CacheIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
 where
     FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
     usize: From<FarmIndex>,
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
@@ -409,11 +417,13 @@ where
 }
 
 /// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
-pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
-    inner: Weak<Inner<FarmIndex, PV, NC>>,
+pub struct WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
+    inner: Weak<Inner<FarmIndex, CacheIndex, PV, NC>>,
 }
 
-impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
+impl<FarmIndex, CacheIndex, PV, NC> fmt::Debug
+    for WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
+{
     #[inline]
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("WeakFarmerPieceGetter")
@@ -421,7 +431,7 @@ impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC>
     }
 }
 
-impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
+impl<FarmIndex, CacheIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
     #[inline]
     fn clone(&self) -> Self {
         Self {
@@ -431,10 +441,14 @@ impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
 }
 
 #[async_trait]
-impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
+impl<FarmIndex, CacheIndex, PV, NC> PieceGetter
+    for WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
 where
     FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
     usize: From<FarmIndex>,
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
@@ -451,9 +465,9 @@ where
     }
 }
 
-impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
+impl<FarmIndex, CacheIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
     /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
-    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
+    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>> {
         Some(FarmerPieceGetter {
            inner: self.inner.upgrade()?,
        })
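Taken together, the entry points pin the new type parameter to a concrete width: `type CacheIndex = u8;` in `farm.rs` (a single farmer addresses at most 256 piece caches) and `pub type ClusterCacheIndex = u16;` in `cluster/cache.rs` (a cluster controller addresses up to 65,536). A rough, illustrative sketch of that pattern outside the diff (the `Cache` type and `max_backends` helper are stand-ins, not the real `FarmerCache` API):

```rust
use std::marker::PhantomData;

// Illustrative stand-in for the now-generic cache type: the index parameter only
// decides how many distinct backends can be addressed.
struct Cache<CacheIndex> {
    _index: PhantomData<CacheIndex>,
}

impl<CacheIndex> Cache<CacheIndex>
where
    usize: From<CacheIndex>,
    CacheIndex: TryFrom<usize> + Copy,
{
    fn new() -> Self {
        Self { _index: PhantomData }
    }

    // Number of addressable backends for this index width.
    fn max_backends() -> usize {
        match std::mem::size_of::<CacheIndex>() {
            1 => u8::MAX as usize + 1,
            2 => u16::MAX as usize + 1,
            _ => usize::MAX,
        }
    }
}

fn main() {
    let _single_farmer = Cache::<u8>::new(); // mirrors `type CacheIndex = u8` in farm.rs
    let _cluster = Cache::<u16>::new(); // mirrors `pub type ClusterCacheIndex = u16`
    assert_eq!(Cache::<u8>::max_backends(), 256);
    assert_eq!(Cache::<u16>::max_backends(), 65_536);
}
```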