Skip to content

Commit

Permalink
Add FarmerCache::get_pieces() method that can get many pieces concu…
Browse files Browse the repository at this point in the history
…rrently from different caches
  • Loading branch information
nazar-pc committed Oct 16, 2024
1 parent c045062 commit bfe4033
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 3 deletions.
188 changes: 187 additions & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, SinkExt, StreamExt};
use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
use prometheus_client::registry::Registry;
use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
Expand Down Expand Up @@ -1128,6 +1131,189 @@ where
None
}

/// Get pieces from cache.
///
/// Number of elements in returned stream is the same as in `piece_indices`.
pub async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
let mut pieces_to_get_from_plot_cache = Vec::new();

let reading_from_piece_cache = {
let caches = self.piece_caches.read().await;
// Pieces to read from piece cache grouped by backend for efficiency reasons
let mut reading_from_piece_cache = HashMap::<CacheIndex, (CacheBackend, Vec<_>)>::new();

for piece_index in piece_indices {
let key = RecordKey::from(piece_index.to_multihash());

let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
self.peer_id,
key.clone(),
)) {
Some(offset) => offset,
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};

let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;

match reading_from_piece_cache.entry(cache_index) {
Entry::Occupied(mut entry) => {
let (_backend, pieces) = entry.get_mut();
pieces.push((piece_index, piece_offset, key));
}
Entry::Vacant(entry) => {
let backend = match caches.get_backend(cache_index) {
Some(backend) => backend.clone(),
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};
entry.insert((backend, vec![(piece_index, piece_offset, key)]));
}
}
}

reading_from_piece_cache
};

let (tx, mut rx) = mpsc::unbounded();

let mut fut = Box::pin(async move {
let tx = &tx;

let mut reading_from_piece_cache = reading_from_piece_cache
.into_iter()
.map(|(cache_index, (backend, pieces))| async move {
// TODO: Read a stream of pieces from each backend rather than read
// individual pieces sequentially, but `PieceCache` API ideally needs to be
// extended first to support this more efficiently, especially over the
// network
for (piece_index, piece_offset, key) in pieces {
let maybe_piece = match backend.read_piece(piece_offset).await {
Ok(Some((_piece_index, piece))) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
Some(piece)
}
Ok(None) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_miss.inc();
}
None
}
Err(error) => {
error!(
%error,
%cache_index,
%piece_index,
%piece_offset,
?key,
"Error while reading piece from cache, might be a disk \
corruption"
);

if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ForgetKey { key })
.await
{
trace!(
%error,
"Failed to send ForgetKey command to worker"
);
}

if let Some(metrics) = &self.metrics {
metrics.cache_get_error.inc();
}
None
}
};

tx.unbounded_send((piece_index, maybe_piece))
.expect("This future isn't polled after receiver is dropped; qed");
}
})
.collect::<FuturesUnordered<_>>();
// TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
// Simply drain everything
// .for_each(|()| async {})

// TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
let reading_from_piece_cache_fut = async move {
while let Some(()) = reading_from_piece_cache.next().await {
// Simply drain everything
}
};

let reading_from_plot_cache_fut = async {
if pieces_to_get_from_plot_cache.is_empty() {
return;
}

for cache in self.plot_caches.caches.read().await.iter() {
// Iterating over offsets in reverse order to both traverse elements in async code
// and being able to efficiently remove entries without extra allocations
for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

if let Ok(Some(piece)) = cache.read_piece(key).await {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
tx.unbounded_send((*piece_index, Some(piece)))
.expect("This future isn't polled after receiver is dropped; qed");

// Due to iteration in reverse order and swapping using elements at the end,
// this doesn't affect processing of the elements
pieces_to_get_from_plot_cache.swap_remove(offset);
}
}

if pieces_to_get_from_plot_cache.is_empty() {
return;
}
}

if let Some(metrics) = &self.metrics {
metrics
.cache_get_miss
.inc_by(pieces_to_get_from_plot_cache.len() as u64);
}

for (piece_index, _key) in pieces_to_get_from_plot_cache {
tx.unbounded_send((piece_index, None))
.expect("This future isn't polled after receiver is dropped; qed");
}
};

join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
});

// Drive above future and stream back any pieces that were downloaded so far
stream::poll_fn(move |cx| {
let end_result = fut.poll_unpin(cx);

if let Ok(maybe_result) = rx.try_next() {
return Poll::Ready(maybe_result);
}

end_result.map(|((), ())| None)
})
}

/// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed
pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
duration_constructors,
exact_size_is_empty,
fmt_helpers_for_derive,
future_join,
hash_extract_if,
impl_trait_in_assoc_type,
int_roundings,
Expand Down
6 changes: 4 additions & 2 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ where
}
}

/// Get pieces with provided indices from cache
/// Get pieces with provided indices from cache.
///
/// Number of elements in returned stream is the same as in `piece_indices`.
pub async fn get_from_cache<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + 'a
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a
where
PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
{
Expand Down

0 comments on commit bfe4033

Please sign in to comment.