Merge pull request #2644 from subspace/improve-farmer-caching
Improve farmer caching
nazar-pc authored Mar 27, 2024
2 parents c5475f0 + 2ecc51c · commit 14f5d71
Showing 1 changed file with 142 additions and 104 deletions.
crates/subspace-farmer/src/farmer_cache.rs

@@ -75,7 +75,8 @@ where
 {
     peer_id: PeerId,
     node_client: NC,
-    caches: Arc<AsyncRwLock<Vec<PieceCacheState>>>,
+    piece_caches: Arc<AsyncRwLock<Vec<PieceCacheState>>>,
+    plot_caches: Arc<PlotCaches>,
     handlers: Arc<Handlers>,
     worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
 }
@@ -165,7 +166,7 @@ where
             }
             // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
             WorkerCommand::ForgetKey { key } => {
-                let mut caches = self.caches.write().await;
+                let mut caches = self.piece_caches.write().await;

                 for (farm_index, cache) in caches.iter_mut().enumerate() {
                     let Some(offset) = cache.stored_pieces.remove(&key) else {
@@ -213,7 +214,7 @@ where
     {
         info!("Initializing piece cache");
         // Pull old cache state since it will be replaced with a new one and reuse its allocations
-        let cache_state = mem::take(&mut *self.caches.write().await);
+        let cache_state = mem::take(&mut *self.piece_caches.write().await);
         let mut stored_pieces = Vec::with_capacity(new_piece_caches.len());
         let mut free_offsets = Vec::with_capacity(new_piece_caches.len());
         for mut state in cache_state {
@@ -325,7 +326,7 @@ where
                    );

                    // Not the latest, but at least something
-                    *self.caches.write().await = caches;
+                    *self.piece_caches.write().await = caches;
                    return;
                }
            }
@@ -374,7 +375,7 @@ where
         });

         // Store whatever correct pieces are immediately available after restart
-        *self.caches.write().await = caches.clone();
+        *self.piece_caches.write().await = caches.clone();

         debug!(
             count = %piece_indices_to_store.len(),
@@ -466,14 +467,14 @@ where
             downloaded_pieces_count += 1;
             let progress = downloaded_pieces_count as f32 / pieces_to_download_total as f32 * 100.0;
             if downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL == 0 {
-                *self.caches.write().await = caches.clone();
+                *self.piece_caches.write().await = caches.clone();

                 info!("Piece cache sync {progress:.2}% complete");
             }
             self.handlers.progress.call_simple(&progress);
         }

-        *self.caches.write().await = caches;
+        *self.piece_caches.write().await = caches;
         self.handlers.progress.call_simple(&100.0);
         worker_state.last_segment_index = last_segment_index;
