Handle early blocks (#2155)
## Issue Addressed

NA

## Problem this PR addresses

There's an issue where Lighthouse is banning a lot of peers due to the following sequence of events:

1. Gossip block 0xabc arrives ~200ms early
    - It is propagated across the network, with respect to [`MAXIMUM_GOSSIP_CLOCK_DISPARITY`](https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#why-is-there-maximum_gossip_clock_disparity-when-validating-slot-ranges-of-messages-in-gossip-subnets).
    - However, it is not imported into our database, since the block is early.
2. Attestations for 0xabc arrive, but the block was not imported.
    - The peer that sent the attestation is down-voted.
        - Each unknown-block attestation causes a score loss of 1; the peer is banned at -100.
        - When the peer is on an attestation subnet there can be hundreds of attestations, so the peer is banned quickly (before the missed block can be obtained via RPC).
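
To make the timing mismatch concrete, below is a minimal, illustrative Rust sketch (not Lighthouse code: the constant, function names, and the pre-fix import rule are simplifications; the 500 ms value comes from the spec linked above). Gossip validation tolerates a small clock disparity, but block import only accepted a block once its slot had started, which is exactly the gap this PR closes.

```rust
use std::time::Duration;

// Illustrative value only (500 ms per the linked spec); gossip validation
// tolerates messages that arrive up to this much before their slot.
const MAXIMUM_GOSSIP_CLOCK_DISPARITY: Duration = Duration::from_millis(500);

/// Would gossip validation accept a block whose slot starts `early_by` in the future?
fn gossip_accepts(early_by: Duration) -> bool {
    early_by <= MAXIMUM_GOSSIP_CLOCK_DISPARITY
}

/// Would (pre-fix) block import accept it? Only once its slot has started.
fn import_accepts(early_by: Duration) -> bool {
    early_by.is_zero()
}

fn main() {
    let early_by = Duration::from_millis(200);
    assert!(gossip_accepts(early_by)); // propagated to peers...
    assert!(!import_accepts(early_by)); // ...but not imported to our database
}
```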

## Potential solutions

I can think of three solutions to this:

1. Wait for attestation-queuing (#635) to arrive and solve this.
    - Easy
    - Not an immediate fix.
    - Whilst this would work, I don't think it's a perfect solution for this particular issue; (3) is better.
2. Allow importing blocks with a tolerance of `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
    - Easy
    - ~~I have implemented this, for now.~~
3. If a block is verified for gossip propagation (i.e., signature verified) and it's within `MAXIMUM_GOSSIP_CLOCK_DISPARITY`, then queue it to be processed at the start of the appropriate slot.
    - More difficult
    - Feels like the best solution; I will try to implement this.
    
    
**This PR takes approach (3).**

## Changes included

- Implement the `block_delay_queue`, based upon a [`DelayQueue`](https://docs.rs/tokio-util/0.6.3/tokio_util/time/delay_queue/struct.DelayQueue.html), which can store blocks until it's time to import them (see the minimal sketch after this list).
- Add a new `DelayedImportBlock` variant to the `beacon_processor::WorkEvent` enum to handle this new event.
- In the `BeaconProcessor`, refactor a `tokio::select!` into a struct with an explicit `Stream` implementation. I experienced some issues with `tokio::select!` in the block delay queue and found it hard to debug. I think this explicit implementation is nicer and functionally equivalent (apart from the fact that `tokio::select!` randomly chooses which future to poll, whereas now we're deterministic).
- Add a testing framework to the `beacon_processor` module that tests this new block delay logic. I also tested a handful of other operations in the beacon processor (attns, slashings, exits) since it was super easy to copy-pasta the code from the `http_api` tester.
    - To implement these tests I added the concept of an optional `work_journal_tx` to the `BeaconProcessor` which will spit out a log of events. I used this in the tests to ensure that things were happening as I expect.
    - The tests are a little racy, but it's hard to avoid that when testing timing-based code. If we see CI failures I can revise. I haven't observed *any* failures due to races on my machine or on CI yet.
    - To assist with testing I allowed for directly setting the time on the `ManualSlotClock`.
- I gave the `beacon_processor::Worker` a `Toolbox` for two reasons: (a) it avoids changing tons of function signatures when you want to pass a new object to the worker and (b) it seemed cute.
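
For context on the first bullet, here is a minimal, self-contained sketch of the `DelayQueue` pattern that the `block_delay_queue` builds on (assumptions: tokio with the `macros`/`rt`/`time` features, futures 0.3, and tokio-util 0.6 with the `time` feature; the item type and delay are placeholders). The real implementation, shown in the diff below, queues gossip-verified blocks keyed by block root and delays them until the start of their slot plus a small `ADDITIONAL_DELAY`.

```rust
use std::time::Duration;

use futures::future::poll_fn;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    // Items inserted into a `DelayQueue` only become available once their
    // delay has elapsed. The real queue inserts gossip-verified blocks with a
    // delay equal to the time remaining until the block's slot starts.
    let mut queue: DelayQueue<&'static str> = DelayQueue::new();
    queue.insert("block 0xabc", Duration::from_millis(200));

    // Drive the queue manually. In tokio-util 0.6, `poll_expired` yields
    // `Result<Expired<T>, Error>`; `None` means the queue is currently empty.
    while let Some(expired) = poll_fn(|cx| queue.poll_expired(cx)).await {
        match expired {
            Ok(item) => println!("ready for import: {}", item.into_inner()),
            Err(e) => eprintln!("delay queue error: {}", e),
        }
    }
}
```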
paulhauner authored and michaelsproul committed Mar 10, 2021
1 parent 8277e2f commit 8608dc1
Showing 19 changed files with 1,161 additions and 115 deletions.
53 changes: 44 additions & 9 deletions Cargo.lock


1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -385,6 +385,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(

/// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
pub block: SignedBeaconBlock<T::EthSpec>,
pub block_root: Hash256,
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
@@ -30,7 +30,7 @@ mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, ChainSegmentResult,
ForkChoiceError, StateSkipConfig,
ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::ChainConfig;
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/naive_aggregation_pool.rs
@@ -228,6 +228,11 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
outcome
}

/// Returns the total number of attestations stored in `self`.
pub fn num_attestations(&self) -> usize {
self.maps.iter().map(|(_, map)| map.len()).sum()
}

/// Returns an aggregated `Attestation` with the given `data`, if any.
pub fn get(&self, data: &AttestationData) -> Option<Attestation<E>> {
self.maps.get(&data.slot).and_then(|map| map.get(data))
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/snapshot_cache.rs
@@ -8,6 +8,7 @@ use types::{
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;

/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
/// This state is equivalent to the `self.beacon_block.state_root()` state that has been
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
5 changes: 4 additions & 1 deletion beacon_node/network/Cargo.toml
@@ -14,6 +14,8 @@ exit-future = "0.2.0"
slog-term = "2.6.0"
slog-async = "2.5.0"
logging = { path = "../../common/logging" }
environment = { path = "../../lighthouse/environment" }
discv5 = { version = "0.1.0-beta.3" }

[dependencies]
beacon_chain = { path = "../beacon_chain" }
@@ -31,7 +33,7 @@ tree_hash = "0.1.1"
futures = "0.3.7"
error-chain = "0.12.4"
tokio = { version = "1.1.0", features = ["full"] }
tokio-stream = "0.1.2"
tokio-stream = "0.1.3"
parking_lot = "0.11.0"
smallvec = "1.6.1"
rand = "0.7.3"
@@ -46,3 +48,4 @@ num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4"
strum = { version = "0.20"}
tokio-util = { version = "0.6.3", features = ["time"] }
210 changes: 210 additions & 0 deletions beacon_node/network/src/beacon_processor/block_delay_queue.rs
@@ -0,0 +1,210 @@
//! Provides a mechanism which queues blocks for later processing when they arrive too early.
//!
//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the
//! gossip propagation tolerance) it will send it to this queue where it will be placed in a
//! `DelayQueue` until the slot arrives. Once the block has been determined to be ready, it will be
//! sent back out on a channel to be processed by the `BeaconProcessor` again.
//!
//! There is the edge-case where the slot arrives before this queue manages to process it. In that
//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`).
use super::MAX_DELAYED_BLOCK_QUEUE_LEN;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock};
use eth2_libp2p::PeerId;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use slog::{crit, debug, error, Logger};
use slot_clock::SlotClock;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::DelayQueue;

const TASK_NAME: &str = "beacon_processor_block_delay_queue";

/// Queue blocks for re-processing with an `ADDITIONAL_DELAY` after the slot starts. This is to
/// account for any slight drift in the system clock.
const ADDITIONAL_DELAY: Duration = Duration::from_millis(5);

/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;

/// A block that arrived early and has been queued for later import.
pub struct QueuedBlock<T: BeaconChainTypes> {
pub peer_id: PeerId,
pub block: GossipVerifiedBlock<T>,
pub seen_timestamp: Duration,
}

/// Unifies the different messages processed by the block delay queue.
enum InboundEvent<T: BeaconChainTypes> {
/// A block that has been received early that we should queue for later processing.
EarlyBlock(QueuedBlock<T>),
/// A block that was queued for later processing and is ready for import.
ReadyBlock(QueuedBlock<T>),
/// The `DelayQueue` returned an error.
DelayQueueError(TimeError),
}

/// Combines the `DelayQueue` and `Receiver` streams into a single stream.
///
/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
/// control (specifically in the ordering of event processing).
struct InboundEvents<T: BeaconChainTypes> {
pub delay_queue: DelayQueue<QueuedBlock<T>>,
early_blocks_rx: Receiver<QueuedBlock<T>>,
}

impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
type Item = InboundEvent<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Poll for expired blocks *before* we try to process new blocks.
//
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.early_blocks_rx.poll_recv(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::EarlyBlock(queued_block)));
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Pending => {}
}

Poll::Pending
}
}

/// Spawn a queue which will accept blocks via the returned `Sender`, potentially queue them until
/// their slot arrives, then send them back out via `ready_blocks_tx`.
pub fn spawn_block_delay_queue<T: BeaconChainTypes>(
ready_blocks_tx: Sender<QueuedBlock<T>>,
executor: &TaskExecutor,
slot_clock: T::SlotClock,
log: Logger,
) -> Sender<QueuedBlock<T>> {
let (early_blocks_tx, early_blocks_rx): (_, Receiver<QueuedBlock<_>>) =
mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN);

let queue_future = async move {
let mut queued_block_roots = HashSet::new();

let mut inbound_events = InboundEvents {
early_blocks_rx,
delay_queue: DelayQueue::new(),
};

loop {
match inbound_events.next().await {
// Some block has been indicated as "early" and should be processed when the
// appropriate slot arrives.
Some(InboundEvent::EarlyBlock(early_block)) => {
let block_slot = early_block.block.block.slot();
let block_root = early_block.block.block_root;

// Don't add the same block to the queue twice. This prevents DoS attacks.
if queued_block_roots.contains(&block_root) {
continue;
}

if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
error!(
log,
"Early blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
"msg" => "check system clock"
);
// Drop the block.
continue;
}

queued_block_roots.insert(block_root);
// Queue the block until the start of the appropriate slot, plus
// `ADDITIONAL_DELAY`.
inbound_events
.delay_queue
.insert(early_block, duration_till_slot + ADDITIONAL_DELAY);
} else {
// If there is no duration till the next slot, check to see if the slot
// has already arrived. If it has already arrived, send it out for
// immediate processing.
//
// If we can't read the slot or the slot hasn't arrived, simply drop the
// block.
//
// This logic is slightly awkward since `SlotClock::duration_to_slot`
// doesn't distinguish between a slot that has already arrived and an
// error reading the slot clock.
if let Some(now) = slot_clock.now() {
if block_slot <= now && ready_blocks_tx.try_send(early_block).is_err() {
error!(
log,
"Failed to send block";
);
}
}
}
}
// A block that was queued for later processing is now ready to be processed.
Some(InboundEvent::ReadyBlock(ready_block)) => {
let block_root = ready_block.block.block_root;

if !queued_block_roots.remove(&block_root) {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown block in delay queue";
"block_root" => ?block_root
);
}

if ready_blocks_tx.try_send(ready_block).is_err() {
error!(
log,
"Failed to pop queued block";
);
}
}
Some(InboundEvent::DelayQueueError(e)) => crit!(
log,
"Failed to poll block delay queue";
"e" => ?e
),
None => {
debug!(
log,
"Block delay queue stopped";
"msg" => "shutting down"
);
break;
}
}
}
};

executor.spawn(queue_future, TASK_NAME);

early_blocks_tx
}