Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve piece downloading #3259

Merged
merged 12 commits into from
Nov 26, 2024
Merged
Prev Previous commit
Next Next commit
Tiny renaming
  • Loading branch information
nazar-pc committed Nov 26, 2024
commit cb0fd6b63613951169fd5b0c562fe1efce4518ca
14 changes: 7 additions & 7 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ where

debug!(num_pieces = %pieces_to_download.len(), "Starting");

let mut checked_connected_peers = HashSet::new();
let mut checked_peers = HashSet::new();

let Ok(connected_peers) = node.connected_peers().await else {
trace!("Connected peers error");
Expand All @@ -581,22 +581,22 @@ where
.take(num_pieces)
.enumerate()
.map(|(peer_index, peer_id)| {
checked_connected_peers.insert(peer_id);
checked_peers.insert(peer_id);

// Inside to avoid division by zero in case there are no connected peers or pieces
let step = num_pieces / num_connected_peers.min(num_pieces);

// Take unique first piece index for each connected peer and the rest just to check
// cached pieces up to recommended limit
let mut peer_piece_indices = pieces_to_download
let mut check_cached_pieces = pieces_to_download
.keys()
.cycle()
.skip(step * peer_index)
.take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT))
.copied()
.collect::<Vec<_>>();
// Pick first piece index as the piece we want to download
let piece_index = peer_piece_indices.swap_remove(0);
let piece_index = check_cached_pieces.swap_remove(0);

let permit = semaphore.try_acquire();

Expand All @@ -611,7 +611,7 @@ where
piece_validator,
peer_id,
Vec::new(),
Arc::new(peer_piece_indices),
Arc::new(check_cached_pieces),
piece_index,
HashSet::new(),
HashSet::new(),
Expand Down Expand Up @@ -978,7 +978,7 @@ async fn download_cached_piece_from_peer<'a, PV>(
piece_validator: &'a PV,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
peer_piece_indices: Arc<Vec<PieceIndex>>,
check_cached_pieces: Arc<Vec<PieceIndex>>,
piece_index: PieceIndex,
mut cached_pieces: HashSet<PieceIndex>,
mut not_cached_pieces: HashSet<PieceIndex>,
Expand All @@ -993,7 +993,7 @@ where
addresses,
CachedPieceByIndexRequest {
piece_index,
cached_pieces: peer_piece_indices,
cached_pieces: check_cached_pieces,
},
)
.await
Expand Down