-
Notifications
You must be signed in to change notification settings - Fork 249
/
Copy pathsegment_reconstruction.rs
99 lines (82 loc) · 3.35 KB
/
segment_reconstruction.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use crate::PieceGetter;
use futures::StreamExt;
use subspace_archiving::piece_reconstructor::{PiecesReconstructor, ReconstructorError};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Kzg;
use thiserror::Error;
use tokio::task::JoinError;
use tracing::{debug, error, info};
/// Errors that can be returned while recovering a missing piece of a segment.
#[derive(Debug, Error)]
pub(crate) enum SegmentReconstructionError {
/// Not enough pieces were acquired to reconstruct a segment
#[error("Not enough pieces to reconstruct a segment")]
NotEnoughPiecesAcquired,
/// Piece reconstruction failed (wraps the error returned by the pieces reconstructor)
#[error("Piece reconstruction failed: {0}")]
ReconstructionFailed(#[from] ReconstructorError),
/// Internal piece retrieval process failed
#[error("Pieces retrieval failed: {0}")]
PieceRetrievalFailed(#[from] anyhow::Error),
/// Joining a spawned task failed
#[error("Join error: {0}")]
JoinError(#[from] JoinError),
}
/// Recovers the piece at `missing_piece_index` by downloading other pieces of the same segment
/// and reconstructing the missing one from them.
///
/// Pieces of the segment are requested from `piece_getter` in batches, each batch asking only for
/// as many pieces as are still needed, until `RecordedHistorySegment::NUM_RAW_RECORDS` distinct
/// piece positions were filled or every piece index of the segment was tried. Individual pieces
/// that are not found or fail to download are logged at debug level and skipped. The
/// reconstruction itself is executed through `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// * [`SegmentReconstructionError::NotEnoughPiecesAcquired`] if fewer pieces than required could
///   be collected after all piece indexes of the segment were tried.
/// * The error of a failed `piece_getter.get_pieces()` call, converted with `?`.
/// * [`SegmentReconstructionError::JoinError`] if the blocking reconstruction task could not be
///   joined.
/// * [`SegmentReconstructionError::ReconstructionFailed`] if the reconstructor reports an error.
pub(crate) async fn recover_missing_piece<PG>(
    piece_getter: &PG,
    kzg: Kzg,
    erasure_coding: ErasureCoding,
    missing_piece_index: PieceIndex,
) -> Result<Piece, SegmentReconstructionError>
where
    PG: PieceGetter + Send + Sync,
{
    info!(%missing_piece_index, "Recovering missing piece...");
    let segment_index = missing_piece_index.segment_index();
    let position = missing_piece_index.position();

    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
    // Number of distinct piece positions of the segment that were filled so far
    let mut received_pieces = 0_usize;
    let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];

    let mut pieces_iter = segment_index.segment_piece_indexes().into_iter();

    // Download in batches until we get enough or exhaust available pieces.
    //
    // `<` (rather than `!=`) guarantees that the subtraction below can never underflow, even if
    // the piece getter yields more pieces than were requested.
    while !pieces_iter.is_empty() && received_pieces < required_pieces_number {
        let piece_indices = pieces_iter
            .by_ref()
            .take(required_pieces_number - received_pieces);

        let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;

        while let Some((piece_index, result)) = received_segment_pieces.next().await {
            match result {
                Ok(Some(piece)) => {
                    let previous_piece = segment_pieces
                        .get_mut(piece_index.position() as usize)
                        .expect("Piece position is by definition within segment; qed")
                        .replace(piece);

                    // Only count newly filled positions: a piece delivered more than once must
                    // not make it look like enough distinct pieces were acquired
                    if previous_piece.is_none() {
                        received_pieces += 1;
                    }
                }
                Ok(None) => {
                    debug!(%piece_index, "Piece was not found");
                }
                Err(error) => {
                    debug!(%error, %piece_index, "Failed to get piece");
                }
            }
        }
    }

    if received_pieces < required_pieces_number {
        error!(
            %missing_piece_index,
            %received_pieces,
            %required_pieces_number,
            "Recovering missing piece failed."
        );

        return Err(SegmentReconstructionError::NotEnoughPiecesAcquired);
    }

    // Reconstruction is done on a blocking thread; the first `?` handles the join error, the
    // second one the reconstruction error
    let result = tokio::task::spawn_blocking(move || {
        let reconstructor = PiecesReconstructor::new(kzg, erasure_coding);

        reconstructor.reconstruct_piece(&segment_pieces, position as usize)
    })
    .await??;

    info!(%missing_piece_index, "Recovering missing piece succeeded.");

    Ok(result)
}