Merge pull request #3135 from autonomys/get_pieces-in-farmer-piece-getter

`PieceGetter::get_pieces()` in `FarmerPieceGetter`
nazar-pc authored Oct 17, 2024
2 parents 788d42e + a224edc commit 1541ff2
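
The new `FarmerCache::get_pieces()` added below takes an iterator of piece indices and returns a stream of `(PieceIndex, Option<Piece>)` pairs, one element per requested index. The following is a minimal consumer sketch, assuming only the `futures` and `subspace-core-primitives` crates; `log_cache_hits` is an illustrative helper and not part of this commit.

```rust
use futures::{Stream, StreamExt};
use subspace_core_primitives::pieces::{Piece, PieceIndex};

/// Illustrative helper: consume a stream with the same item type as the one
/// returned by `FarmerCache::get_pieces()` and report cache hits and misses.
async fn log_cache_hits<S>(mut pieces: S)
where
    S: Stream<Item = (PieceIndex, Option<Piece>)> + Unpin,
{
    while let Some((piece_index, maybe_piece)) = pieces.next().await {
        match maybe_piece {
            // Piece was served from the farmer cache
            Some(_piece) => println!("piece {piece_index}: cache hit"),
            // Piece is not cached; a caller would fall back to other sources
            None => println!("piece {piece_index}: cache miss"),
        }
    }
}
```

With a `FarmerCache` instance in scope, this could be driven as `log_cache_hits(farmer_cache.get_pieces(piece_indices).await).await`.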
Showing 4 changed files with 334 additions and 11 deletions.
200 changes: 194 additions & 6 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -15,14 +15,18 @@ use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
use prometheus_client::registry::Registry;
use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
@@ -33,7 +37,6 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

@@ -170,7 +173,7 @@ where
.expect("Always set during worker instantiation");

if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
worker_receiver.recv().await
worker_receiver.next().await
{
self.initialize(
&piece_getter,
@@ -200,7 +203,7 @@ where

loop {
select! {
maybe_command = worker_receiver.recv().fuse() => {
maybe_command = worker_receiver.next() => {
let Some(command) = maybe_command else {
// Nothing else left to do
return;
@@ -995,7 +998,7 @@ pub struct FarmerCache<CacheIndex> {
plot_caches: Arc<PlotCaches>,
handlers: Arc<Handlers>,
// We do not want to increase capacity unnecessarily on clone
worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
worker_sender: mpsc::Sender<WorkerCommand>,
metrics: Option<Arc<FarmerCacheMetrics>>,
}

@@ -1032,7 +1035,7 @@ where
piece_caches: Arc::clone(&caches),
plot_caches: Arc::clone(&plot_caches),
handlers: Arc::clone(&handlers),
worker_sender: Arc::new(worker_sender),
worker_sender,
metrics: metrics.clone(),
};
let worker = FarmerCacheWorker {
@@ -1098,6 +1101,7 @@ where

if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ForgetKey { key })
.await
{
@@ -1127,6 +1131,189 @@ where
None
}

/// Get pieces from cache.
///
/// Number of elements in returned stream is the same as in `piece_indices`.
pub async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
let mut pieces_to_get_from_plot_cache = Vec::new();

let reading_from_piece_cache = {
let caches = self.piece_caches.read().await;
// Pieces to read from piece cache grouped by backend for efficiency reasons
let mut reading_from_piece_cache = HashMap::<CacheIndex, (CacheBackend, Vec<_>)>::new();

for piece_index in piece_indices {
let key = RecordKey::from(piece_index.to_multihash());

let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
self.peer_id,
key.clone(),
)) {
Some(offset) => offset,
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};

let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;

match reading_from_piece_cache.entry(cache_index) {
Entry::Occupied(mut entry) => {
let (_backend, pieces) = entry.get_mut();
pieces.push((piece_index, piece_offset, key));
}
Entry::Vacant(entry) => {
let backend = match caches.get_backend(cache_index) {
Some(backend) => backend.clone(),
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};
entry.insert((backend, vec![(piece_index, piece_offset, key)]));
}
}
}

reading_from_piece_cache
};

let (tx, mut rx) = mpsc::unbounded();

let mut fut = Box::pin(async move {
let tx = &tx;

let mut reading_from_piece_cache = reading_from_piece_cache
.into_iter()
.map(|(cache_index, (backend, pieces))| async move {
// TODO: Read a stream of pieces from each backend rather than read
// individual pieces sequentially, but `PieceCache` API ideally needs to be
// extended first to support this more efficiently, especially over the
// network
for (piece_index, piece_offset, key) in pieces {
let maybe_piece = match backend.read_piece(piece_offset).await {
Ok(Some((_piece_index, piece))) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
Some(piece)
}
Ok(None) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_miss.inc();
}
None
}
Err(error) => {
error!(
%error,
%cache_index,
%piece_index,
%piece_offset,
?key,
"Error while reading piece from cache, might be a disk \
corruption"
);

if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ForgetKey { key })
.await
{
trace!(
%error,
"Failed to send ForgetKey command to worker"
);
}

if let Some(metrics) = &self.metrics {
metrics.cache_get_error.inc();
}
None
}
};

tx.unbounded_send((piece_index, maybe_piece))
.expect("This future isn't polled after receiver is dropped; qed");
}
})
.collect::<FuturesUnordered<_>>();
// TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
// Simply drain everything
// .for_each(|()| async {})

// TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
let reading_from_piece_cache_fut = async move {
while let Some(()) = reading_from_piece_cache.next().await {
// Simply drain everything
}
};

let reading_from_plot_cache_fut = async {
if pieces_to_get_from_plot_cache.is_empty() {
return;
}

for cache in self.plot_caches.caches.read().await.iter() {
// Iterating over offsets in reverse order to both traverse elements in async code
// and being able to efficiently remove entries without extra allocations
for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

if let Ok(Some(piece)) = cache.read_piece(key).await {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
tx.unbounded_send((*piece_index, Some(piece)))
.expect("This future isn't polled after receiver is dropped; qed");

// Due to iteration in reverse order and swapping using elements at the end,
// this doesn't affect processing of the elements
pieces_to_get_from_plot_cache.swap_remove(offset);
}
}

if pieces_to_get_from_plot_cache.is_empty() {
return;
}
}

if let Some(metrics) = &self.metrics {
metrics
.cache_get_miss
.inc_by(pieces_to_get_from_plot_cache.len() as u64);
}

for (piece_index, _key) in pieces_to_get_from_plot_cache {
tx.unbounded_send((piece_index, None))
.expect("This future isn't polled after receiver is dropped; qed");
}
};

join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
});

// Drive above future and stream back any pieces that were downloaded so far
stream::poll_fn(move |cx| {
let end_result = fut.poll_unpin(cx);

if let Ok(maybe_result) = rx.try_next() {
return Poll::Ready(maybe_result);
}

end_result.map(|((), ())| None)
})
}

/// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed
pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
Expand Down Expand Up @@ -1236,6 +1423,7 @@ where
) {
if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
.await
{
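
The tail of the new `get_pieces()` drives the cache-reading future inside `stream::poll_fn` while draining an unbounded channel, so pieces are yielded as soon as they are read rather than only after everything completes. Below is a minimal standalone sketch of that driver pattern, assuming only the `futures` crate; the producer here just sends a few numbers, and it is fused so that polling it after completion is safe. All names are illustrative.

```rust
use futures::channel::mpsc;
use futures::{stream, FutureExt, StreamExt};
use std::task::Poll;

fn main() {
    futures::executor::block_on(async {
        let (tx, mut rx) = mpsc::unbounded();

        // Producer: in `get_pieces()` this is the future reading pieces from the
        // piece and plot caches; here it just sends a few numbers. `.fuse()` makes
        // it safe to keep polling after the future has completed.
        let mut fut = Box::pin(
            async move {
                for n in 0..3u32 {
                    tx.unbounded_send(n).expect("receiver is alive here; qed");
                }
            }
            .fuse(),
        );

        // Drive the producer and yield whatever it has sent so far; the stream
        // ends once the producer has finished and the channel is drained.
        let collected = stream::poll_fn(move |cx| {
            // Poll the producer first so freshly sent items are visible below
            let end_result = fut.poll_unpin(cx);

            // `Ok(Some(_))`: an item is ready; `Ok(None)`: channel closed and
            // empty; `Err(_)`: channel empty but the producer is still running
            if let Ok(maybe_item) = rx.try_next() {
                return Poll::Ready(maybe_item);
            }

            // Nothing buffered: finish only once the producer itself finished
            end_result.map(|()| None)
        })
        .collect::<Vec<_>>()
        .await;

        assert_eq!(collected, vec![0u32, 1, 2]);
    });
}
```

Fusing keeps the closure well defined even after the producer completes, since `poll_fn` is called again while buffered items are still being drained from the channel.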