Skip to content

Commit

Permalink
Implement PieceGetter::get_pieces() in FarmerPieceGetter
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 16, 2024
1 parent bfe4033 commit a224edc
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 3 deletions.
130 changes: 130 additions & 0 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{stream, FutureExt, Stream, StreamExt};
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Poll;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_farmer_components::PieceGetter;
use subspace_networking::utils::multihash::ToMultihash;
Expand Down Expand Up @@ -272,6 +276,132 @@ where
);
Ok(None)
}

async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
let (tx, mut rx) = mpsc::unbounded();

let mut fut = Box::pin(async move {
let tx = &tx;

debug!("Getting pieces from farmer cache");
let mut pieces_not_found_in_farmer_cache = Vec::new();
let mut pieces_in_farmer_cache =
self.inner.farmer_cache.get_pieces(piece_indices).await;

while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
let Some(piece) = maybe_piece else {
pieces_not_found_in_farmer_cache.push(piece_index);
continue;
};
tx.unbounded_send((piece_index, Ok(Some(piece))))
.expect("This future isn't polled after receiver is dropped; qed");
}

if pieces_not_found_in_farmer_cache.is_empty() {
return;
}

debug!(
remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
"Getting pieces from DSN cache"
);
let mut pieces_not_found_in_dsn_cache = Vec::new();
let mut pieces_in_dsn_cache = self
.inner
.piece_provider
.get_from_cache(pieces_not_found_in_farmer_cache)
.await;

while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
let Some(piece) = maybe_piece else {
pieces_not_found_in_dsn_cache.push(piece_index);
continue;
};
// TODO: Would be nice to have concurrency here
self.inner
.farmer_cache
.maybe_store_additional_piece(piece_index, &piece)
.await;
tx.unbounded_send((piece_index, Ok(Some(piece))))
.expect("This future isn't polled after receiver is dropped; qed");
}

if pieces_not_found_in_dsn_cache.is_empty() {
return;
}

debug!(
remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
"Getting pieces from node"
);
let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
.into_iter()
.map(|piece_index| async move {
match self.inner.node_client.piece(piece_index).await {
Ok(Some(piece)) => {
trace!(%piece_index, "Got piece from node successfully");
self.inner
.farmer_cache
.maybe_store_additional_piece(piece_index, &piece)
.await;

tx.unbounded_send((piece_index, Ok(Some(piece))))
.expect("This future isn't polled after receiver is dropped; qed");
None
}
Ok(None) => Some(piece_index),
Err(error) => {
error!(
%error,
%piece_index,
"Failed to retrieve first segment piece from node"
);
Some(piece_index)
}
}
})
.collect::<FuturesUnordered<_>>()
.filter_map(|maybe_piece_index| async move { maybe_piece_index })
.collect::<Vec<_>>()
.await;

debug!(
remaining_piece_count = %pieces_not_found_on_node.len(),
"Some pieces were not easily reachable"
);
pieces_not_found_on_node
.into_iter()
.map(|piece_index| async move {
let maybe_piece = self.get_piece_slow_internal(piece_index).await;

tx.unbounded_send((piece_index, Ok(maybe_piece)))
.expect("This future isn't polled after receiver is dropped; qed");
})
.collect::<FuturesUnordered<_>>()
// Simply drain everything
.for_each(|()| async {})
.await;
});

// Drive above future and stream back any pieces that were downloaded so far
Ok(Box::new(stream::poll_fn(move |cx| {
let end_result = fut.poll_unpin(cx);

if let Ok(maybe_result) = rx.try_next() {
return Poll::Ready(maybe_result);
}

end_result.map(|()| None)
})))
}
}

/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
Expand Down
8 changes: 5 additions & 3 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,23 +428,25 @@ impl RecordStore for DummyRecordStore {
struct KademliaWrapper {
local_peer_id: PeerId,
kademlia: Kademlia<DummyRecordStore>,
noop_context: Context<'static>,
}

impl KademliaWrapper {
fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
noop_context: Context::from_waker(noop_waker_ref()),
}
}

fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
for address in addresses {
self.kademlia.add_address(peer_id, address);
}
while self.kademlia.poll(&mut self.noop_context).is_ready() {
while self
.kademlia
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_ready()
{
// Simply drain useless events generated by above calls
}
}
Expand Down

0 comments on commit a224edc

Please sign in to comment.