PieceGetter::get_pieces() in FarmerPieceGetter #3135

Merged 3 commits on Oct 17, 2024
crates/subspace-farmer/src/farmer_cache.rs (194 additions, 6 deletions)
@@ -15,14 +15,18 @@ use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
use prometheus_client::registry::Registry;
use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
@@ -33,7 +37,6 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

@@ -170,7 +173,7 @@ where
.expect("Always set during worker instantiation");

if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
worker_receiver.recv().await
worker_receiver.next().await
{
self.initialize(
&piece_getter,
@@ -200,7 +203,7 @@

loop {
select! {
maybe_command = worker_receiver.recv().fuse() => {
maybe_command = worker_receiver.next() => {
let Some(command) = maybe_command else {
// Nothing else left to do
return;
@@ -995,7 +998,7 @@ pub struct FarmerCache<CacheIndex> {
plot_caches: Arc<PlotCaches>,
handlers: Arc<Handlers>,
// We do not want to increase capacity unnecessarily on clone
worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
worker_sender: mpsc::Sender<WorkerCommand>,
metrics: Option<Arc<FarmerCacheMetrics>>,
}

@@ -1032,7 +1035,7 @@ where
piece_caches: Arc::clone(&caches),
plot_caches: Arc::clone(&plot_caches),
handlers: Arc::clone(&handlers),
worker_sender: Arc::new(worker_sender),
worker_sender,
metrics: metrics.clone(),
};
let worker = FarmerCacheWorker {
@@ -1098,6 +1101,7 @@ where

if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ForgetKey { key })
.await
{
@@ -1127,6 +1131,189 @@ where
None
}

/// Get pieces from cache.
///
/// Number of elements in returned stream is the same as in `piece_indices`.
pub async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
let mut pieces_to_get_from_plot_cache = Vec::new();

let reading_from_piece_cache = {
let caches = self.piece_caches.read().await;
// Pieces to read from piece cache grouped by backend for efficiency reasons
let mut reading_from_piece_cache = HashMap::<CacheIndex, (CacheBackend, Vec<_>)>::new();

for piece_index in piece_indices {
let key = RecordKey::from(piece_index.to_multihash());

let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
self.peer_id,
key.clone(),
)) {
Some(offset) => offset,
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};

let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;

match reading_from_piece_cache.entry(cache_index) {
Entry::Occupied(mut entry) => {
let (_backend, pieces) = entry.get_mut();
pieces.push((piece_index, piece_offset, key));
}
Entry::Vacant(entry) => {
let backend = match caches.get_backend(cache_index) {
Some(backend) => backend.clone(),
None => {
pieces_to_get_from_plot_cache.push((piece_index, key));
continue;
}
};
entry.insert((backend, vec![(piece_index, piece_offset, key)]));
}
}
}

reading_from_piece_cache
};

let (tx, mut rx) = mpsc::unbounded();

let mut fut = Box::pin(async move {
let tx = &tx;

let mut reading_from_piece_cache = reading_from_piece_cache
.into_iter()
.map(|(cache_index, (backend, pieces))| async move {
// TODO: Read a stream of pieces from each backend rather than read
// individual pieces sequentially, but `PieceCache` API ideally needs to be
// extended first to support this more efficiently, especially over the
// network
for (piece_index, piece_offset, key) in pieces {
let maybe_piece = match backend.read_piece(piece_offset).await {
Ok(Some((_piece_index, piece))) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
Some(piece)
}
Ok(None) => {
if let Some(metrics) = &self.metrics {
metrics.cache_get_miss.inc();
}
None
}
Err(error) => {
error!(
%error,
%cache_index,
%piece_index,
%piece_offset,
?key,
"Error while reading piece from cache, might be a disk \
corruption"
);

if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ForgetKey { key })
.await
{
trace!(
%error,
"Failed to send ForgetKey command to worker"
);
}

if let Some(metrics) = &self.metrics {
metrics.cache_get_error.inc();
}
None
}
};

tx.unbounded_send((piece_index, maybe_piece))
.expect("This future isn't polled after receiver is dropped; qed");
}
})
.collect::<FuturesUnordered<_>>();
// TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
// Simply drain everything
// .for_each(|()| async {})

// TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
let reading_from_piece_cache_fut = async move {
while let Some(()) = reading_from_piece_cache.next().await {
// Simply drain everything
}
};

let reading_from_plot_cache_fut = async {
if pieces_to_get_from_plot_cache.is_empty() {
return;
}

for cache in self.plot_caches.caches.read().await.iter() {
// Iterating over offsets in reverse order to both traverse elements in async code
// and being able to efficiently remove entries without extra allocations
for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

if let Ok(Some(piece)) = cache.read_piece(key).await {
if let Some(metrics) = &self.metrics {
metrics.cache_get_hit.inc();
}
tx.unbounded_send((*piece_index, Some(piece)))
.expect("This future isn't polled after receiver is dropped; qed");

// Due to iteration in reverse order and swapping using elements at the end,
// this doesn't affect processing of the elements
pieces_to_get_from_plot_cache.swap_remove(offset);
}
}

if pieces_to_get_from_plot_cache.is_empty() {
return;
}
}

if let Some(metrics) = &self.metrics {
metrics
.cache_get_miss
.inc_by(pieces_to_get_from_plot_cache.len() as u64);
}

for (piece_index, _key) in pieces_to_get_from_plot_cache {
tx.unbounded_send((piece_index, None))
.expect("This future isn't polled after receiver is dropped; qed");
}
};

join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
});

// Drive above future and stream back any pieces that were downloaded so far
stream::poll_fn(move |cx| {
let end_result = fut.poll_unpin(cx);

if let Ok(maybe_result) = rx.try_next() {
return Poll::Ready(maybe_result);
}

end_result.map(|((), ())| None)
})
}

/// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed
pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
@@ -1236,6 +1423,7 @@
) {
if let Err(error) = self
.worker_sender
.clone()
.send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
.await
{
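
For context on how the new API is meant to be consumed, here is a minimal sketch of a caller that drains the stream returned by FarmerCache::get_pieces(). The count_cache_hits helper and its generic stream-shaped signature are illustrative only, not part of this diff:

use futures::{Stream, StreamExt};
use subspace_core_primitives::pieces::{Piece, PieceIndex};

// Count cache hits in a stream shaped like the one `get_pieces()` returns:
// exactly one `(PieceIndex, Option<Piece>)` entry per requested index, with
// `None` meaning the piece was in neither the piece cache nor a plot cache.
async fn count_cache_hits(
    mut pieces: impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin,
) -> usize {
    let mut hits = 0;

    while let Some((piece_index, maybe_piece)) = pieces.next().await {
        if maybe_piece.is_some() {
            hits += 1;
        } else {
            tracing::trace!(%piece_index, "Not found in farmer cache");
        }
    }

    hits
}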
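
The tail of get_pieces() uses stream::poll_fn to drive the producer future and drain the channel from a single poll function. A standalone sketch of that pattern follows; the drive_and_drain helper and its explicit completion flag are illustrative additions to keep the sketch self-contained, not code from this PR:

use futures::channel::mpsc;
use futures::{stream, FutureExt, Stream};
use std::future::Future;
use std::task::Poll;

// Turn "a producer future that pushes items into an unbounded channel" into a
// stream: poll the producer and the receiver together, ending the stream once
// the producer has finished and the buffered items are drained.
fn drive_and_drain<T, Fut>(
    producer: Fut,
    mut rx: mpsc::UnboundedReceiver<T>,
) -> impl Stream<Item = T>
where
    Fut: Future<Output = ()>,
{
    let mut producer = Box::pin(producer);
    // Guard against polling the producer again after it has completed
    let mut finished = false;

    stream::poll_fn(move |cx| {
        if !finished {
            // Drive the producer so it keeps filling the channel
            finished = producer.poll_unpin(cx).is_ready();
        }

        // Yield anything already buffered; `Ok(None)` means the sender side is
        // gone and the buffer is empty, which also ends the stream
        if let Ok(maybe_item) = rx.try_next() {
            return Poll::Ready(maybe_item);
        }

        if finished {
            // Producer is done and nothing is buffered
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    })
}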
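
The plot-cache pass walks offsets in reverse so that swap_remove can drop satisfied entries without shifting the remainder or allocating. A tiny self-contained illustration of why reverse order is safe (the even/odd check simply stands in for "found in cache"):

// Reverse iteration + swap_remove: removing index `i` moves the *last* element
// into slot `i`. Because iteration runs from the end towards the start, every
// index greater than `i` has already been visited, so nothing is skipped and
// no elements are shifted or reallocated.
fn retain_misses(mut pending: Vec<u32>) -> Vec<u32> {
    for i in (0..pending.len()).rev() {
        let is_hit = pending[i] % 2 == 0; // stand-in for "piece found in cache"
        if is_hit {
            pending.swap_remove(i);
        }
    }
    pending
}

// retain_misses(vec![1, 2, 3, 4, 5]) keeps only the odd "misses": [1, 5, 3]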