diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 0a48970231..dc2ec8fc6e 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -75,7 +75,8 @@ where
 {
     peer_id: PeerId,
     node_client: NC,
-    caches: Arc>>,
+    piece_caches: Arc>>,
+    plot_caches: Arc,
     handlers: Arc,
     worker_receiver: Option>,
 }
@@ -165,7 +166,7 @@ where
             }
             // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
             WorkerCommand::ForgetKey { key } => {
-                let mut caches = self.caches.write().await;
+                let mut caches = self.piece_caches.write().await;
 
                 for (farm_index, cache) in caches.iter_mut().enumerate() {
                     let Some(offset) = cache.stored_pieces.remove(&key) else {
@@ -213,7 +214,7 @@ where
     {
         info!("Initializing piece cache");
         // Pull old cache state since it will be replaced with a new one and reuse its allocations
-        let cache_state = mem::take(&mut *self.caches.write().await);
+        let cache_state = mem::take(&mut *self.piece_caches.write().await);
         let mut stored_pieces = Vec::with_capacity(new_piece_caches.len());
         let mut free_offsets = Vec::with_capacity(new_piece_caches.len());
         for mut state in cache_state {
@@ -325,7 +326,7 @@ where
                     );
                     // Not the latest, but at least something
-                    *self.caches.write().await = caches;
+                    *self.piece_caches.write().await = caches;
 
                     return;
                 }
             }
@@ -374,7 +375,7 @@ where
         });
 
         // Store whatever correct pieces are immediately available after restart
-        *self.caches.write().await = caches.clone();
+        *self.piece_caches.write().await = caches.clone();
 
         debug!(
             count = %piece_indices_to_store.len(),
@@ -466,14 +467,14 @@ where
                 downloaded_pieces_count += 1;
                 let progress =
                     downloaded_pieces_count as f32 / pieces_to_download_total as f32 * 100.0;
                 if downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL == 0 {
-                    *self.caches.write().await = caches.clone();
+                    *self.piece_caches.write().await = caches.clone();
 
                     info!("Piece cache sync {progress:.2}% complete");
                 }
 
                 self.handlers.progress.call_simple(&progress);
             }
-        *self.caches.write().await = caches;
+        *self.piece_caches.write().await = caches;
         self.handlers.progress.call_simple(&100.0);
         worker_state.last_segment_index = last_segment_index;
@@ -497,22 +498,40 @@ where
         let pieces_to_maybe_include = segment_index
             .segment_piece_indexes()
             .into_iter()
