Fix handling of duplicate pieces across caches
nazar-pc committed Nov 17, 2024
1 parent 2c7a8c0 commit 67ea4cc
Showing 2 changed files with 168 additions and 5 deletions.
6 changes: 5 additions & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
@@ -410,7 +410,11 @@ where
match maybe_cache {
Ok(cache) => {
let backend = cache.backend;
stored_pieces.extend(cache.cache_stored_pieces.into_iter());
for (key, cache_offset) in cache.cache_stored_pieces {
if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
dangling_free_offsets.push_front(old_cache_offset);
}
}
dangling_free_offsets.extend(
cache.cache_free_offsets.into_iter().filter(|free_offset| {
free_offset.piece_offset.0 < backend.used_capacity
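
Note on the change above: the previous code merged each cache's `cache_stored_pieces` into the shared map with a blanket `extend`, so when the same piece was present in more than one backing cache the earlier entry's offset was silently overwritten and its slot was never reused. The new per-entry insert pushes the displaced offset onto `dangling_free_offsets` so it can be handed out again. A standalone sketch of that merge logic, with deliberately simplified key/offset types (the real code uses record keys and `FarmerCacheOffset` values), might look like this:

use std::collections::{HashMap, VecDeque};

// Merge one cache's stored pieces into the combined map, reclaiming the slot
// of any entry displaced by a key that was already present from another cache
fn merge_cache_stored_pieces(
    stored_pieces: &mut HashMap<u64, u32>,
    dangling_free_offsets: &mut VecDeque<u32>,
    cache_stored_pieces: HashMap<u64, u32>,
) {
    for (key, cache_offset) in cache_stored_pieces {
        // `insert` returns the previous value when the key already existed,
        // meaning the same piece is stored in more than one cache
        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
            dangling_free_offsets.push_front(old_cache_offset);
        }
    }
}

The reclaimed offsets are later used when new pieces need to be stored, which is exactly the behavior the `duplicate_indices` test added below exercises.
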
167 changes: 163 additions & 4 deletions crates/subspace-farmer/src/farmer_cache/tests.rs
@@ -276,7 +276,7 @@ async fn basic() {
// Update current segment header such that we keep-up after initial sync is triggered
current_segment_index.store(1, Ordering::Release);

// Send segment headers receiver such that keep-up sync can start not
// Send segment headers receiver such that keep-up sync can start now
let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
mpsc::channel(0);
archived_segment_headers_stream_request_receiver
@@ -287,7 +287,8 @@ async fn basic() {
.unwrap();

// Send segment header with the same segment index as "current", so it will have no
// side-effects, but acknowledgement will indicate that keep-up after initial sync has finished
// side effects, but acknowledgement will indicate that keep-up after initial sync has
// finished
{
let segment_header = SegmentHeader::V0 {
segment_index: SegmentIndex::ONE,
@@ -344,8 +345,8 @@ async fn basic() {
}
}

// Send two more segment headers (one is not enough because for above peer ID there are no pieces for it to
// store)
// Send two more segment headers (one is not enough because for above peer ID there are no
// pieces for it to store)
for segment_index in [2, 3] {
let segment_header = SegmentHeader::V0 {
segment_index: SegmentIndex::from(segment_index),
@@ -487,6 +488,164 @@ async fn basic() {
}
}

#[tokio::test(flavor = "multi_thread")]
async fn duplicate_indices() {
let current_segment_index = Arc::new(AtomicU64::new(0));
let pieces = Arc::default();
let (
archived_segment_headers_stream_request_sender,
mut archived_segment_headers_stream_request_receiver,
) = mpsc::channel(0);
let (acknowledge_archived_segment_header_sender, _acknowledge_archived_segment_header_receiver) =
mpsc::channel(0);

let node_client = MockNodeClient {
current_segment_index: Arc::clone(&current_segment_index),
pieces: Arc::clone(&pieces),
archived_segment_headers_stream_request_sender,
acknowledge_archived_segment_header_sender,
};
let piece_getter = MockPieceGetter {
pieces: Arc::clone(&pieces),
};
let public_key =
identity::PublicKey::from(identity::ed25519::PublicKey::try_from_bytes(&[42; 32]).unwrap());
let path1 = tempdir().unwrap();
let path2 = tempdir().unwrap();

// Initialize both disk caches with the same exact contents
for path in [path1.as_ref(), path2.as_ref()] {
let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);

let farmer_cache_worker_exited =
tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));

let (sender, receiver) = oneshot::channel();
farmer_cache
.on_sync_progress(Arc::new({
let sender = Mutex::new(Some(sender));

move |progress| {
if *progress == 100.0 {
if let Some(sender) = sender.lock().take() {
sender.send(()).unwrap();
}
}
}
}))
.detach();
farmer_cache
.replace_backing_caches(
vec![Arc::new(
DiskPieceCache::open(path, NonZeroU32::new(1).unwrap(), None, None).unwrap(),
)],
vec![],
)
.await;

// Wait for piece cache to be initialized
receiver.await.unwrap();

drop(farmer_cache);

// Make worker exit
let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
mpsc::channel(0);
archived_segment_headers_stream_request_receiver
.next()
.await
.unwrap()
.send(archived_segment_headers_receiver)
.unwrap();
// Make worker exit
archived_segment_headers_sender.close().await.unwrap();

farmer_cache_worker_exited.await.unwrap();
}

{
// Clear requested pieces
pieces.lock().clear();

let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);

let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));

let (sender, receiver) = oneshot::channel();
farmer_cache
.on_sync_progress(Arc::new({
let sender = Mutex::new(Some(sender));

move |progress| {
if *progress == 100.0 {
if let Some(sender) = sender.lock().take() {
sender.send(()).unwrap();
}
}
}
}))
.detach();

// Reopen with the same backing caches
farmer_cache
.replace_backing_caches(
vec![
Arc::new(
DiskPieceCache::open(
path1.as_ref(),
NonZeroU32::new(1).unwrap(),
None,
None,
)
.unwrap(),
),
Arc::new(
DiskPieceCache::open(
path2.as_ref(),
NonZeroU32::new(1).unwrap(),
None,
None,
)
.unwrap(),
),
],
vec![],
)
.await;

// Wait for piece cache to be initialized
receiver.await.unwrap();

// One piece must be requested
let requested_pieces = pieces.lock().keys().copied().collect::<Vec<_>>();
assert_eq!(requested_pieces.len(), 1);

// Must have stored requested piece
farmer_cache
.get_piece(requested_pieces[0].to_multihash())
.await
.unwrap();

drop(farmer_cache);

// Make worker exit
let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
mpsc::channel(0);
archived_segment_headers_stream_request_receiver
.next()
.await
.unwrap()
.send(archived_segment_headers_receiver)
.unwrap();
// Make worker exit
archived_segment_headers_sender.close().await.unwrap();

farmer_cache_worker_exited.await.unwrap();
}
}
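
Both halves of this test wait for cache initialization with the same fire-once notification: a `oneshot::Sender` wrapped in `Mutex<Option<..>>` so that the shared progress callback can consume it exactly once when sync reaches 100%. A minimal standalone sketch of that pattern (a hypothetical helper, simplified to take the progress value by value and assuming the `futures` and `parking_lot` crates) could look like this:

use std::sync::Arc;

use futures::channel::oneshot;
use parking_lot::Mutex;

// Build a progress callback that fires a one-shot notification the first time
// 100% progress is reported and silently ignores everything afterwards
fn sync_complete_notifier() -> (Arc<dyn Fn(f32) + Send + Sync>, oneshot::Receiver<()>) {
    let (sender, receiver) = oneshot::channel();
    let sender = Mutex::new(Some(sender));

    let callback: Arc<dyn Fn(f32) + Send + Sync> = Arc::new(move |progress: f32| {
        if progress == 100.0 {
            // `take()` consumes the sender, so later 100% reports are no-ops
            if let Some(sender) = sender.lock().take() {
                // The receiver may already be dropped; that is fine here
                let _ = sender.send(());
            }
        }
    });

    (callback, receiver)
}

In the test the analogous callback is handed to `farmer_cache.on_sync_progress(..)` and the receiver is awaited to know when the piece cache has finished its initial sync.
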

#[test]
fn decode_piece_index_from_record_key_test() {
let piece_index_0 = PieceIndex::from(0);
