diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index dd6f8ad606..14b5fc49ac 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -8,11 +8,15 @@ use async_trait::async_trait; use backoff::backoff::Backoff; use backoff::future::retry; use backoff::ExponentialBackoff; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{stream, FutureExt, Stream, StreamExt}; use std::fmt; use std::hash::Hash; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; +use std::task::Poll; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_farmer_components::PieceGetter; use subspace_networking::utils::multihash::ToMultihash; @@ -272,6 +276,132 @@ where ); Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + let (tx, mut rx) = mpsc::unbounded(); + + let mut fut = Box::pin(async move { + let tx = &tx; + + debug!("Getting pieces from farmer cache"); + let mut pieces_not_found_in_farmer_cache = Vec::new(); + let mut pieces_in_farmer_cache = + self.inner.farmer_cache.get_pieces(piece_indices).await; + + while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await { + let Some(piece) = maybe_piece else { + pieces_not_found_in_farmer_cache.push(piece_index); + continue; + }; + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + if pieces_not_found_in_farmer_cache.is_empty() { + return; + } + + debug!( + remaining_piece_count = %pieces_not_found_in_farmer_cache.len(), + "Getting pieces from DSN cache" + ); + let mut pieces_not_found_in_dsn_cache = Vec::new(); + let mut pieces_in_dsn_cache = self + .inner + .piece_provider + .get_from_cache(pieces_not_found_in_farmer_cache) + .await; + + while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await { + let Some(piece) = maybe_piece else { + pieces_not_found_in_dsn_cache.push(piece_index); + continue; + }; + // TODO: Would be nice to have concurrency here + self.inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + if pieces_not_found_in_dsn_cache.is_empty() { + return; + } + + debug!( + remaining_piece_count = %pieces_not_found_in_dsn_cache.len(), + "Getting pieces from node" + ); + let pieces_not_found_on_node = pieces_not_found_in_dsn_cache + .into_iter() + .map(|piece_index| async move { + match self.inner.node_client.piece(piece_index).await { + Ok(Some(piece)) => { + trace!(%piece_index, "Got piece from node successfully"); + self.inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + None + } + Ok(None) => Some(piece_index), + Err(error) => { + error!( + %error, + %piece_index, + "Failed to retrieve first segment piece from node" + ); + Some(piece_index) + } + } + }) + .collect::>() + .filter_map(|maybe_piece_index| async move { maybe_piece_index }) + .collect::>() + .await; + + debug!( + remaining_piece_count = %pieces_not_found_on_node.len(), + "Some pieces were not easily reachable" + ); + pieces_not_found_on_node + .into_iter() + .map(|piece_index| async move { + let maybe_piece = self.get_piece_slow_internal(piece_index).await; + + tx.unbounded_send((piece_index, Ok(maybe_piece))) + .expect("This future isn't polled after receiver is dropped; qed"); + }) + .collect::>() + // Simply drain everything + .for_each(|()| async {}) + .await; + }); + + // Drive above future and stream back any pieces that were downloaded so far + Ok(Box::new(stream::poll_fn(move |cx| { + let end_result = fut.poll_unpin(cx); + + if let Ok(maybe_result) = rx.try_next() { + return Poll::Ready(maybe_result); + } + + end_result.map(|()| None) + }))) + } } /// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`] diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index e7ce87a2c5..e0cf6d2166 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -428,7 +428,6 @@ impl RecordStore for DummyRecordStore { struct KademliaWrapper { local_peer_id: PeerId, kademlia: Kademlia, - noop_context: Context<'static>, } impl KademliaWrapper { @@ -436,7 +435,6 @@ impl KademliaWrapper { Self { local_peer_id, kademlia: Kademlia::new(local_peer_id, DummyRecordStore), - noop_context: Context::from_waker(noop_waker_ref()), } } @@ -444,7 +442,11 @@ impl KademliaWrapper { for address in addresses { self.kademlia.add_address(peer_id, address); } - while self.kademlia.poll(&mut self.noop_context).is_ready() { + while self + .kademlia + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_ready() + { // Simply drain useless events generated by above calls } }