-            .filter(|&piece_index| {
-                let maybe_include = worker_state
-                    .heap
-                    .should_include_key(KeyWrapper(piece_index));
-                if !maybe_include {
-                    trace!(%piece_index, "Piece doesn't need to be cached #1");
-                }
+            .map(|piece_index| {
+                let worker_state = &*worker_state;
 
-                maybe_include
-            })
-            .map(|piece_index| async move {
-                let maybe_piece = match self.node_client.piece(piece_index).await {
-                    Ok(maybe_piece) => maybe_piece,
-                    Err(error) => {
+                async move {
+                    let should_store_in_piece_cache = worker_state
+                        .heap
+                        .should_include_key(KeyWrapper(piece_index));
+                    let key = RecordKey::from(piece_index.to_multihash());
+                    let should_store_in_plot_cache =
+                        self.plot_caches.should_store(piece_index, &key).await;
+
+                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
+                        trace!(%piece_index, "Piece doesn't need to be cached #1");
+
+                        return None;
+                    }
+
+                    let maybe_piece = match self.node_client.piece(piece_index).await {
+                        Ok(maybe_piece) => maybe_piece,
+                        Err(error) => {
+                            error!(
+                                %error,
+                                %segment_index,
+                                %piece_index,
+                                "Failed to retrieve piece from node right after archiving, \
+                                this should never happen and is an implementation bug"
+                            );
+
+                            return None;
+                        }
+                    };
+
+                    let Some(piece) = maybe_piece else {
                         error!(
-                            %error,
                             %segment_index,
                             %piece_index,
                             "Failed to retrieve piece from node right after archiving, this \
@@ -520,21 +539,10 @@ where
                         );
 
                         return None;
-                    }
-                };
-
-                let Some(piece) = maybe_piece else {
-                    error!(
-                        %segment_index,
-                        %piece_index,
-                        "Failed to retrieve piece from node right after archiving, this should \
-                        never happen and is an implementation bug"
-                    );
+                    };
 
-                    return None;
-                };
-
-                Some((piece_index, piece))
+                    Some((piece_index, piece))
+                }
             })
             .collect::>()
             .filter_map(|maybe_piece| async move { maybe_piece })
@@ -546,9 +554,19 @@ where
         self.acknowledge_archived_segment_processing(segment_index)
             .await;
 
+        // TODO: Would be nice to have concurrency here, but heap is causing a bit of
+        // difficulties unfortunately
         // Go through potentially matching pieces again now that segment was acknowledged and
         // try to persist them if necessary
         for (piece_index, piece) in pieces_to_maybe_include {
+            if !self
+                .plot_caches
+                .store_additional_piece(piece_index, &piece)
+                .await
+            {
+                trace!(%piece_index, "Piece doesn't need to be cached in plot cache");
+            }
+
             if !worker_state
                 .heap
                 .should_include_key(KeyWrapper(piece_index))
@@ -669,7 +687,7 @@ where
         let record_key = RecordKey::from(piece_index.to_multihash());
         let heap_key = KeyWrapper(piece_index);
 
-        let mut caches = self.caches.write().await;
+        let mut caches = self.piece_caches.write().await;
         match worker_state.heap.insert(heap_key) {
             // Entry is already occupied, we need to find and replace old piece with new one
             Some(KeyWrapper(old_piece_index)) => {
@@ -752,6 +770,78 @@ where
     }
 }
 
+#[derive(Debug)]
+struct PlotCaches {
+    /// Additional piece caches
+    caches: AsyncRwLock>>,
+    /// Next plot cache to use for storing pieces
+    next_plot_cache: AtomicUsize,
+}
+
+impl PlotCaches {
+    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
+        for (farm_index, cache) in self.caches.read().await.iter().enumerate() {
+            match cache.is_piece_maybe_stored(key).await {
+                Ok(MaybePieceStoredResult::No) => {
+                    // Try another one if there is any
+                }
+                Ok(MaybePieceStoredResult::Vacant) => {
+                    return true;
+                }
+                Ok(MaybePieceStoredResult::Yes) => {
+                    // Already stored, nothing else left to do
+                    return false;
+                }
+                Err(error) => {
+                    warn!(
+                        %farm_index,
+                        %piece_index,
+                        %error,
+                        "Failed to check piece stored in cache"
+                    );
+                }
+            }
+        }
+
+        false
+    }
+
+    /// Store a piece in additional downloaded pieces, if there is space for them
+    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
+        let plot_caches = self.caches.read().await;
+        let plot_caches_len = plot_caches.len();
+
+        // Store pieces in plots using round-robin distribution
+        for _ in 0..plot_caches_len {
+            let plot_cache_index =
+                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
+
+            match plot_caches[plot_cache_index]
+                .try_store_piece(piece_index, piece)
+                .await
+            {
+                Ok(true) => {
+                    return false;
+                }
+                Ok(false) => {
+                    continue;
+                }
+                Err(error) => {
+                    error!(
+                        %error,
+                        %piece_index,
+                        %plot_cache_index,
+                        "Failed to store additional piece in cache"
+                    );
+                    continue;
+                }
+            }
+        }
+
+        false
+    }
+}
+
 /// Farmer cache that aggregates different kinds of caches of multiple disks
 #[derive(Debug, Clone)]
 pub struct FarmerCache {
@@ -759,9 +849,7 @@ pub struct FarmerCache {
     /// Individual dedicated piece caches
     piece_caches: Arc>>,
     /// Additional piece caches
-    plot_caches: Arc>>>,
-    /// Next plot cache to use for storing pieces
-    next_plot_cache: Arc,
+    plot_caches: Arc,
     handlers: Arc,
     // We do not want to increase capacity unnecessarily on clone
     worker_sender: Arc>,
@@ -780,18 +868,23 @@ impl FarmerCache {
         let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
         let handlers = Arc::new(Handlers::default());
+        let plot_caches = Arc::new(PlotCaches {
+            caches: AsyncRwLock::default(),
+            next_plot_cache: AtomicUsize::new(0),
+        });
+
         let instance = Self {
             peer_id,
             piece_caches: Arc::clone(&caches),
-            plot_caches: Arc::default(),
-            next_plot_cache: Arc::new(AtomicUsize::new(0)),
+            plot_caches: Arc::clone(&plot_caches),
             handlers: Arc::clone(&handlers),
             worker_sender: Arc::new(worker_sender),
         };
 
         let worker = FarmerCacheWorker {
             peer_id,
             node_client,
-            caches,
+            piece_caches: caches,
+            plot_caches,
             handlers,
             worker_receiver: Some(worker_receiver),
         };
@@ -831,7 +924,7 @@ impl FarmerCache {
             }
         }
 
-        for cache in self.plot_caches.read().await.iter() {
+        for cache in self.plot_caches.caches.read().await.iter() {
            if let Ok(Some(piece)) = cache.read_piece(&key).await {
                 return Some(piece);
             }
@@ -844,71 +937,15 @@ impl FarmerCache {
     pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
         let key = RecordKey::from(piece_index.to_multihash());
 
-        for cache in self.piece_caches.read().await.iter() {
-            if cache.stored_pieces.contains_key(&key) {
-                // Already stored in normal piece cache, no need to store it again
-                return;
-            }
-        }
-
-        let mut should_store = false;
-        for (farm_index, cache) in self.plot_caches.read().await.iter().enumerate() {
-            match cache.is_piece_maybe_stored(&key).await {
-                Ok(MaybePieceStoredResult::No) => {
-                    // Try another one if there is any
-                }
-                Ok(MaybePieceStoredResult::Vacant) => {
-                    should_store = true;
-                    break;
-                }
-                Ok(MaybePieceStoredResult::Yes) => {
-                    // Already stored, nothing else left to do
-                    return;
-                }
-                Err(error) => {
-                    warn!(
-                        %farm_index,
-                        %piece_index,
-                        %error,
-                        "Failed to check piece stored in cache"
-                    );
-                }
-            }
-        }
+        let should_store = self.plot_caches.should_store(piece_index, &key).await;
 
         if !should_store {
             return;
         }
 
-        let plot_caches = self.plot_caches.read().await;
-        let plot_caches_len = plot_caches.len();
-
-        // Store pieces in plots using round-robin distribution
-        for _ in 0..plot_caches_len {
-            let plot_cache_index =
-                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
-
-            match plot_caches[plot_cache_index]
-                .try_store_piece(piece_index, piece)
-                .await
-            {
-                Ok(true) => {
-                    return;
-                }
-                Ok(false) => {
-                    continue;
-                }
-                Err(error) => {
-                    error!(
-                        %error,
-                        %piece_index,
-                        %plot_cache_index,
-                        "Failed to store additional piece in cache"
-                    );
-                    continue;
-                }
-            }
-        }
+        self.plot_caches
+            .store_additional_piece(piece_index, piece)
+            .await;
     }
 
     /// Initialize replacement of backing caches
@@ -925,7 +962,7 @@ impl FarmerCache {
             warn!(%error, "Failed to replace backing caches, worker exited");
         }
 
-        *self.plot_caches.write().await = new_plot_caches;
+        *self.plot_caches.caches.write().await = new_plot_caches;
     }
 
     /// Subscribe to cache sync notifications
@@ -952,6 +989,7 @@ impl LocalRecordProvider for FarmerCache {
 
         let found_fut = self
             .plot_caches
+            .caches
             .try_read()?
             .iter()
             .map(|plot_cache| {