Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - More sync edge cases + prettify range #1834

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 69 additions & 34 deletions beacon_node/network/src/sync/range_sync/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::sync::RequestId;
use eth2_libp2p::rpc::methods::BlocksByRangeRequest;
use eth2_libp2p::PeerId;
use slog::{crit, warn, Logger};
use ssz::Encode;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
Expand All @@ -15,6 +14,13 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;

/// Error type of a batch in a wrong state.
// Such errors should never be encountered.
pub struct WrongState(pub(super) String);

/// Auxiliary type alias for readability.
type IsFailed = bool;

/// A segment of a chain.
pub struct BatchInfo<T: EthSpec> {
/// Start slot of the batch.
Expand Down Expand Up @@ -57,6 +63,14 @@ impl<T: EthSpec> BatchState<T> {
pub fn poison(&mut self) -> BatchState<T> {
std::mem::replace(self, BatchState::Poisoned)
}

pub fn is_failed(&self) -> IsFailed {
match self {
BatchState::Failed => true,
BatchState::Poisoned => unreachable!("Poisoned batch"),
_ => false,
}
}
}

impl<T: EthSpec> BatchInfo<T> {
Expand Down Expand Up @@ -134,16 +148,20 @@ impl<T: EthSpec> BatchInfo<T> {
}

/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: SignedBeaconBlock<T>, logger: &Logger) {
pub fn add_block(&mut self, block: SignedBeaconBlock<T>) -> Result<(), WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
self.state = BatchState::Downloading(peer, blocks, req_id)
self.state = BatchState::Downloading(peer, blocks, req_id);
Ok(())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Add block for batch in wrong state"; "state" => ?other);
self.state = other
self.state = other;
Err(WrongState(format!(
"Add block for batch in wrong state {:?}",
self.state
)))
}
}
}
Expand All @@ -153,8 +171,7 @@ impl<T: EthSpec> BatchInfo<T> {
#[must_use = "Batch may have failed"]
pub fn download_completed(
&mut self,
logger: &Logger,
) -> Result<usize /* Received blocks */, &BatchState<T>> {
) -> Result<usize /* Received blocks */, Result<(Slot, Slot, IsFailed), WrongState>> {
match self.state.poison() {
BatchState::Downloading(peer, blocks, _request_id) => {
// verify that blocks are in range
Expand Down Expand Up @@ -182,9 +199,8 @@ impl<T: EthSpec> BatchInfo<T> {
// drop the blocks
BatchState::AwaitingDownload
};
warn!(logger, "Batch received out of range blocks";
&self, "expected" => expected, "received" => received);
return Err(&self.state);

return Err(Ok((expected, received, self.state.is_failed())));
}
}

Expand All @@ -194,15 +210,17 @@ impl<T: EthSpec> BatchInfo<T> {
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Download completed for batch in wrong state"; "state" => ?other);
self.state = other;
Err(&self.state)
Err(Err(WrongState(format!(
"Download completed for batch in wrong state {:?}",
self.state
))))
}
}
}

#[must_use = "Batch may have failed"]
pub fn download_failed(&mut self, logger: &Logger) -> &BatchState<T> {
pub fn download_failed(&mut self) -> Result<IsFailed, WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, _, _request_id) => {
// register the attempt and check if the batch can be tried again
Expand All @@ -215,13 +233,15 @@ impl<T: EthSpec> BatchInfo<T> {
// drop the blocks
BatchState::AwaitingDownload
};
&self.state
Ok(self.state.is_failed())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Download failed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
Err(WrongState(format!(
"Download failed for batch in wrong state {:?}",
self.state
)))
}
}
}
Expand All @@ -230,37 +250,42 @@ impl<T: EthSpec> BatchInfo<T> {
&mut self,
peer: PeerId,
request_id: RequestId,
logger: &Logger,
) {
) -> Result<(), WrongState> {
match self.state.poison() {
BatchState::AwaitingDownload => {
self.state = BatchState::Downloading(peer, Vec::new(), request_id);
Ok(())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Starting download for batch in wrong state"; "state" => ?other);
self.state = other
self.state = other;
Err(WrongState(format!(
"Starting download for batch in wrong state {:?}",
self.state
)))
}
}
}

pub fn start_processing(&mut self, logger: &Logger) -> Vec<SignedBeaconBlock<T>> {
pub fn start_processing(&mut self) -> Result<Vec<SignedBeaconBlock<T>>, WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
self.state = BatchState::Processing(Attempt::new(peer, &blocks));
blocks
Ok(blocks)
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Starting procesing batch in wrong state"; "state" => ?other);
self.state = other;
vec![]
Err(WrongState(format!(
"Starting procesing batch in wrong state {:?}",
self.state
)))
}
}
}

#[must_use = "Batch may have failed"]
pub fn processing_completed(&mut self, was_sucessful: bool, logger: &Logger) -> &BatchState<T> {
pub fn processing_completed(&mut self, was_sucessful: bool) -> Result<IsFailed, WrongState> {
match self.state.poison() {
BatchState::Processing(attempt) => {
self.state = if !was_sucessful {
Expand All @@ -278,19 +303,21 @@ impl<T: EthSpec> BatchInfo<T> {
} else {
BatchState::AwaitingValidation(attempt)
};
&self.state
Ok(self.state.is_failed())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Procesing completed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
Err(WrongState(format!(
"Procesing completed for batch in wrong state: {:?}",
self.state
)))
}
}
}

#[must_use = "Batch may have failed"]
pub fn validation_failed(&mut self, logger: &Logger) -> &BatchState<T> {
pub fn validation_failed(&mut self) -> Result<IsFailed, WrongState> {
match self.state.poison() {
BatchState::AwaitingValidation(attempt) => {
self.failed_processing_attempts.push(attempt);
Expand All @@ -303,13 +330,15 @@ impl<T: EthSpec> BatchInfo<T> {
} else {
BatchState::AwaitingDownload
};
&self.state
Ok(self.state.is_failed())
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Validation failed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
Err(WrongState(format!(
"Validation failed for batch in wrong state: {:?}",
self.state
)))
}
}
}
Expand Down Expand Up @@ -370,8 +399,14 @@ impl<T: EthSpec> slog::KV for BatchInfo<T> {
impl<T: EthSpec> std::fmt::Debug for BatchState<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BatchState::Processing(_) => f.write_str("Processing"),
BatchState::AwaitingValidation(_) => f.write_str("AwaitingValidation"),
BatchState::Processing(Attempt {
ref peer_id,
hash: _,
}) => write!(f, "Processing({})", peer_id),
BatchState::AwaitingValidation(Attempt {
ref peer_id,
hash: _,
}) => write!(f, "AwaitingValidation({})", peer_id),
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
BatchState::Failed => f.write_str("Failed"),
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
Expand Down
Loading