Expand All @@ -497,44 +498,51 @@ where
let pieces_to_maybe_include = segment_index
.segment_piece_indexes()
.into_iter()
.filter(|&piece_index| {
let maybe_include = worker_state
.heap
.should_include_key(KeyWrapper(piece_index));
if !maybe_include {
trace!(%piece_index, "Piece doesn't need to be cached #1");
}
.map(|piece_index| {
let worker_state = &*worker_state;

maybe_include
})
.map(|piece_index| async move {
let maybe_piece = match self.node_client.piece(piece_index).await {
Ok(maybe_piece) => maybe_piece,
Err(error) => {
async move {
let should_store_in_piece_cache = worker_state
.heap
.should_include_key(KeyWrapper(piece_index));
let key = RecordKey::from(piece_index.to_multihash());
let should_store_in_plot_cache =
self.plot_caches.should_store(piece_index, &key).await;

if !(should_store_in_piece_cache || should_store_in_plot_cache) {
trace!(%piece_index, "Piece doesn't need to be cached #1");

return None;
}

let maybe_piece = match self.node_client.piece(piece_index).await {
Ok(maybe_piece) => maybe_piece,
Err(error) => {
error!(
%error,
%segment_index,
%piece_index,
"Failed to retrieve piece from node right after archiving, \
this should never happen and is an implementation bug"
);

return None;
}
};

let Some(piece) = maybe_piece else {
error!(
%error,
%segment_index,
%piece_index,
"Failed to retrieve piece from node right after archiving, this \
should never happen and is an implementation bug"
);

return None;
}
};

let Some(piece) = maybe_piece else {
error!(
%segment_index,
%piece_index,
"Failed to retrieve piece from node right after archiving, this should \
never happen and is an implementation bug"
);
};

return None;
};

Some((piece_index, piece))
Some((piece_index, piece))
}
})
.collect::<FuturesUnordered<_>>()
.filter_map(|maybe_piece| async move { maybe_piece })
@@ -546,9 +554,19 @@ where
        self.acknowledge_archived_segment_processing(segment_index)
            .await;

        // TODO: Would be nice to have concurrency here, but heap is causing a bit of
        // difficulties unfortunately
+        // Go through potentially matching pieces again now that segment was acknowledged and
+        // try to persist them if necessary
        for (piece_index, piece) in pieces_to_maybe_include {
+            if !self
+                .plot_caches
+                .store_additional_piece(piece_index, &piece)
+                .await
+            {
+                trace!(%piece_index, "Piece doesn't need to be cached in plot cache");
+            }
+
            if !worker_state
                .heap
                .should_include_key(KeyWrapper(piece_index))
@@ -669,7 +687,7 @@ where
        let record_key = RecordKey::from(piece_index.to_multihash());
        let heap_key = KeyWrapper(piece_index);

-        let mut caches = self.caches.write().await;
+        let mut caches = self.piece_caches.write().await;
        match worker_state.heap.insert(heap_key) {
            // Entry is already occupied, we need to find and replace old piece with new one
            Some(KeyWrapper(old_piece_index)) => {
@@ -752,16 +770,86 @@ where
     }
 }

+#[derive(Debug)]
+struct PlotCaches {
+    /// Additional piece caches
+    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
+    /// Next plot cache to use for storing pieces
+    next_plot_cache: AtomicUsize,
+}
+
+impl PlotCaches {
+    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
+        for (farm_index, cache) in self.caches.read().await.iter().enumerate() {
+            match cache.is_piece_maybe_stored(key).await {
+                Ok(MaybePieceStoredResult::No) => {
+                    // Try another one if there is any
+                }
+                Ok(MaybePieceStoredResult::Vacant) => {
+                    return true;
+                }
+                Ok(MaybePieceStoredResult::Yes) => {
+                    // Already stored, nothing else left to do
+                    return false;
+                }
+                Err(error) => {
+                    warn!(
+                        %farm_index,
+                        %piece_index,
+                        %error,
+                        "Failed to check piece stored in cache"
+                    );
+                }
+            }
+        }
+
+        false
+    }
+
+    /// Store a piece in additional downloaded pieces, if there is space for them
+    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
+        let plot_caches = self.caches.read().await;
+        let plot_caches_len = plot_caches.len();
+
+        // Store pieces in plots using round-robin distribution
+        for _ in 0..plot_caches_len {
+            let plot_cache_index =
+                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
+
+            match plot_caches[plot_cache_index]
+                .try_store_piece(piece_index, piece)
+                .await
+            {
+                Ok(true) => {
+                    return false;
+                }
+                Ok(false) => {
+                    continue;
+                }
+                Err(error) => {
+                    error!(
+                        %error,
+                        %piece_index,
+                        %plot_cache_index,
+                        "Failed to store additional piece in cache"
+                    );
+                    continue;
+                }
+            }
+        }
+
+        false
+    }
+}
+
 /// Farmer cache that aggregates different kinds of caches of multiple disks
 #[derive(Debug, Clone)]
 pub struct FarmerCache {
     peer_id: PeerId,
     /// Individual dedicated piece caches
     piece_caches: Arc<AsyncRwLock<Vec<PieceCacheState>>>,
     /// Additional piece caches
-    plot_caches: Arc<AsyncRwLock<Vec<Arc<dyn PlotCache>>>>,
-    /// Next plot cache to use for storing pieces
-    next_plot_cache: Arc<AtomicUsize>,
+    plot_caches: Arc<PlotCaches>,
     handlers: Arc<Handlers>,
     // We do not want to increase capacity unnecessarily on clone
     worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
@@ -780,18 +868,23 @@ impl FarmerCache {
        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
        let handlers = Arc::new(Handlers::default());

+        let plot_caches = Arc::new(PlotCaches {
+            caches: AsyncRwLock::default(),
+            next_plot_cache: AtomicUsize::new(0),
+        });
+
        let instance = Self {
            peer_id,
            piece_caches: Arc::clone(&caches),
-            plot_caches: Arc::default(),
-            next_plot_cache: Arc::new(AtomicUsize::new(0)),
+            plot_caches: Arc::clone(&plot_caches),
            handlers: Arc::clone(&handlers),
            worker_sender: Arc::new(worker_sender),
        };
        let worker = FarmerCacheWorker {
            peer_id,
            node_client,
-            caches,
+            piece_caches: caches,
+            plot_caches,
            handlers,
            worker_receiver: Some(worker_receiver),
        };
@@ -831,7 +924,7 @@ impl FarmerCache {
            }
        }

-        for cache in self.plot_caches.read().await.iter() {
+        for cache in self.plot_caches.caches.read().await.iter() {
            if let Ok(Some(piece)) = cache.read_piece(&key).await {
                return Some(piece);
            }
@@ -844,71 +937,15 @@ impl FarmerCache {
     pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
         let key = RecordKey::from(piece_index.to_multihash());

-        for cache in self.piece_caches.read().await.iter() {
-            if cache.stored_pieces.contains_key(&key) {
-                // Already stored in normal piece cache, no need to store it again
-                return;
-            }
-        }
-
-        let mut should_store = false;
-        for (farm_index, cache) in self.plot_caches.read().await.iter().enumerate() {
-            match cache.is_piece_maybe_stored(&key).await {
-                Ok(MaybePieceStoredResult::No) => {
-                    // Try another one if there is any
-                }
-                Ok(MaybePieceStoredResult::Vacant) => {
-                    should_store = true;
-                    break;
-                }
-                Ok(MaybePieceStoredResult::Yes) => {
-                    // Already stored, nothing else left to do
-                    return;
-                }
-                Err(error) => {
-                    warn!(
-                        %farm_index,
-                        %piece_index,
-                        %error,
-                        "Failed to check piece stored in cache"
-                    );
-                }
-            }
-        }
+        let should_store = self.plot_caches.should_store(piece_index, &key).await;

         if !should_store {
             return;
         }

-        let plot_caches = self.plot_caches.read().await;
-        let plot_caches_len = plot_caches.len();
-
-        // Store pieces in plots using round-robin distribution
-        for _ in 0..plot_caches_len {
-            let plot_cache_index =
-                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
-
-            match plot_caches[plot_cache_index]
-                .try_store_piece(piece_index, piece)
-                .await
-            {
-                Ok(true) => {
-                    return;
-                }
-                Ok(false) => {
-                    continue;
-                }
-                Err(error) => {
-                    error!(
-                        %error,
-                        %piece_index,
-                        %plot_cache_index,
-                        "Failed to store additional piece in cache"
-                    );
-                    continue;
-                }
-            }
-        }
+        self.plot_caches
+            .store_additional_piece(piece_index, piece)
+            .await;
     }

     /// Initialize replacement of backing caches
@@ -925,7 +962,7 @@ impl FarmerCache {
            warn!(%error, "Failed to replace backing caches, worker exited");
        }

-        *self.plot_caches.write().await = new_plot_caches;
+        *self.plot_caches.caches.write().await = new_plot_caches;
    }

    /// Subscribe to cache sync notifications
@@ -952,6 +989,7 @@ impl LocalRecordProvider for FarmerCache {

        let found_fut = self
            .plot_caches
+            .caches
            .try_read()?
            .iter()
            .map(|plot_cache| {