From 9cd5d42d87c6b83ad31063ef76d70426b64a3742 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 12:42:36 +1000 Subject: [PATCH 01/31] Add first compiling draft --- .../beacon_chain/src/attester_cache.rs | 181 ++++++++++++++++++ beacon_node/beacon_chain/src/beacon_chain.rs | 141 ++++++++++++++ beacon_node/beacon_chain/src/builder.rs | 1 + beacon_node/beacon_chain/src/errors.rs | 3 + beacon_node/beacon_chain/src/lib.rs | 1 + 5 files changed, 327 insertions(+) create mode 100644 beacon_node/beacon_chain/src/attester_cache.rs diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs new file mode 100644 index 00000000000..8ac0283c0b5 --- /dev/null +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -0,0 +1,181 @@ +use parking_lot::RwLock; +use std::collections::HashMap; +use types::{ + BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch, Slot, +}; + +type TargetRoot = Hash256; +type TargetCheckpoint = Checkpoint; +type JustifiedCheckpoint = Checkpoint; +type CommitteeLength = usize; +type CommitteeIndex = u64; + +#[derive(Debug)] +pub enum Error { + BeaconState(BeaconStateError), + SlotTooLow { + slot: Slot, + first_slot: Slot, + }, + SlotTooHigh { + slot: Slot, + first_slot: Slot, + }, + InvalidCommitteeIndex { + slot_offset: usize, + committee_index: u64, + }, +} + +impl From for Error { + fn from(e: BeaconStateError) -> Self { + Error::BeaconState(e) + } +} + +struct CommitteeLengths { + first_slot: Slot, + lengths: Vec, + slot_offsets: Vec, +} + +impl CommitteeLengths { + fn new(state: &BeaconState) -> Result { + let slots_per_epoch = T::slots_per_epoch(); + let current_epoch = state.current_epoch(); + let committee_cache = state.committee_cache(RelativeEpoch::Current)?; + let committees_per_slot = committee_cache.committees_per_slot(); + + let mut lengths = Vec::with_capacity((committees_per_slot * slots_per_epoch) as usize); + let mut slot_offsets = Vec::with_capacity(slots_per_epoch as usize); + + for slot in current_epoch.slot_iter(slots_per_epoch) { + slot_offsets.push(lengths.len()); + for index in 0..committees_per_slot { + let length = state + .get_beacon_committee(slot, index as u64)? + .committee + .len(); + lengths.push(length); + } + } + + Ok(Self { + first_slot: current_epoch.start_slot(slots_per_epoch), + lengths, + slot_offsets, + }) + } + + fn get(&self, slot: Slot, committee_index: CommitteeIndex) -> Result { + let first_slot = self.first_slot; + let relative_slot = slot + .as_usize() + .checked_sub(first_slot.as_usize()) + .ok_or(Error::SlotTooLow { slot, first_slot })?; + let slot_offset = *self + .slot_offsets + .get(relative_slot) + .ok_or(Error::SlotTooHigh { slot, first_slot })?; + slot_offset + .checked_add(committee_index as usize) + .and_then(|lengths_index| self.lengths.get(lengths_index).copied()) + .ok_or(Error::InvalidCommitteeIndex { + slot_offset, + committee_index, + }) + } +} + +pub struct CacheItem { + current_justified_checkpoint: Checkpoint, + committee_lengths: CommitteeLengths, +} + +impl CacheItem { + pub fn new(state: &BeaconState) -> Result { + let current_justified_checkpoint = state.current_justified_checkpoint(); + let committee_lengths = CommitteeLengths::new(state)?; + Ok(Self { + current_justified_checkpoint, + committee_lengths, + }) + } + + fn get( + &self, + slot: Slot, + committee_index: CommitteeIndex, + ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { + self.committee_lengths + .get(slot, committee_index) + .map(|committee_length| (self.current_justified_checkpoint, committee_length)) + } +} + +#[derive(Default)] +pub struct AttesterCache { + cache: RwLock>, +} + +impl AttesterCache { + pub fn get( + &self, + target: &TargetCheckpoint, + slot: Slot, + committee_index: CommitteeIndex, + ) -> Result, Error> { + self.cache + .read() + .get(target) + .map(|cache_item| cache_item.get(slot, committee_index)) + .transpose() + } + + pub fn cache_state( + &self, + state: &BeaconState, + latest_beacon_block_root: Hash256, + ) -> Result<(), Error> { + let target = get_state_target(state, latest_beacon_block_root)?; + let cache_item = CacheItem::new(state)?; + self.cache.write().insert(target, cache_item); + Ok(()) + } + + pub fn cache_state_and_return_value( + &self, + state: &BeaconState, + latest_beacon_block_root: Hash256, + slot: Slot, + index: CommitteeIndex, + ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { + let target = get_state_target(state, latest_beacon_block_root)?; + let cache_item = CacheItem::new(state)?; + let value = cache_item.get(slot, index)?; + self.cache.write().insert(target, cache_item); + Ok(value) + } + + pub fn prune_below(&self, epoch: Epoch) { + self.cache.write().retain(|target, _| target.epoch >= epoch); + } +} + +fn get_state_target( + state: &BeaconState, + latest_beacon_block_root: Hash256, +) -> Result { + let target_epoch = state.current_epoch(); + let target_slot = target_epoch.start_slot(T::slots_per_epoch()); + let target_root = if state.slot() <= target_slot { + latest_beacon_block_root + } else { + *state.get_block_root(target_slot)? + }; + + Ok(Checkpoint { + epoch: target_epoch, + root: target_root, + }) +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 60924f6beb0..989f09e1f65 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2,6 +2,7 @@ use crate::attestation_verification::{ Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation, }; +use crate::attester_cache::AttesterCache; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, @@ -289,6 +290,8 @@ pub struct BeaconChain { pub beacon_proposer_cache: Mutex, /// Caches a map of `validator_index -> validator_pubkey`. pub(crate) validator_pubkey_cache: TimeoutRwLock>, + /// A cache used when producing attestations. + pub(crate) attester_cache: Arc, /// A list of any hard-coded forks that have been disabled. pub disabled_forks: Vec, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -1315,6 +1318,141 @@ impl BeaconChain { }) } + pub fn produce_unaggregated_attestation_new( + &self, + request_slot: Slot, + request_index: CommitteeIndex, + ) -> Result, Error> { + let request_epoch = request_slot.epoch(T::EthSpec::slots_per_epoch()); + + enum HeadOutcome { + SameEpoch { + current_justified_checkpoint: Checkpoint, + committee_len: usize, + }, + DifferentEpoch { + state_root: Hash256, + }, + } + + let (beacon_block_root, target, head_outcome) = self.with_head(|head| { + let head_state = &head.beacon_state; + + let beacon_block_root = if request_slot >= head_state.slot() { + head.beacon_block_root + } else { + *head_state.get_block_root(request_slot)? + }; + + let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let target_root = if head_state.slot() <= target_slot { + beacon_block_root + } else { + *head_state.get_block_root(target_slot)? + }; + let target = Checkpoint { + epoch: request_epoch, + root: target_root, + }; + + // TODO: check for finalization. + + let head_outcome = if request_epoch == head_state.current_epoch() { + let current_justified_checkpoint = head_state.current_justified_checkpoint(); + let committee_len = head_state + .get_beacon_committee(request_slot, request_index)? + .committee + .len(); + HeadOutcome::SameEpoch { + current_justified_checkpoint, + committee_len, + } + } else { + HeadOutcome::DifferentEpoch { + state_root: *head_state.get_state_root(request_slot)?, + } + }; + + Ok::<_, BeaconChainError>((beacon_block_root, target, head_outcome)) + })?; + + let (justified_checkpoint, committee_len) = match head_outcome { + HeadOutcome::SameEpoch { + current_justified_checkpoint, + committee_len, + } => (current_justified_checkpoint, committee_len), + HeadOutcome::DifferentEpoch { state_root } => { + if let Some(tuple) = + self.attester_cache + .get(&target, request_slot, request_index)? + { + tuple + } else { + let mut state: BeaconState = self + .get_state(&state_root, None)? + .ok_or(Error::MissingBeaconState(state_root))?; + + if state.slot() > request_slot { + return Err(Error::CannotAttestToFutureState); + } else if state.current_epoch() < request_epoch { + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + &mut state, + Some(state_root), + request_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + } + + self.attester_cache.cache_state_and_return_value( + &state, + beacon_block_root, + request_slot, + request_index, + )? + } + } + }; + + self.produce_unaggregated_attestation_parameterized( + request_slot, + request_index, + beacon_block_root, + justified_checkpoint, + target.root, + committee_len, + ) + } + + fn produce_unaggregated_attestation_parameterized( + &self, + slot: Slot, + index: CommitteeIndex, + beacon_block_root: Hash256, + current_justified_checkpoint: Checkpoint, + target_root: Hash256, + committee_len: usize, + ) -> Result, Error> { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + + Ok(Attestation { + aggregation_bits: BitList::with_capacity(committee_len)?, + data: AttestationData { + slot, + index, + beacon_block_root, + source: current_justified_checkpoint, + target: Checkpoint { + epoch, + root: target_root, + }, + }, + signature: AggregateSignature::empty(), + }) + } + /// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. /// @@ -2936,6 +3074,9 @@ impl BeaconChain { self.head_tracker.clone(), )?; + self.attester_cache + .prune_below(new_finalized_checkpoint.epoch); + if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_finalized_subscribers() { event_handler.register(EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a270872b26f..5413824a306 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -547,6 +547,7 @@ where shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), beacon_proposer_cache: <_>::default(), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), + attester_cache: <_>::default(), disabled_forks: self.disabled_forks, shutdown_sender: self .shutdown_sender diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index c4a0bb6d4a6..edadaf43980 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,3 +1,4 @@ +use crate::attester_cache::Error as AttesterCacheError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; use crate::eth1_chain::Error as Eth1ChainError; @@ -91,6 +92,7 @@ pub enum BeaconChainError { ObservedAttestationsError(ObservedAttestationsError), ObservedAttestersError(ObservedAttestersError), ObservedBlockProducersError(ObservedBlockProducersError), + AttesterCacheError(AttesterCacheError), PruningError(PruningError), ArithError(ArithError), InvalidShufflingId { @@ -137,6 +139,7 @@ easy_from_to!(NaiveAggregationError, BeaconChainError); easy_from_to!(ObservedAttestationsError, BeaconChainError); easy_from_to!(ObservedAttestersError, BeaconChainError); easy_from_to!(ObservedBlockProducersError, BeaconChainError); +easy_from_to!(AttesterCacheError, BeaconChainError); easy_from_to!(BlockSignatureVerifierError, BeaconChainError); easy_from_to!(PruningError, BeaconChainError); easy_from_to!(ArithError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c67e9f98e2f..973564c699d 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,5 +1,6 @@ #![recursion_limit = "128"] // For lazy-static pub mod attestation_verification; +mod attester_cache; mod beacon_chain; mod beacon_fork_choice_store; mod beacon_proposer_cache; From a920f283abbad9033dc7ca06fc1296c164745cb1 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 12:56:59 +1000 Subject: [PATCH 02/31] Refactor to remove enum --- beacon_node/beacon_chain/src/beacon_chain.rs | 143 +++++++++---------- beacon_node/beacon_chain/src/errors.rs | 4 + 2 files changed, 74 insertions(+), 73 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 989f09e1f65..b81e25e4af3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1214,6 +1214,7 @@ impl BeaconChain { .get_by_slot_and_root(slot, attestation_data_root) } + /* /// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`. /// /// The produced `Attestation` will not be valid until it has been signed by exactly one @@ -1259,6 +1260,7 @@ impl BeaconChain { }) } } + */ /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the @@ -1318,30 +1320,40 @@ impl BeaconChain { }) } - pub fn produce_unaggregated_attestation_new( + pub fn produce_unaggregated_attestation( &self, request_slot: Slot, request_index: CommitteeIndex, ) -> Result, Error> { - let request_epoch = request_slot.epoch(T::EthSpec::slots_per_epoch()); - - enum HeadOutcome { - SameEpoch { - current_justified_checkpoint: Checkpoint, - committee_len: usize, - }, - DifferentEpoch { - state_root: Hash256, - }, - } - - let (beacon_block_root, target, head_outcome) = self.with_head(|head| { + let slots_per_epoch = T::EthSpec::slots_per_epoch(); + let request_epoch = request_slot.epoch(slots_per_epoch); + + let beacon_block_root; + let beacon_state_root; + let target; + let head_state_epoch; + let head_state_justified_checkpoint; + let head_state_committee_len; + if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; - let beacon_block_root = if request_slot >= head_state.slot() { - head.beacon_block_root + let finalized_slot = head_state + .finalized_checkpoint() + .epoch + .start_slot(slots_per_epoch); + if request_slot < finalized_slot { + return Err(Error::AttestingToFinalizedSlot { + finalized_slot, + request_slot, + }); + } + + if request_slot >= head_state.slot() { + beacon_block_root = head.beacon_block_root; + beacon_state_root = head.beacon_state_root(); } else { - *head_state.get_block_root(request_slot)? + beacon_block_root = *head_state.get_block_root(request_slot)?; + beacon_state_root = *head_state.get_state_root(request_slot)?; }; let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); @@ -1350,69 +1362,54 @@ impl BeaconChain { } else { *head_state.get_block_root(target_slot)? }; - let target = Checkpoint { + target = Checkpoint { epoch: request_epoch, root: target_root, }; - // TODO: check for finalization. - - let head_outcome = if request_epoch == head_state.current_epoch() { - let current_justified_checkpoint = head_state.current_justified_checkpoint(); - let committee_len = head_state - .get_beacon_committee(request_slot, request_index)? - .committee - .len(); - HeadOutcome::SameEpoch { - current_justified_checkpoint, - committee_len, - } - } else { - HeadOutcome::DifferentEpoch { - state_root: *head_state.get_state_root(request_slot)?, - } - }; - - Ok::<_, BeaconChainError>((beacon_block_root, target, head_outcome)) - })?; + head_state_epoch = head_state.current_epoch(); + head_state_justified_checkpoint = head_state.current_justified_checkpoint(); + head_state_committee_len = head_state + .get_beacon_committee(request_slot, request_index)? + .committee + .len(); + } else { + return Err(Error::CanonicalHeadLockTimeout); + } - let (justified_checkpoint, committee_len) = match head_outcome { - HeadOutcome::SameEpoch { - current_justified_checkpoint, - committee_len, - } => (current_justified_checkpoint, committee_len), - HeadOutcome::DifferentEpoch { state_root } => { - if let Some(tuple) = - self.attester_cache - .get(&target, request_slot, request_index)? - { - tuple - } else { - let mut state: BeaconState = self - .get_state(&state_root, None)? - .ok_or(Error::MissingBeaconState(state_root))?; - - if state.slot() > request_slot { - return Err(Error::CannotAttestToFutureState); - } else if state.current_epoch() < request_epoch { - // Only perform a "partial" state advance since we do not require the state roots to be - // accurate. - partial_state_advance( - &mut state, - Some(state_root), - request_epoch.start_slot(T::EthSpec::slots_per_epoch()), - &self.spec, - )?; - state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - } + let (justified_checkpoint, committee_len) = if head_state_epoch == request_epoch { + (head_state_justified_checkpoint, head_state_committee_len) + } else { + if let Some(tuple) = self + .attester_cache + .get(&target, request_slot, request_index)? + { + tuple + } else { + let mut state: BeaconState = self + .get_state(&beacon_state_root, None)? + .ok_or(Error::MissingBeaconState(beacon_state_root))?; - self.attester_cache.cache_state_and_return_value( - &state, - beacon_block_root, - request_slot, - request_index, - )? + if state.slot() > request_slot { + return Err(Error::CannotAttestToFutureState); + } else if state.current_epoch() < request_epoch { + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + &mut state, + Some(beacon_state_root), + request_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; } + + self.attester_cache.cache_state_and_return_value( + &state, + beacon_block_root, + request_slot, + request_index, + )? } }; diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index edadaf43980..8297a603cdc 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -105,6 +105,10 @@ pub enum BeaconChainError { head_slot: Slot, request_slot: Slot, }, + AttestingToFinalizedSlot { + finalized_slot: Slot, + request_slot: Slot, + }, BadPreState { parent_root: Hash256, parent_slot: Slot, From dde2880962ee1c3f79f23b2d1926698f752012b2 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 14:40:05 +1000 Subject: [PATCH 03/31] Fix lint, ensure cache is primed --- .../beacon_chain/src/attester_cache.rs | 10 +-- beacon_node/beacon_chain/src/beacon_chain.rs | 70 +++++++++++-------- .../beacon_chain/src/state_advance_timer.rs | 6 ++ 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 8ac0283c0b5..17a942dfa50 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -4,7 +4,6 @@ use types::{ BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch, Slot, }; -type TargetRoot = Hash256; type TargetCheckpoint = Checkpoint; type JustifiedCheckpoint = Checkpoint; type CommitteeLength = usize; @@ -132,14 +131,17 @@ impl AttesterCache { .transpose() } - pub fn cache_state( + pub fn maybe_cache_state( &self, state: &BeaconState, latest_beacon_block_root: Hash256, ) -> Result<(), Error> { let target = get_state_target(state, latest_beacon_block_root)?; - let cache_item = CacheItem::new(state)?; - self.cache.write().insert(target, cache_item); + let key_exists = self.cache.read().contains_key(&target); + if !key_exists { + let cache_item = CacheItem::new(state)?; + self.cache.write().insert(target, cache_item); + } Ok(()) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b81e25e4af3..f505e6e7147 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1379,38 +1379,36 @@ impl BeaconChain { let (justified_checkpoint, committee_len) = if head_state_epoch == request_epoch { (head_state_justified_checkpoint, head_state_committee_len) + } else if let Some(tuple) = self + .attester_cache + .get(&target, request_slot, request_index)? + { + tuple } else { - if let Some(tuple) = self - .attester_cache - .get(&target, request_slot, request_index)? - { - tuple - } else { - let mut state: BeaconState = self - .get_state(&beacon_state_root, None)? - .ok_or(Error::MissingBeaconState(beacon_state_root))?; - - if state.slot() > request_slot { - return Err(Error::CannotAttestToFutureState); - } else if state.current_epoch() < request_epoch { - // Only perform a "partial" state advance since we do not require the state roots to be - // accurate. - partial_state_advance( - &mut state, - Some(beacon_state_root), - request_epoch.start_slot(T::EthSpec::slots_per_epoch()), - &self.spec, - )?; - state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - } - - self.attester_cache.cache_state_and_return_value( - &state, - beacon_block_root, - request_slot, - request_index, - )? + let mut state: BeaconState = self + .get_state(&beacon_state_root, None)? + .ok_or(Error::MissingBeaconState(beacon_state_root))?; + + if state.slot() > request_slot { + return Err(Error::CannotAttestToFutureState); + } else if state.current_epoch() < request_epoch { + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + &mut state, + Some(beacon_state_root), + request_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; } + + self.attester_cache.cache_state_and_return_value( + &state, + beacon_block_root, + request_slot, + request_index, + )? }; self.produce_unaggregated_attestation_parameterized( @@ -2158,6 +2156,7 @@ impl BeaconChain { let block_root = fully_verified_block.block_root; let mut state = fully_verified_block.state; let current_slot = self.slot()?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); let mut ops = fully_verified_block.confirmation_db_batch; let attestation_observation_timer = @@ -2221,6 +2220,17 @@ impl BeaconChain { } } + // Apply the state to the attester cache, only if it is from the previous epoch or earlier. + // + // In a perfect scenario there should be no need to add previous-epoch states to the cache. + // However, latency between the VC and the BN might cause the VC to produce attestations at + // a previous slot. + if state.current_epoch().saturating_add(2_u64) >= current_epoch { + self.attester_cache + .maybe_cache_state(&state, block_root) + .map_err(BeaconChainError::from)?; + } + let mut fork_choice = self.fork_choice.write(); // Do not import a block that doesn't descend from the finalized root. diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index eea329a2a3d..cc0f4ac4c46 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -304,6 +304,12 @@ fn advance_head( ); } + // Apply the state to the attester cache, if the cache deems it interesting. + beacon_chain + .attester_cache + .maybe_cache_state(&state, head_root) + .map_err(BeaconChainError::from)?; + let final_slot = state.slot(); // Insert the advanced state back into the snapshot cache. From 7d90b26e4e2bd2c78f4ebfe4bb168ce10b8240a2 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 14:57:19 +1000 Subject: [PATCH 04/31] Add max len to cache --- .../beacon_chain/src/attester_cache.rs | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 17a942dfa50..6fd91cbd2ff 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -9,6 +9,8 @@ type JustifiedCheckpoint = Checkpoint; type CommitteeLength = usize; type CommitteeIndex = u64; +const MAX_CACHE_LEN: usize = 64; + #[derive(Debug)] pub enum Error { BeaconState(BeaconStateError), @@ -140,7 +142,7 @@ impl AttesterCache { let key_exists = self.cache.read().contains_key(&target); if !key_exists { let cache_item = CacheItem::new(state)?; - self.cache.write().insert(target, cache_item); + self.insert_respecting_max_len(target, cache_item); } Ok(()) } @@ -155,10 +157,28 @@ impl AttesterCache { let target = get_state_target(state, latest_beacon_block_root)?; let cache_item = CacheItem::new(state)?; let value = cache_item.get(slot, index)?; - self.cache.write().insert(target, cache_item); + self.insert_respecting_max_len(target, cache_item); Ok(value) } + fn insert_respecting_max_len(&self, target: TargetCheckpoint, cache_item: CacheItem) { + let mut cache = self.cache.write(); + + if cache.len() >= MAX_CACHE_LEN { + while let Some(oldest) = cache + .iter() + .min_by_key(|(target, _)| target.epoch) + .map(|(target, _)| target) + .filter(|_| cache.len() >= MAX_CACHE_LEN) + .copied() + { + cache.remove(&oldest); + } + } + + cache.insert(target, cache_item); + } + pub fn prune_below(&self, epoch: Epoch) { self.cache.write().retain(|target, _| target.epoch >= epoch); } From 2d313e933cbd65f50db2666030d290b9166e11d0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 15:10:51 +1000 Subject: [PATCH 05/31] Add some initial comments --- beacon_node/beacon_chain/src/attester_cache.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 6fd91cbd2ff..7ddfb4919d7 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -1,3 +1,14 @@ +//! This module provides the `AttesterCache`, a cache designed for reducing state-reads when +//! validators produce `AttestationData`. +//! +//! This cache is required *as well as* the `ShufflingCache` since the `ShufflingCache` does not +//! provide any information about the `state.current_justified_checkpoint`. It is not trivial to add +//! the justified checkpoint to the `ShufflingCache` since that cache keyed by shuffling decision +//! root, which is not suitable for the justified checkpoint. Whilst we can know the shuffling for +//! epoch `n` during `n - 1`, we *cannot* know the justified checkpoint. Instead, we *must* perform +//! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards +//! and penalties can be computed and the `state.current_justified_checkpoint` can be updated. + use parking_lot::RwLock; use std::collections::HashMap; use types::{ @@ -9,6 +20,7 @@ type JustifiedCheckpoint = Checkpoint; type CommitteeLength = usize; type CommitteeIndex = u64; +/// The maximum number of `CacheItems` to be kept in memory. const MAX_CACHE_LEN: usize = 64; #[derive(Debug)] From 7688e1a2d76e3dd47e1a9b4ab66a0afccdb14698 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 15:29:31 +1000 Subject: [PATCH 06/31] Change cache key to decision root --- .../beacon_chain/src/attester_cache.rs | 79 +++++++++---------- beacon_node/beacon_chain/src/beacon_chain.rs | 21 +++-- .../beacon_chain/src/state_advance_timer.rs | 2 +- 3 files changed, 48 insertions(+), 54 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 7ddfb4919d7..adebf59a081 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -15,12 +15,11 @@ use types::{ BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch, Slot, }; -type TargetCheckpoint = Checkpoint; type JustifiedCheckpoint = Checkpoint; type CommitteeLength = usize; type CommitteeIndex = u64; -/// The maximum number of `CacheItems` to be kept in memory. +/// The maximum number of `AttesterCacheValues` to be kept in memory. const MAX_CACHE_LEN: usize = 64; #[derive(Debug)] @@ -100,12 +99,12 @@ impl CommitteeLengths { } } -pub struct CacheItem { +pub struct AttesterCacheValue { current_justified_checkpoint: Checkpoint, committee_lengths: CommitteeLengths, } -impl CacheItem { +impl AttesterCacheValue { pub fn new(state: &BeaconState) -> Result { let current_justified_checkpoint = state.current_justified_checkpoint(); let committee_lengths = CommitteeLengths::new(state)?; @@ -126,35 +125,50 @@ impl CacheItem { } } +#[derive(PartialEq, Hash, Clone, Copy)] +pub struct AttesterCacheKey { + epoch: Epoch, + decision_root: Hash256, +} + +impl AttesterCacheKey { + pub fn new(epoch: Epoch, state: &BeaconState) -> Result { + let decision_slot = epoch.start_slot(T::slots_per_epoch()).saturating_sub(1_u64); + let decision_root = *state.get_block_root(decision_slot)?; + Ok(Self { + epoch, + decision_root, + }) + } +} + +impl Eq for AttesterCacheKey {} + #[derive(Default)] pub struct AttesterCache { - cache: RwLock>, + cache: RwLock>, } impl AttesterCache { pub fn get( &self, - target: &TargetCheckpoint, + key: &AttesterCacheKey, slot: Slot, committee_index: CommitteeIndex, ) -> Result, Error> { self.cache .read() - .get(target) + .get(key) .map(|cache_item| cache_item.get(slot, committee_index)) .transpose() } - pub fn maybe_cache_state( - &self, - state: &BeaconState, - latest_beacon_block_root: Hash256, - ) -> Result<(), Error> { - let target = get_state_target(state, latest_beacon_block_root)?; - let key_exists = self.cache.read().contains_key(&target); + pub fn maybe_cache_state(&self, state: &BeaconState) -> Result<(), Error> { + let key = AttesterCacheKey::new(state.current_epoch(), state)?; + let key_exists = self.cache.read().contains_key(&key); if !key_exists { - let cache_item = CacheItem::new(state)?; - self.insert_respecting_max_len(target, cache_item); + let cache_item = AttesterCacheValue::new(state)?; + self.insert_respecting_max_len(key, cache_item); } Ok(()) } @@ -162,25 +176,24 @@ impl AttesterCache { pub fn cache_state_and_return_value( &self, state: &BeaconState, - latest_beacon_block_root: Hash256, slot: Slot, index: CommitteeIndex, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { - let target = get_state_target(state, latest_beacon_block_root)?; - let cache_item = CacheItem::new(state)?; + let key = AttesterCacheKey::new(state.current_epoch(), state)?; + let cache_item = AttesterCacheValue::new(state)?; let value = cache_item.get(slot, index)?; - self.insert_respecting_max_len(target, cache_item); + self.insert_respecting_max_len(key, cache_item); Ok(value) } - fn insert_respecting_max_len(&self, target: TargetCheckpoint, cache_item: CacheItem) { + fn insert_respecting_max_len(&self, key: AttesterCacheKey, value: AttesterCacheValue) { let mut cache = self.cache.write(); if cache.len() >= MAX_CACHE_LEN { while let Some(oldest) = cache .iter() - .min_by_key(|(target, _)| target.epoch) - .map(|(target, _)| target) + .map(|(key, _)| key) + .min_by_key(|key| key.epoch) .filter(|_| cache.len() >= MAX_CACHE_LEN) .copied() { @@ -188,28 +201,10 @@ impl AttesterCache { } } - cache.insert(target, cache_item); + cache.insert(key, value); } pub fn prune_below(&self, epoch: Epoch) { self.cache.write().retain(|target, _| target.epoch >= epoch); } } - -fn get_state_target( - state: &BeaconState, - latest_beacon_block_root: Hash256, -) -> Result { - let target_epoch = state.current_epoch(); - let target_slot = target_epoch.start_slot(T::slots_per_epoch()); - let target_root = if state.slot() <= target_slot { - latest_beacon_block_root - } else { - *state.get_block_root(target_slot)? - }; - - Ok(Checkpoint { - epoch: target_epoch, - root: target_root, - }) -} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f505e6e7147..77be8deba04 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2,7 +2,7 @@ use crate::attestation_verification::{ Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation, }; -use crate::attester_cache::AttesterCache; +use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, @@ -1334,6 +1334,7 @@ impl BeaconChain { let head_state_epoch; let head_state_justified_checkpoint; let head_state_committee_len; + let attester_cache_key; if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; @@ -1373,15 +1374,17 @@ impl BeaconChain { .get_beacon_committee(request_slot, request_index)? .committee .len(); + + attester_cache_key = AttesterCacheKey::new(request_epoch, head_state)?; } else { return Err(Error::CanonicalHeadLockTimeout); } let (justified_checkpoint, committee_len) = if head_state_epoch == request_epoch { (head_state_justified_checkpoint, head_state_committee_len) - } else if let Some(tuple) = self - .attester_cache - .get(&target, request_slot, request_index)? + } else if let Some(tuple) = + self.attester_cache + .get(&attester_cache_key, request_slot, request_index)? { tuple } else { @@ -1403,12 +1406,8 @@ impl BeaconChain { state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; } - self.attester_cache.cache_state_and_return_value( - &state, - beacon_block_root, - request_slot, - request_index, - )? + self.attester_cache + .cache_state_and_return_value(&state, request_slot, request_index)? }; self.produce_unaggregated_attestation_parameterized( @@ -2227,7 +2226,7 @@ impl BeaconChain { // a previous slot. if state.current_epoch().saturating_add(2_u64) >= current_epoch { self.attester_cache - .maybe_cache_state(&state, block_root) + .maybe_cache_state(&state) .map_err(BeaconChainError::from)?; } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cc0f4ac4c46..307f80ea6b9 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -307,7 +307,7 @@ fn advance_head( // Apply the state to the attester cache, if the cache deems it interesting. beacon_chain .attester_cache - .maybe_cache_state(&state, head_root) + .maybe_cache_state(&state) .map_err(BeaconChainError::from)?; let final_slot = state.slot(); From 12075872a44f8bd0efcedc6a3d6e229bc124fac0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 15:38:21 +1000 Subject: [PATCH 07/31] Add comments --- beacon_node/beacon_chain/src/attester_cache.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index adebf59a081..53bb0d20d39 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -125,9 +125,23 @@ impl AttesterCacheValue { } } +/// The `AttesterCacheKey` is fundamentally the same thing as the shuffling decision roots, however +/// it provides a unique key for both of the following values: +/// +/// 1. The `state.current_justified_checkpoint`. +/// 2. The attester shuffling. +/// +/// This struct relies upon the premise that the `state.current_justified_checkpoint` in epoch `n` +/// is determined by the root of the latest block in epoch `n - 1`. Notably, this is identical to +/// how the proposer shuffling is keyed in `BeaconProposerCache`. +/// +/// It is also safe, but not maximally efficient, to key the attester shuffling by the same block +/// root. For better shuffling keying strategies, see the `ShufflingCache`. #[derive(PartialEq, Hash, Clone, Copy)] pub struct AttesterCacheKey { + /// The epoch from which the justified checkpoint should be observed. epoch: Epoch, + /// The root of the block at the last slot of `self.epoch - 1`. decision_root: Hash256, } From 1dbe12cb4887bad970d4d96545f8266bc833d975 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 16:16:16 +1000 Subject: [PATCH 08/31] Remove Vec from cache --- .../beacon_chain/src/attester_cache.rs | 117 +++++++++--------- beacon_node/beacon_chain/src/beacon_chain.rs | 18 ++- consensus/types/src/beacon_state.rs | 5 +- .../types/src/beacon_state/committee_cache.rs | 53 ++++++-- 4 files changed, 115 insertions(+), 78 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 53bb0d20d39..d971015a26d 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -11,8 +11,13 @@ use parking_lot::RwLock; use std::collections::HashMap; +use std::ops::Range; use types::{ - BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch, Slot, + beacon_state::{ + compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count, + }, + BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch, + Slot, }; type JustifiedCheckpoint = Checkpoint; @@ -25,18 +30,11 @@ const MAX_CACHE_LEN: usize = 64; #[derive(Debug)] pub enum Error { BeaconState(BeaconStateError), - SlotTooLow { - slot: Slot, - first_slot: Slot, - }, - SlotTooHigh { - slot: Slot, - first_slot: Slot, - }, - InvalidCommitteeIndex { - slot_offset: usize, - committee_index: u64, - }, + WrongEpoch { request_epoch: Epoch, epoch: Epoch }, + SlotTooLow { slot: Slot, first_slot: Slot }, + SlotTooHigh { slot: Slot, first_slot: Slot }, + InvalidCommitteeIndex { committee_index: u64 }, + InverseRange { range: Range }, } impl From for Error { @@ -46,56 +44,56 @@ impl From for Error { } struct CommitteeLengths { - first_slot: Slot, - lengths: Vec, - slot_offsets: Vec, + epoch: Epoch, + active_validator_indices_len: usize, } impl CommitteeLengths { fn new(state: &BeaconState) -> Result { - let slots_per_epoch = T::slots_per_epoch(); - let current_epoch = state.current_epoch(); let committee_cache = state.committee_cache(RelativeEpoch::Current)?; - let committees_per_slot = committee_cache.committees_per_slot(); - - let mut lengths = Vec::with_capacity((committees_per_slot * slots_per_epoch) as usize); - let mut slot_offsets = Vec::with_capacity(slots_per_epoch as usize); - - for slot in current_epoch.slot_iter(slots_per_epoch) { - slot_offsets.push(lengths.len()); - for index in 0..committees_per_slot { - let length = state - .get_beacon_committee(slot, index as u64)? - .committee - .len(); - lengths.push(length); - } - } + let active_validator_indices_len = committee_cache.active_validator_indices().len(); Ok(Self { - first_slot: current_epoch.start_slot(slots_per_epoch), - lengths, - slot_offsets, + epoch: state.current_epoch(), + active_validator_indices_len, }) } - fn get(&self, slot: Slot, committee_index: CommitteeIndex) -> Result { - let first_slot = self.first_slot; - let relative_slot = slot - .as_usize() - .checked_sub(first_slot.as_usize()) - .ok_or(Error::SlotTooLow { slot, first_slot })?; - let slot_offset = *self - .slot_offsets - .get(relative_slot) - .ok_or(Error::SlotTooHigh { slot, first_slot })?; - slot_offset - .checked_add(committee_index as usize) - .and_then(|lengths_index| self.lengths.get(lengths_index).copied()) - .ok_or(Error::InvalidCommitteeIndex { - slot_offset, - committee_index, - }) + fn get( + &self, + slot: Slot, + committee_index: CommitteeIndex, + spec: &ChainSpec, + ) -> Result { + let slots_per_epoch = T::slots_per_epoch(); + let request_epoch = slot.epoch(slots_per_epoch); + if request_epoch != self.epoch { + return Err(Error::WrongEpoch { + request_epoch, + epoch: self.epoch, + }); + } + + let slots_per_epoch = slots_per_epoch as usize; + let committees_per_slot = + T::get_committee_count_per_slot(self.active_validator_indices_len, spec)?; + let index_in_epoch = compute_committee_index_in_epoch( + slot, + slots_per_epoch, + committees_per_slot, + committee_index as usize, + ); + let range = compute_committee_range_in_epoch( + epoch_committee_count(committees_per_slot, slots_per_epoch), + index_in_epoch, + self.active_validator_indices_len, + ) + .ok_or(Error::InvalidCommitteeIndex { committee_index })?; + + range + .end + .checked_sub(range.start) + .ok_or(Error::InverseRange { range }) } } @@ -114,13 +112,14 @@ impl AttesterCacheValue { }) } - fn get( + fn get( &self, slot: Slot, committee_index: CommitteeIndex, + spec: &ChainSpec, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { self.committee_lengths - .get(slot, committee_index) + .get::(slot, committee_index, spec) .map(|committee_length| (self.current_justified_checkpoint, committee_length)) } } @@ -164,16 +163,17 @@ pub struct AttesterCache { } impl AttesterCache { - pub fn get( + pub fn get( &self, key: &AttesterCacheKey, slot: Slot, committee_index: CommitteeIndex, + spec: &ChainSpec, ) -> Result, Error> { self.cache .read() .get(key) - .map(|cache_item| cache_item.get(slot, committee_index)) + .map(|cache_item| cache_item.get::(slot, committee_index, spec)) .transpose() } @@ -192,10 +192,11 @@ impl AttesterCache { state: &BeaconState, slot: Slot, index: CommitteeIndex, + spec: &ChainSpec, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { let key = AttesterCacheKey::new(state.current_epoch(), state)?; let cache_item = AttesterCacheValue::new(state)?; - let value = cache_item.get(slot, index)?; + let value = cache_item.get::(slot, index, spec)?; self.insert_respecting_max_len(key, cache_item); Ok(value) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 77be8deba04..0e9b286872f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1382,10 +1382,12 @@ impl BeaconChain { let (justified_checkpoint, committee_len) = if head_state_epoch == request_epoch { (head_state_justified_checkpoint, head_state_committee_len) - } else if let Some(tuple) = - self.attester_cache - .get(&attester_cache_key, request_slot, request_index)? - { + } else if let Some(tuple) = self.attester_cache.get::( + &attester_cache_key, + request_slot, + request_index, + &self.spec, + )? { tuple } else { let mut state: BeaconState = self @@ -1406,8 +1408,12 @@ impl BeaconChain { state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; } - self.attester_cache - .cache_state_and_return_value(&state, request_slot, request_index)? + self.attester_cache.cache_state_and_return_value( + &state, + request_slot, + request_index, + &self.spec, + )? }; self.produce_unaggregated_attestation_parameterized( diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 673bca4a889..89b8ff7048c 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -22,7 +22,10 @@ use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; -pub use self::committee_cache::CommitteeCache; +pub use self::committee_cache::{ + compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count, + CommitteeCache, +}; pub use clone_config::CloneConfig; pub use eth_spec::*; pub use iter::BlockRootsIter; diff --git a/consensus/types/src/beacon_state/committee_cache.rs b/consensus/types/src/beacon_state/committee_cache.rs index 9c8f428d83e..49791220121 100644 --- a/consensus/types/src/beacon_state/committee_cache.rs +++ b/consensus/types/src/beacon_state/committee_cache.rs @@ -121,8 +121,12 @@ impl CommitteeCache { return None; } - let committee_index = - (slot.as_u64() % self.slots_per_epoch) * self.committees_per_slot + index; + let committee_index = compute_committee_index_in_epoch( + slot, + self.slots_per_epoch as usize, + self.committees_per_slot as usize, + index as usize, + ); let committee = self.compute_committee(committee_index as usize)?; Some(BeaconCommittee { @@ -219,7 +223,10 @@ impl CommitteeCache { /// /// Spec v0.12.1 pub fn epoch_committee_count(&self) -> usize { - self.committees_per_slot as usize * self.slots_per_epoch as usize + epoch_committee_count( + self.committees_per_slot as usize, + self.slots_per_epoch as usize, + ) } /// Returns the number of committees per slot for this cache's epoch. @@ -242,16 +249,7 @@ impl CommitteeCache { /// /// Spec v0.12.1 fn compute_committee_range(&self, index: usize) -> Option> { - let count = self.epoch_committee_count(); - if count == 0 || index >= count { - return None; - } - - let num_validators = self.shuffling.len(); - let start = (num_validators * index) / count; - let end = (num_validators * (index + 1)) / count; - - Some(start..end) + compute_committee_range_in_epoch(self.epoch_committee_count(), index, self.shuffling.len()) } /// Returns the index of some validator in `self.shuffling`. @@ -264,6 +262,35 @@ impl CommitteeCache { } } +pub fn compute_committee_index_in_epoch( + slot: Slot, + slots_per_epoch: usize, + committees_per_slot: usize, + committee_index: usize, +) -> usize { + (slot.as_usize() % slots_per_epoch) * committees_per_slot + committee_index +} + +pub fn compute_committee_range_in_epoch( + epoch_committee_count: usize, + index_in_epoch: usize, + shuffling_len: usize, +) -> Option> { + let count = epoch_committee_count; + if count == 0 || index_in_epoch >= count { + return None; + } + + let start = (shuffling_len * index_in_epoch) / count; + let end = (shuffling_len * (index_in_epoch + 1)) / count; + + Some(start..end) +} + +pub fn epoch_committee_count(committees_per_slot: usize, slots_per_epoch: usize) -> usize { + committees_per_slot * slots_per_epoch +} + /// Returns a list of all `validators` indices where the validator is active at the given /// `epoch`. /// From 64e26a9765a376935980ae31f15ae80904e0c46c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 17:20:03 +1000 Subject: [PATCH 09/31] Fix bug with cache key, add comments --- .../beacon_chain/src/attester_cache.rs | 53 ++++++++--- beacon_node/beacon_chain/src/beacon_chain.rs | 87 ++++++++++--------- .../beacon_chain/src/state_advance_timer.rs | 2 +- 3 files changed, 92 insertions(+), 50 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index d971015a26d..55896417a7a 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -31,8 +31,6 @@ const MAX_CACHE_LEN: usize = 64; pub enum Error { BeaconState(BeaconStateError), WrongEpoch { request_epoch: Epoch, epoch: Epoch }, - SlotTooLow { slot: Slot, first_slot: Slot }, - SlotTooHigh { slot: Slot, first_slot: Slot }, InvalidCommitteeIndex { committee_index: u64 }, InverseRange { range: Range }, } @@ -43,12 +41,16 @@ impl From for Error { } } +/// Provides the length for each committee in a given `Epoch`. struct CommitteeLengths { + /// The `epoch` to which the lengths pertain. epoch: Epoch, + /// The length of the shuffling in `self.epoch`. active_validator_indices_len: usize, } impl CommitteeLengths { + /// Instantate `Self` using `state.current_epoch()`. fn new(state: &BeaconState) -> Result { let committee_cache = state.committee_cache(RelativeEpoch::Current)?; let active_validator_indices_len = committee_cache.active_validator_indices().len(); @@ -59,6 +61,7 @@ impl CommitteeLengths { }) } + /// Get the length of the committee at the given `slot` and `committee_index`. fn get( &self, slot: Slot, @@ -67,6 +70,8 @@ impl CommitteeLengths { ) -> Result { let slots_per_epoch = T::slots_per_epoch(); let request_epoch = slot.epoch(slots_per_epoch); + + // Sanity check. if request_epoch != self.epoch { return Err(Error::WrongEpoch { request_epoch, @@ -97,12 +102,14 @@ impl CommitteeLengths { } } +/// Provides information relevant to producing an attestation. pub struct AttesterCacheValue { current_justified_checkpoint: Checkpoint, committee_lengths: CommitteeLengths, } impl AttesterCacheValue { + /// Instantiate `Self` using `state.current_epoch()`. pub fn new(state: &BeaconState) -> Result { let current_justified_checkpoint = state.current_justified_checkpoint(); let committee_lengths = CommitteeLengths::new(state)?; @@ -112,6 +119,7 @@ impl AttesterCacheValue { }) } + /// Get the justified checkpoint and committee length for some `slot` and `committee_index`. fn get( &self, slot: Slot, @@ -134,8 +142,8 @@ impl AttesterCacheValue { /// is determined by the root of the latest block in epoch `n - 1`. Notably, this is identical to /// how the proposer shuffling is keyed in `BeaconProposerCache`. /// -/// It is also safe, but not maximally efficient, to key the attester shuffling by the same block -/// root. For better shuffling keying strategies, see the `ShufflingCache`. +/// It is also safe, but not maximally efficient, to key the attester shuffling with the same +/// strategy. For better shuffling keying strategies, see the `ShufflingCache`. #[derive(PartialEq, Hash, Clone, Copy)] pub struct AttesterCacheKey { /// The epoch from which the justified checkpoint should be observed. @@ -145,9 +153,25 @@ pub struct AttesterCacheKey { } impl AttesterCacheKey { - pub fn new(epoch: Epoch, state: &BeaconState) -> Result { - let decision_slot = epoch.start_slot(T::slots_per_epoch()).saturating_sub(1_u64); - let decision_root = *state.get_block_root(decision_slot)?; + /// Instantiate `Self` to key `state.current_epoch()`. + pub fn new( + epoch: Epoch, + state: &BeaconState, + latest_block_root: Hash256, + ) -> Result { + let slots_per_epoch = T::slots_per_epoch(); + let decision_slot = epoch.start_slot(slots_per_epoch).saturating_sub(1_u64); + + let decision_root = if decision_slot.epoch(slots_per_epoch) == epoch { + // This scenario is only possible during the genesis epoch. In this scenario, all-zeros + // is used as an alias to the genesis block. + Hash256::zero() + } else if epoch > state.current_epoch() { + latest_block_root + } else { + *state.get_block_root(decision_slot)? + }; + Ok(Self { epoch, decision_root, @@ -157,6 +181,10 @@ impl AttesterCacheKey { impl Eq for AttesterCacheKey {} +/// Provides a cache for the justified checkpoint and committee length when producing an +/// attestation. +/// +/// See the module-level documentation for more information. #[derive(Default)] pub struct AttesterCache { cache: RwLock>, @@ -177,8 +205,12 @@ impl AttesterCache { .transpose() } - pub fn maybe_cache_state(&self, state: &BeaconState) -> Result<(), Error> { - let key = AttesterCacheKey::new(state.current_epoch(), state)?; + pub fn maybe_cache_state( + &self, + state: &BeaconState, + latest_block_root: Hash256, + ) -> Result<(), Error> { + let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?; let key_exists = self.cache.read().contains_key(&key); if !key_exists { let cache_item = AttesterCacheValue::new(state)?; @@ -190,11 +222,12 @@ impl AttesterCache { pub fn cache_state_and_return_value( &self, state: &BeaconState, + latest_block_root: Hash256, slot: Slot, index: CommitteeIndex, spec: &ChainSpec, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { - let key = AttesterCacheKey::new(state.current_epoch(), state)?; + let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?; let cache_item = AttesterCacheValue::new(state)?; let value = cache_item.get::(slot, index, spec)?; self.insert_respecting_max_len(key, cache_item); diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0e9b286872f..42feb9e626e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1333,7 +1333,7 @@ impl BeaconChain { let target; let head_state_epoch; let head_state_justified_checkpoint; - let head_state_committee_len; + let head_state_committee_len_opt; let attester_cache_key; if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; @@ -1370,51 +1370,60 @@ impl BeaconChain { head_state_epoch = head_state.current_epoch(); head_state_justified_checkpoint = head_state.current_justified_checkpoint(); - head_state_committee_len = head_state - .get_beacon_committee(request_slot, request_index)? - .committee - .len(); + head_state_committee_len_opt = if head_state_epoch == request_epoch { + Some( + head_state + .get_beacon_committee(request_slot, request_index)? + .committee + .len(), + ) + } else { + None + }; - attester_cache_key = AttesterCacheKey::new(request_epoch, head_state)?; + attester_cache_key = + AttesterCacheKey::new(request_epoch, head_state, beacon_block_root)?; } else { return Err(Error::CanonicalHeadLockTimeout); } - let (justified_checkpoint, committee_len) = if head_state_epoch == request_epoch { - (head_state_justified_checkpoint, head_state_committee_len) - } else if let Some(tuple) = self.attester_cache.get::( - &attester_cache_key, - request_slot, - request_index, - &self.spec, - )? { - tuple - } else { - let mut state: BeaconState = self - .get_state(&beacon_state_root, None)? - .ok_or(Error::MissingBeaconState(beacon_state_root))?; - - if state.slot() > request_slot { - return Err(Error::CannotAttestToFutureState); - } else if state.current_epoch() < request_epoch { - // Only perform a "partial" state advance since we do not require the state roots to be - // accurate. - partial_state_advance( - &mut state, - Some(beacon_state_root), - request_epoch.start_slot(T::EthSpec::slots_per_epoch()), - &self.spec, - )?; - state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - } - - self.attester_cache.cache_state_and_return_value( - &state, + let (justified_checkpoint, committee_len) = + if let Some(head_state_committee_len) = head_state_committee_len_opt { + (head_state_justified_checkpoint, head_state_committee_len) + } else if let Some(tuple) = self.attester_cache.get::( + &attester_cache_key, request_slot, request_index, &self.spec, - )? - }; + )? { + tuple + } else { + let mut state: BeaconState = self + .get_state(&beacon_state_root, None)? + .ok_or(Error::MissingBeaconState(beacon_state_root))?; + + if state.slot() > request_slot { + return Err(Error::CannotAttestToFutureState); + } else if state.current_epoch() < request_epoch { + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + &mut state, + Some(beacon_state_root), + request_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + } + + self.attester_cache.cache_state_and_return_value( + &state, + beacon_block_root, + request_slot, + request_index, + &self.spec, + )? + }; self.produce_unaggregated_attestation_parameterized( request_slot, @@ -2232,7 +2241,7 @@ impl BeaconChain { // a previous slot. if state.current_epoch().saturating_add(2_u64) >= current_epoch { self.attester_cache - .maybe_cache_state(&state) + .maybe_cache_state(&state, block_root) .map_err(BeaconChainError::from)?; } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 307f80ea6b9..cc0f4ac4c46 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -307,7 +307,7 @@ fn advance_head( // Apply the state to the attester cache, if the cache deems it interesting. beacon_chain .attester_cache - .maybe_cache_state(&state) + .maybe_cache_state(&state, head_root) .map_err(BeaconChainError::from)?; let final_slot = state.slot(); From 0444d89bb2d6cb3e9165dbee1221825c5a8f7e12 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 17:40:15 +1000 Subject: [PATCH 10/31] Handle uninitialized cache, add comments --- .../beacon_chain/src/attester_cache.rs | 57 +++++++++++++------ beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- .../beacon_chain/src/state_advance_timer.rs | 2 +- consensus/types/src/beacon_state.rs | 11 +++- 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 55896417a7a..288e5593541 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -23,6 +23,7 @@ use types::{ type JustifiedCheckpoint = Checkpoint; type CommitteeLength = usize; type CommitteeIndex = u64; +type CacheHashMap = HashMap; /// The maximum number of `AttesterCacheValues` to be kept in memory. const MAX_CACHE_LEN: usize = 64; @@ -50,10 +51,17 @@ struct CommitteeLengths { } impl CommitteeLengths { - /// Instantate `Self` using `state.current_epoch()`. - fn new(state: &BeaconState) -> Result { - let committee_cache = state.committee_cache(RelativeEpoch::Current)?; - let active_validator_indices_len = committee_cache.active_validator_indices().len(); + /// Instantiate `Self` using `state.current_epoch()`. + fn new(state: &BeaconState, spec: &ChainSpec) -> Result { + let active_validator_indices_len = if let Ok(committee_cache) = + state.committee_cache(RelativeEpoch::Current) + { + committee_cache.active_validator_indices().len() + } else { + // Building the cache like this avoids taking a mutable reference to `BeaconState`. + let committee_cache = state.initialize_committee_cache(state.current_epoch(), spec)?; + committee_cache.active_validator_indices().len() + }; Ok(Self { epoch: state.current_epoch(), @@ -110,9 +118,9 @@ pub struct AttesterCacheValue { impl AttesterCacheValue { /// Instantiate `Self` using `state.current_epoch()`. - pub fn new(state: &BeaconState) -> Result { + pub fn new(state: &BeaconState, spec: &ChainSpec) -> Result { let current_justified_checkpoint = state.current_justified_checkpoint(); - let committee_lengths = CommitteeLengths::new(state)?; + let committee_lengths = CommitteeLengths::new(state, spec)?; Ok(Self { current_justified_checkpoint, committee_lengths, @@ -154,6 +162,14 @@ pub struct AttesterCacheKey { impl AttesterCacheKey { /// Instantiate `Self` to key `state.current_epoch()`. + /// + /// The `latest_block_root` should be the latest block that has been applied to `state`. This + /// parameter is required since the state does not store the block root for any block with the + /// same slot as `slot.slot()`. + /// + /// ## Errors + /// + /// May error if `epoch` is out of the range of `state.block_roots`. pub fn new( epoch: Epoch, state: &BeaconState, @@ -167,6 +183,8 @@ impl AttesterCacheKey { // is used as an alias to the genesis block. Hash256::zero() } else if epoch > state.current_epoch() { + // If the requested epoch is higher than the current epoch, the latest block will always + // be the decision root. latest_block_root } else { *state.get_block_root(decision_slot)? @@ -187,10 +205,12 @@ impl Eq for AttesterCacheKey {} /// See the module-level documentation for more information. #[derive(Default)] pub struct AttesterCache { - cache: RwLock>, + cache: RwLock, } impl AttesterCache { + /// Get the justified checkpoint and committee length for the `slot` and `committee_index` in + /// the state identified by the cache `key`. pub fn get( &self, key: &AttesterCacheKey, @@ -205,20 +225,23 @@ impl AttesterCache { .transpose() } + /// Cache the `state.current_epoch()` values if they are not already present in the state. pub fn maybe_cache_state( &self, state: &BeaconState, latest_block_root: Hash256, + spec: &ChainSpec, ) -> Result<(), Error> { let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?; - let key_exists = self.cache.read().contains_key(&key); - if !key_exists { - let cache_item = AttesterCacheValue::new(state)?; - self.insert_respecting_max_len(key, cache_item); + let mut cache = self.cache.write(); + if !cache.contains_key(&key) { + let cache_item = AttesterCacheValue::new(state, spec)?; + Self::insert_respecting_max_len(&mut cache, key, cache_item); } Ok(()) } + /// pub fn cache_state_and_return_value( &self, state: &BeaconState, @@ -228,15 +251,17 @@ impl AttesterCache { spec: &ChainSpec, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?; - let cache_item = AttesterCacheValue::new(state)?; + let cache_item = AttesterCacheValue::new(state, spec)?; let value = cache_item.get::(slot, index, spec)?; - self.insert_respecting_max_len(key, cache_item); + Self::insert_respecting_max_len(&mut self.cache.write(), key, cache_item); Ok(value) } - fn insert_respecting_max_len(&self, key: AttesterCacheKey, value: AttesterCacheValue) { - let mut cache = self.cache.write(); - + fn insert_respecting_max_len( + cache: &mut CacheHashMap, + key: AttesterCacheKey, + value: AttesterCacheValue, + ) { if cache.len() >= MAX_CACHE_LEN { while let Some(oldest) = cache .iter() diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 42feb9e626e..d7b77a62c73 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2241,7 +2241,7 @@ impl BeaconChain { // a previous slot. if state.current_epoch().saturating_add(2_u64) >= current_epoch { self.attester_cache - .maybe_cache_state(&state, block_root) + .maybe_cache_state(&state, block_root, &self.spec) .map_err(BeaconChainError::from)?; } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cc0f4ac4c46..56b000385f4 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -307,7 +307,7 @@ fn advance_head( // Apply the state to the attester cache, if the cache deems it interesting. beacon_chain .attester_cache - .maybe_cache_state(&state, head_root) + .maybe_cache_state(&state, head_root, &beacon_chain.spec) .map_err(BeaconChainError::from)?; let final_slot = state.slot(); diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 89b8ff7048c..302eb8cf4d8 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1311,10 +1311,19 @@ impl BeaconState { let epoch = relative_epoch.into_epoch(self.current_epoch()); let i = Self::committee_cache_index(relative_epoch); - *self.committee_cache_at_index_mut(i)? = CommitteeCache::initialized(&self, epoch, spec)?; + *self.committee_cache_at_index_mut(i)? = self.initialize_committee_cache(epoch, spec)?; Ok(()) } + /// Returns a committee cache initialized for the given `epoch`. + pub fn initialize_committee_cache( + &self, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result { + CommitteeCache::initialized(&self, epoch, spec) + } + /// Advances the cache for this state into the next epoch. /// /// This should be used if the `slot` of this state is advanced beyond an epoch boundary. From 095f3598aee70006e56770bf4157ead095855eca Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 18:27:09 +1000 Subject: [PATCH 11/31] Add comments --- .../beacon_chain/src/attester_cache.rs | 26 ++++++++++++++++--- .../types/src/beacon_state/committee_cache.rs | 6 +++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 288e5593541..d8ac0660a74 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -31,9 +31,18 @@ const MAX_CACHE_LEN: usize = 64; #[derive(Debug)] pub enum Error { BeaconState(BeaconStateError), - WrongEpoch { request_epoch: Epoch, epoch: Epoch }, - InvalidCommitteeIndex { committee_index: u64 }, - InverseRange { range: Range }, + /// Indicates a cache inconsistency. + WrongEpoch { + request_epoch: Epoch, + epoch: Epoch, + }, + InvalidCommitteeIndex { + committee_index: u64, + }, + /// Indicates an inconsistency with the beacon state committees. + InverseRange { + range: Range, + }, } impl From for Error { @@ -241,7 +250,11 @@ impl AttesterCache { Ok(()) } + /// Cache the `state.current_epoch()` values, even if they are already present in `self`. Also, + /// return the value for the given `slot` and `index`. /// + /// This is roughly equivalent to using `Self::get` and then `Self::maybe_cache_state`, but the + /// main advantage is that it is atomic. pub fn cache_state_and_return_value( &self, state: &BeaconState, @@ -257,6 +270,9 @@ impl AttesterCache { Ok(value) } + /// Insert a value to `cache`, ensuring it does not exceed the maximum length. + /// + /// If the cache is already full, the item with the lowest epoch will be removed. fn insert_respecting_max_len( cache: &mut CacheHashMap, key: AttesterCacheKey, @@ -267,6 +283,7 @@ impl AttesterCache { .iter() .map(|(key, _)| key) .min_by_key(|key| key.epoch) + // Only return values whilst the cache is full. .filter(|_| cache.len() >= MAX_CACHE_LEN) .copied() { @@ -277,6 +294,9 @@ impl AttesterCache { cache.insert(key, value); } + /// Remove all entries where the `key.epoch` is lower than the given `epoch`. + /// + /// Generally, the provided `epoch` should be the finalized epoch. pub fn prune_below(&self, epoch: Epoch) { self.cache.write().retain(|target, _| target.epoch >= epoch); } diff --git a/consensus/types/src/beacon_state/committee_cache.rs b/consensus/types/src/beacon_state/committee_cache.rs index 49791220121..891559a5de9 100644 --- a/consensus/types/src/beacon_state/committee_cache.rs +++ b/consensus/types/src/beacon_state/committee_cache.rs @@ -262,6 +262,8 @@ impl CommitteeCache { } } +/// Computes the position of the given `committee_index` with respect to all committees in the +/// epoch. Generally used to provide input to the `compute_committee_range_in_epoch` function. pub fn compute_committee_index_in_epoch( slot: Slot, slots_per_epoch: usize, @@ -271,6 +273,9 @@ pub fn compute_committee_index_in_epoch( (slot.as_usize() % slots_per_epoch) * committees_per_slot + committee_index } +/// Computes the range for slicing the shuffled indices to determine the members of a committee. +/// +/// The `index_in_epoch` is generally computed using `compute_committee_index_in_epoch`. pub fn compute_committee_range_in_epoch( epoch_committee_count: usize, index_in_epoch: usize, @@ -287,6 +292,7 @@ pub fn compute_committee_range_in_epoch( Some(start..end) } +/// Returns the total number of committees in an epoch. pub fn epoch_committee_count(committees_per_slot: usize, slots_per_epoch: usize) -> usize { committees_per_slot * slots_per_epoch } From f538365d869801e144a678c6fd91e4c01e1e352e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 18:34:46 +1000 Subject: [PATCH 12/31] Tidy beacon chain --- beacon_node/beacon_chain/src/beacon_chain.rs | 160 ++++++------------- 1 file changed, 47 insertions(+), 113 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d7b77a62c73..6ad20f3f590 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1214,112 +1214,12 @@ impl BeaconChain { .get_by_slot_and_root(slot, attestation_data_root) } - /* /// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`. /// /// The produced `Attestation` will not be valid until it has been signed by exactly one /// validator that is in the committee for `slot` and `index` in the canonical chain. /// /// Always attests to the canonical chain. - pub fn produce_unaggregated_attestation( - &self, - slot: Slot, - index: CommitteeIndex, - ) -> Result, Error> { - // Note: we're taking a lock on the head. The work involved here should be trivial enough - // that the lock should not be held for long. - let head = self - .canonical_head - .try_read_for(HEAD_LOCK_TIMEOUT) - .ok_or(Error::CanonicalHeadLockTimeout)?; - - if slot >= head.beacon_block.slot() { - self.produce_unaggregated_attestation_for_block( - slot, - index, - head.beacon_block_root, - Cow::Borrowed(&head.beacon_state), - head.beacon_state_root(), - ) - } else { - // We disallow producing attestations *prior* to the current head since such an - // attestation would require loading a `BeaconState` from disk. Loading `BeaconState` - // from disk is very resource intensive and proposes a DoS risk from validator clients. - // - // Although we generally allow validator clients to do things that might harm us (i.e., - // we trust them), sometimes we need to protect the BN from accidental errors which - // could cause it significant harm. - // - // This case is particularity harmful since the HTTP API can effectively call this - // function an unlimited amount of times. If `n` validators all happen to call it at - // the same time, we're going to load `n` states (and tree hash caches) into memory all - // at once. With `n >= 10` we're looking at hundreds of MB or GBs of RAM. - Err(Error::AttestingPriorToHead { - head_slot: head.beacon_block.slot(), - request_slot: slot, - }) - } - } - */ - - /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to - /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the - /// `block` identified by `beacon_block_root`. - /// - /// The attestation doesn't _really_ have anything about it that makes it unaggregated per say, - /// however this function is only required in the context of forming an unaggregated - /// attestation. It would be an (undetectable) violation of the protocol to create a - /// `SignedAggregateAndProof` based upon the output of this function. - pub fn produce_unaggregated_attestation_for_block( - &self, - slot: Slot, - index: CommitteeIndex, - beacon_block_root: Hash256, - mut state: Cow>, - state_root: Hash256, - ) -> Result, Error> { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - - if state.slot() > slot { - return Err(Error::CannotAttestToFutureState); - } else if state.current_epoch() < epoch { - let mut_state = state.to_mut(); - // Only perform a "partial" state advance since we do not require the state roots to be - // accurate. - partial_state_advance( - mut_state, - Some(state_root), - epoch.start_slot(T::EthSpec::slots_per_epoch()), - &self.spec, - )?; - mut_state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - } - - let committee_len = state.get_beacon_committee(slot, index)?.committee.len(); - - let target_slot = epoch.start_slot(T::EthSpec::slots_per_epoch()); - let target_root = if state.slot() <= target_slot { - beacon_block_root - } else { - *state.get_block_root(target_slot)? - }; - - Ok(Attestation { - aggregation_bits: BitList::with_capacity(committee_len)?, - data: AttestationData { - slot, - index, - beacon_block_root, - source: state.current_justified_checkpoint(), - target: Checkpoint { - epoch, - root: target_root, - }, - }, - signature: AggregateSignature::empty(), - }) - } - pub fn produce_unaggregated_attestation( &self, request_slot: Slot, @@ -1425,34 +1325,68 @@ impl BeaconChain { )? }; - self.produce_unaggregated_attestation_parameterized( - request_slot, - request_index, - beacon_block_root, - justified_checkpoint, - target.root, - committee_len, - ) + Ok(Attestation { + aggregation_bits: BitList::with_capacity(committee_len)?, + data: AttestationData { + slot: request_slot, + index: request_index, + beacon_block_root, + source: justified_checkpoint, + target, + }, + signature: AggregateSignature::empty(), + }) } - fn produce_unaggregated_attestation_parameterized( + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to + /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the + /// `block` identified by `beacon_block_root`. + /// + /// The attestation doesn't _really_ have anything about it that makes it unaggregated per say, + /// however this function is only required in the context of forming an unaggregated + /// attestation. It would be an (undetectable) violation of the protocol to create a + /// `SignedAggregateAndProof` based upon the output of this function. + pub fn produce_unaggregated_attestation_for_block( &self, slot: Slot, index: CommitteeIndex, beacon_block_root: Hash256, - current_justified_checkpoint: Checkpoint, - target_root: Hash256, - committee_len: usize, + mut state: Cow>, + state_root: Hash256, ) -> Result, Error> { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + if state.slot() > slot { + return Err(Error::CannotAttestToFutureState); + } else if state.current_epoch() < epoch { + let mut_state = state.to_mut(); + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + mut_state, + Some(state_root), + epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + mut_state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + } + + let committee_len = state.get_beacon_committee(slot, index)?.committee.len(); + + let target_slot = epoch.start_slot(T::EthSpec::slots_per_epoch()); + let target_root = if state.slot() <= target_slot { + beacon_block_root + } else { + *state.get_block_root(target_slot)? + }; + Ok(Attestation { aggregation_bits: BitList::with_capacity(committee_len)?, data: AttestationData { slot, index, beacon_block_root, - source: current_justified_checkpoint, + source: state.current_justified_checkpoint(), target: Checkpoint { epoch, root: target_root, From 66f9aa75bf8083c14aae5427c8e6d32d996c6bff Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 18:48:52 +1000 Subject: [PATCH 13/31] Apply restriction to HTTP api --- beacon_node/http_api/src/lib.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 057c8693dbc..8182ddc8f4a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1699,9 +1699,12 @@ pub fn serve( .and_then( |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { blocking_json_task(move || { + let slots_per_epoch = T::EthSpec::slots_per_epoch(); let current_slot = chain .slot() .map_err(warp_utils::reject::beacon_chain_error)?; + let current_epoch = current_slot.epoch(slots_per_epoch); + let query_epoch = query.slot.epoch(slots_per_epoch); // allow a tolerance of one slot to account for clock skew if query.slot > current_slot + 1 { @@ -1709,6 +1712,18 @@ pub fn serve( "request slot {} is more than one slot past the current slot {}", query.slot, current_slot ))); + } else if query_epoch + 1 < current_epoch { + // Restrict attestations that are earlier than the previous epoch. This is + // an artificial restriction employed for two purposes: + // + // 1. To avoid presenting 500 errors to the user by requesting an epoch too + // low for the beacon chain to handle. + // 2. Because producing an attestation for a slot earlier than the previous + // epoch will not be helpful to the network. + return Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is prior to the current epoch", + query_epoch + ))); } chain From 6fd2299d998d290e03ded34de9106027dde77af4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 19:00:39 +1000 Subject: [PATCH 14/31] Tidy use of Option, add comments --- beacon_node/beacon_chain/src/beacon_chain.rs | 69 ++++++++++++++++---- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6ad20f3f590..aa04b052031 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1220,6 +1220,10 @@ impl BeaconChain { /// validator that is in the committee for `slot` and `index` in the canonical chain. /// /// Always attests to the canonical chain. + /// + /// ## Errors + /// + /// May return an error if the `request_slot` is too far behind the head state. pub fn produce_unaggregated_attestation( &self, request_slot: Slot, @@ -1228,16 +1232,26 @@ impl BeaconChain { let slots_per_epoch = T::EthSpec::slots_per_epoch(); let request_epoch = request_slot.epoch(slots_per_epoch); + /* + * Phase 1/2: + * + * Take a short-lived read-lock on the head and copy the necessary information from it. + * + * It is important that this first phase is as quick as possible; creating contention for + * the head-lock is not desirable. + */ + let beacon_block_root; let beacon_state_root; let target; - let head_state_epoch; - let head_state_justified_checkpoint; - let head_state_committee_len_opt; + let current_epoch_attesting_info: Option<(Checkpoint, usize)>; let attester_cache_key; if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; + // There is no value in producing an attestation to a block that is pre-finalization and + // it is likely to cause expensive and pointless reads to the freezer database. Exit + // early if this is the case. let finalized_slot = head_state .finalized_checkpoint() .epoch @@ -1250,15 +1264,20 @@ impl BeaconChain { } if request_slot >= head_state.slot() { + // When attesting to the head slot or later, always use the head of the chain. beacon_block_root = head.beacon_block_root; beacon_state_root = head.beacon_state_root(); } else { + // Permits attesting to slots *prior* to the current head. This is desirable when + // the VC and BN are out-of-sync due to time issues or overloading. beacon_block_root = *head_state.get_block_root(request_slot)?; beacon_state_root = *head_state.get_state_root(request_slot)?; }; let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); let target_root = if head_state.slot() <= target_slot { + // In the case of skip-slots it is possible that the beacon block root and target + // root are equal. beacon_block_root } else { *head_state.get_block_root(target_slot)? @@ -1268,41 +1287,66 @@ impl BeaconChain { root: target_root, }; - head_state_epoch = head_state.current_epoch(); - head_state_justified_checkpoint = head_state.current_justified_checkpoint(); - head_state_committee_len_opt = if head_state_epoch == request_epoch { - Some( + current_epoch_attesting_info = if head_state.current_epoch() == request_epoch { + // When the head state is in the same epoch as the request, all the information + // required to attest is available on the head state. + Some(( + head_state.current_justified_checkpoint(), head_state .get_beacon_committee(request_slot, request_index)? .committee .len(), - ) + )) } else { + // If the head state is in a *different* epoch to the request, more work is required + // to determine the justified checkpoint and committee length. None }; + // Determine the key for `self.attester_cache`, in case it is required later in this + // routine. attester_cache_key = AttesterCacheKey::new(request_epoch, head_state, beacon_block_root)?; } else { return Err(Error::CanonicalHeadLockTimeout); } + /* + * Phase 2/2: + * + * Determine if the justified checkpoint and committee length copied from the head is + * suitable for this attestation. If not, try the attester cache. If it also fails, load + * a state from disk and prime the attester cache. + * + * Note: developers should endeavour to remember that whilst it is possible to know the + * committee lengths for epoch `n` in epoch `n - 1`, it is *not* possible to do so for the + * justified checkpoint. + */ + let (justified_checkpoint, committee_len) = - if let Some(head_state_committee_len) = head_state_committee_len_opt { - (head_state_justified_checkpoint, head_state_committee_len) - } else if let Some(tuple) = self.attester_cache.get::( + if let Some((justified_checkpoint, committee_len)) = current_epoch_attesting_info { + // The head state is in the same epoch as the attestation, so there is no more + // required information. + (justified_checkpoint, committee_len) + } else if let Some(cached_values) = self.attester_cache.get::( &attester_cache_key, request_slot, request_index, &self.spec, )? { - tuple + // The head state was not suitable to produce an attestation in this epoch, but the + // suitable values were already cached. Return them. + cached_values } else { + // Neither the head state, nor the attester cache was able to produce the required + // information to attest in this epoch. Load a `BeaconState` from disk and use it to + // fulfil the request (and prime the cache to avoid this next time). let mut state: BeaconState = self .get_state(&beacon_state_root, None)? .ok_or(Error::MissingBeaconState(beacon_state_root))?; if state.slot() > request_slot { + // This indicates an internal inconsistency. return Err(Error::CannotAttestToFutureState); } else if state.current_epoch() < request_epoch { // Only perform a "partial" state advance since we do not require the state roots to be @@ -1316,6 +1360,7 @@ impl BeaconChain { state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; } + // Compute the required values, add them to the cache and also return them. self.attester_cache.cache_state_and_return_value( &state, beacon_block_root, From 5ed7f42f6189fe3356984eb2087ad925cfda24f1 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 19:03:14 +1000 Subject: [PATCH 15/31] Tidy comments --- consensus/types/src/beacon_state.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 302eb8cf4d8..89f9fe1d37c 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1315,7 +1315,10 @@ impl BeaconState { Ok(()) } - /// Returns a committee cache initialized for the given `epoch`. + /// Initializes a new committee cache for the given `epoch`, regardless of whether one already + /// exists. Returns the committee cache without attaching it to `self`. + /// + /// To build a cache and store it on `self`, use `Self::build_committee_cache`. pub fn initialize_committee_cache( &self, epoch: Epoch, From e9ec567534ceaa10f75a389b7c577f8cb2bdfda5 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 26 Jul 2021 19:04:21 +1000 Subject: [PATCH 16/31] Fix HTTP error --- beacon_node/http_api/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 8182ddc8f4a..268083cc95c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1721,7 +1721,7 @@ pub fn serve( // 2. Because producing an attestation for a slot earlier than the previous // epoch will not be helpful to the network. return Err(warp_utils::reject::custom_bad_request(format!( - "request epoch {} is prior to the current epoch", + "request epoch {} is prior to the previous epoch", query_epoch ))); } From d22ece5be5a98a6176833ae755caaca09580436b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 07:39:02 +1000 Subject: [PATCH 17/31] Add explicit error for ancient slot --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 +++++++++++++ beacon_node/beacon_chain/src/errors.rs | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aa04b052031..3efb1bf5e8c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1263,6 +1263,19 @@ impl BeaconChain { }); } + // This function will eventually fail when trying to access a slot which is + // out-of-bounds of `state.block_roots`. This explicit error is intended to provide a + // clearer message to the user an ambiguous `SlotOutOfBounds` error. + let slots_per_historical_root = T::EthSpec::slots_per_historical_root() as u64; + let lowest_permissible_slot = + head_state.slot().saturating_sub(slots_per_historical_root); + if head_state.slot() < lowest_permissible_slot { + return Err(Error::AttestingToAncientSlot { + head_state_slot: head_state.slot(), + lowest_permissible_slot, + }); + } + if request_slot >= head_state.slot() { // When attesting to the head slot or later, always use the head of the chain. beacon_block_root = head.beacon_block_root; diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 8297a603cdc..248a3191871 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -109,6 +109,10 @@ pub enum BeaconChainError { finalized_slot: Slot, request_slot: Slot, }, + AttestingToAncientSlot { + head_state_slot: Slot, + lowest_permissible_slot: Slot, + }, BadPreState { parent_root: Hash256, parent_slot: Slot, From f511137b1c0cbdeb4335b11370d325e0ae66f0b0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 07:39:20 +1000 Subject: [PATCH 18/31] Remove restriction from API --- beacon_node/http_api/src/lib.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 268083cc95c..1e1f02486bf 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1703,8 +1703,6 @@ pub fn serve( let current_slot = chain .slot() .map_err(warp_utils::reject::beacon_chain_error)?; - let current_epoch = current_slot.epoch(slots_per_epoch); - let query_epoch = query.slot.epoch(slots_per_epoch); // allow a tolerance of one slot to account for clock skew if query.slot > current_slot + 1 { @@ -1712,18 +1710,6 @@ pub fn serve( "request slot {} is more than one slot past the current slot {}", query.slot, current_slot ))); - } else if query_epoch + 1 < current_epoch { - // Restrict attestations that are earlier than the previous epoch. This is - // an artificial restriction employed for two purposes: - // - // 1. To avoid presenting 500 errors to the user by requesting an epoch too - // low for the beacon chain to handle. - // 2. Because producing an attestation for a slot earlier than the previous - // epoch will not be helpful to the network. - return Err(warp_utils::reject::custom_bad_request(format!( - "request epoch {} is prior to the previous epoch", - query_epoch - ))); } chain From b501963f81de161f48db620ac9b0240e9f365304 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 11:05:46 +1000 Subject: [PATCH 19/31] Remove unused variable --- beacon_node/http_api/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1e1f02486bf..057c8693dbc 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1699,7 +1699,6 @@ pub fn serve( .and_then( |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { blocking_json_task(move || { - let slots_per_epoch = T::EthSpec::slots_per_epoch(); let current_slot = chain .slot() .map_err(warp_utils::reject::beacon_chain_error)?; From 0d0c972e30d1cc92daf7ce10c7acc13b263635b2 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 11:22:33 +1000 Subject: [PATCH 20/31] Tidy, add comments --- .../types/src/beacon_state/committee_cache.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/consensus/types/src/beacon_state/committee_cache.rs b/consensus/types/src/beacon_state/committee_cache.rs index 891559a5de9..a4e446aee27 100644 --- a/consensus/types/src/beacon_state/committee_cache.rs +++ b/consensus/types/src/beacon_state/committee_cache.rs @@ -263,7 +263,10 @@ impl CommitteeCache { } /// Computes the position of the given `committee_index` with respect to all committees in the -/// epoch. Generally used to provide input to the `compute_committee_range_in_epoch` function. +/// epoch. +/// +/// The return result may be used to provide input to the `compute_committee_range_in_epoch` +/// function. pub fn compute_committee_index_in_epoch( slot: Slot, slots_per_epoch: usize, @@ -275,19 +278,19 @@ pub fn compute_committee_index_in_epoch( /// Computes the range for slicing the shuffled indices to determine the members of a committee. /// -/// The `index_in_epoch` is generally computed using `compute_committee_index_in_epoch`. +/// The `index_in_epoch` parameter can be computed computed using +/// `compute_committee_index_in_epoch`. pub fn compute_committee_range_in_epoch( epoch_committee_count: usize, index_in_epoch: usize, shuffling_len: usize, ) -> Option> { - let count = epoch_committee_count; - if count == 0 || index_in_epoch >= count { + if epoch_committee_count == 0 || index_in_epoch >= epoch_committee_count { return None; } - let start = (shuffling_len * index_in_epoch) / count; - let end = (shuffling_len * (index_in_epoch + 1)) / count; + let start = (shuffling_len * index_in_epoch) / epoch_committee_count; + let end = (shuffling_len * (index_in_epoch + 1)) / epoch_committee_count; Some(start..end) } From 06970595ed930ab83238178897f349f319dae7c4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 12:05:12 +1000 Subject: [PATCH 21/31] Move state reads into AttesterCache --- .../beacon_chain/src/attester_cache.rs | 89 ++++++++++++++++--- beacon_node/beacon_chain/src/beacon_chain.rs | 57 ++++-------- beacon_node/beacon_chain/src/errors.rs | 6 +- 3 files changed, 93 insertions(+), 59 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index d8ac0660a74..c88341ae82f 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -9,7 +9,9 @@ //! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards //! and penalties can be computed and the `state.current_justified_checkpoint` can be updated. +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use parking_lot::RwLock; +use state_processing::state_advance::{partial_state_advance, Error as StateAdvanceError}; use std::collections::HashMap; use std::ops::Range; use types::{ @@ -31,6 +33,14 @@ const MAX_CACHE_LEN: usize = 64; #[derive(Debug)] pub enum Error { BeaconState(BeaconStateError), + // Boxed to avoid an infinite size recursion issue. + BeaconChain(Box), + MissingBeaconState(Hash256), + FailedToTransitionState(StateAdvanceError), + CannotAttestToFutureState { + state_slot: Slot, + request_slot: Slot, + }, /// Indicates a cache inconsistency. WrongEpoch { request_epoch: Epoch, @@ -51,6 +61,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: BeaconChainError) -> Self { + Error::BeaconChain(Box::new(e)) + } +} + /// Provides the length for each committee in a given `Epoch`. struct CommitteeLengths { /// The `epoch` to which the lengths pertain. @@ -250,23 +266,70 @@ impl AttesterCache { Ok(()) } - /// Cache the `state.current_epoch()` values, even if they are already present in `self`. Also, - /// return the value for the given `slot` and `index`. + /// Read the state identified by `state_root` from the database, advance it to the required + /// slot, use it to prime the cache and return the values for the provided `slot` and + /// `committee_index`. + /// + /// ## Notes /// - /// This is roughly equivalent to using `Self::get` and then `Self::maybe_cache_state`, but the - /// main advantage is that it is atomic. - pub fn cache_state_and_return_value( + /// This function takes a write-lock on the internal cache. It is generally advise to try + /// getting the value using `Self::get` before running this function. `Self::get` only takes a + /// read-lock and is therefore less likely to create head contention. + pub fn load_and_cache_state( &self, - state: &BeaconState, - latest_block_root: Hash256, + state_root: Hash256, + key: AttesterCacheKey, slot: Slot, - index: CommitteeIndex, - spec: &ChainSpec, + committee_index: CommitteeIndex, + chain: &BeaconChain, ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { - let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?; - let cache_item = AttesterCacheValue::new(state, spec)?; - let value = cache_item.get::(slot, index, spec)?; - Self::insert_respecting_max_len(&mut self.cache.write(), key, cache_item); + let spec = &chain.spec; + let slots_per_epoch = T::EthSpec::slots_per_epoch(); + let epoch = slot.epoch(slots_per_epoch); + + // Take a write-lock on the cache before starting the state read. + // + // Whilst holding the write-lock during the state read will create contention, it prevents + // the scenario where multiple requests from separate threads cause duplicate state reads. + let mut cache = self.cache.write(); + + // Try the cache to see if someone has already primed it between the time the function was + // called and when the cache write-lock was obtained. This avoids performing duplicate state + // reads. + if let Some(value) = cache + .get(&key) + .map(|cache_item| cache_item.get::(slot, committee_index, spec)) + .transpose()? + { + return Ok(value); + } + + let mut state: BeaconState = chain + .get_state(&state_root, None)? + .ok_or(Error::MissingBeaconState(state_root))?; + + if state.slot() > slot { + // This indicates an internal inconsistency. + return Err(Error::CannotAttestToFutureState { + state_slot: state.slot(), + request_slot: slot, + }); + } else if state.current_epoch() < epoch { + // Only perform a "partial" state advance since we do not require the state roots to be + // accurate. + partial_state_advance( + &mut state, + Some(state_root), + epoch.start_slot(slots_per_epoch), + spec, + ) + .map_err(Error::FailedToTransitionState)?; + state.build_committee_cache(RelativeEpoch::Current, spec)?; + } + + let cache_item = AttesterCacheValue::new(&state, spec)?; + let value = cache_item.get::(slot, committee_index, spec)?; + Self::insert_respecting_max_len(&mut cache, key, cache_item); Ok(value) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3efb1bf5e8c..2c1454e14c8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1265,14 +1265,14 @@ impl BeaconChain { // This function will eventually fail when trying to access a slot which is // out-of-bounds of `state.block_roots`. This explicit error is intended to provide a - // clearer message to the user an ambiguous `SlotOutOfBounds` error. + // clearer message to the user than an ambiguous `SlotOutOfBounds` error. let slots_per_historical_root = T::EthSpec::slots_per_historical_root() as u64; let lowest_permissible_slot = head_state.slot().saturating_sub(slots_per_historical_root); - if head_state.slot() < lowest_permissible_slot { + if request_slot < lowest_permissible_slot { return Err(Error::AttestingToAncientSlot { - head_state_slot: head_state.slot(), lowest_permissible_slot, + request_slot, }); } @@ -1281,7 +1281,7 @@ impl BeaconChain { beacon_block_root = head.beacon_block_root; beacon_state_root = head.beacon_state_root(); } else { - // Permits attesting to slots *prior* to the current head. This is desirable when + // Permit attesting to slots *prior* to the current head. This is desirable when // the VC and BN are out-of-sync due to time issues or overloading. beacon_block_root = *head_state.get_block_root(request_slot)?; beacon_state_root = *head_state.get_state_root(request_slot)?; @@ -1289,8 +1289,8 @@ impl BeaconChain { let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); let target_root = if head_state.slot() <= target_slot { - // In the case of skip-slots it is possible that the beacon block root and target - // root are equal. + // If the state is earlier than the target slot then the target *must* be the head + // block root. beacon_block_root } else { *head_state.get_block_root(target_slot)? @@ -1327,13 +1327,9 @@ impl BeaconChain { /* * Phase 2/2: * - * Determine if the justified checkpoint and committee length copied from the head is - * suitable for this attestation. If not, try the attester cache. If it also fails, load - * a state from disk and prime the attester cache. - * - * Note: developers should endeavour to remember that whilst it is possible to know the - * committee lengths for epoch `n` in epoch `n - 1`, it is *not* possible to do so for the - * justified checkpoint. + * If the justified checkpoint and committee length from the head are suitable for this + * attestation, use them. If not, try the attester cache. If the cache misses, load a state + * from disk and prime the cache with it. */ let (justified_checkpoint, committee_len) = @@ -1347,39 +1343,18 @@ impl BeaconChain { request_index, &self.spec, )? { - // The head state was not suitable to produce an attestation in this epoch, but the - // suitable values were already cached. Return them. + // The suitable values were already cached. Return them. cached_values } else { // Neither the head state, nor the attester cache was able to produce the required - // information to attest in this epoch. Load a `BeaconState` from disk and use it to - // fulfil the request (and prime the cache to avoid this next time). - let mut state: BeaconState = self - .get_state(&beacon_state_root, None)? - .ok_or(Error::MissingBeaconState(beacon_state_root))?; - - if state.slot() > request_slot { - // This indicates an internal inconsistency. - return Err(Error::CannotAttestToFutureState); - } else if state.current_epoch() < request_epoch { - // Only perform a "partial" state advance since we do not require the state roots to be - // accurate. - partial_state_advance( - &mut state, - Some(beacon_state_root), - request_epoch.start_slot(T::EthSpec::slots_per_epoch()), - &self.spec, - )?; - state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - } - - // Compute the required values, add them to the cache and also return them. - self.attester_cache.cache_state_and_return_value( - &state, - beacon_block_root, + // information to attest in this epoch. So, load a `BeaconState` from disk and use + // it to fulfil the request (and prime the cache to avoid this next time). + self.attester_cache.load_and_cache_state( + beacon_state_root, + attester_cache_key, request_slot, request_index, - &self.spec, + &self, )? }; diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 248a3191871..f484b194549 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -101,17 +101,13 @@ pub enum BeaconChainError { }, WeakSubjectivtyVerificationFailure, WeakSubjectivtyShutdownError(TrySendError), - AttestingPriorToHead { - head_slot: Slot, - request_slot: Slot, - }, AttestingToFinalizedSlot { finalized_slot: Slot, request_slot: Slot, }, AttestingToAncientSlot { - head_state_slot: Slot, lowest_permissible_slot: Slot, + request_slot: Slot, }, BadPreState { parent_root: Hash256, From 2fe2eaad478f4e6980046aa85de70861eb0d6429 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 14:14:54 +1000 Subject: [PATCH 22/31] Add comments for cache size --- beacon_node/beacon_chain/src/attester_cache.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index c88341ae82f..b8de9d90a7d 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -28,7 +28,13 @@ type CommitteeIndex = u64; type CacheHashMap = HashMap; /// The maximum number of `AttesterCacheValues` to be kept in memory. -const MAX_CACHE_LEN: usize = 64; +/// +/// Each `AttesterCacheValues` is very small (~16 bytes) and the cache will generally be kept small +/// by pruning on finality. +/// +/// The value provided here is much larger than will be used during ideal network conditions, +/// however we make it large since the values are so small. +const MAX_CACHE_LEN: usize = 1_024; #[derive(Debug)] pub enum Error { From 290664684427a403a23a13a4d956aaf6ca0a4bb3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 14:15:06 +1000 Subject: [PATCH 23/31] Add warn log for cache miss --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2c1454e14c8..2dde6b31ba5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1241,6 +1241,7 @@ impl BeaconChain { * the head-lock is not desirable. */ + let head_state_slot; let beacon_block_root; let beacon_state_root; let target; @@ -1248,6 +1249,7 @@ impl BeaconChain { let attester_cache_key; if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; + head_state_slot = head_state.slot(); // There is no value in producing an attestation to a block that is pre-finalization and // it is likely to cause expensive and pointless reads to the freezer database. Exit @@ -1346,6 +1348,17 @@ impl BeaconChain { // The suitable values were already cached. Return them. cached_values } else { + // This scenario is likely to happen occasionally. It is reasonable to drop this to + // a `debug!` if it turns out to be frequent and unavoidable. At least whilst the + // feature is new it is nice to see if the cache is getting a lot of misses. + warn!( + self.log, + "Attester cache miss"; + "beacon_block_root" => %beacon_block_root, + "head_state_slot" => %head_state_slot, + "request_slot" => %request_slot, + ); + // Neither the head state, nor the attester cache was able to produce the required // information to attest in this epoch. So, load a `BeaconState` from disk and use // it to fulfil the request (and prime the cache to avoid this next time). From dd375a04385d710627a9a74c8c287c747f47a8f9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 27 Jul 2021 15:01:45 +1000 Subject: [PATCH 24/31] Tidy comments --- .../beacon_chain/src/attester_cache.rs | 24 ++++++++++++------- beacon_node/beacon_chain/src/builder.rs | 10 ++++++++ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index b8de9d90a7d..492ae1254e0 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -39,7 +39,7 @@ const MAX_CACHE_LEN: usize = 1_024; #[derive(Debug)] pub enum Error { BeaconState(BeaconStateError), - // Boxed to avoid an infinite size recursion issue. + // Boxed to avoid an infinite-size recursion issue. BeaconChain(Box), MissingBeaconState(Hash256), FailedToTransitionState(StateAdvanceError), @@ -73,7 +73,8 @@ impl From for Error { } } -/// Provides the length for each committee in a given `Epoch`. +/// Stores the minimal amount of data required compute the committee length for any committee at any +/// slot in a given `epoch`. struct CommitteeLengths { /// The `epoch` to which the lengths pertain. epoch: Epoch, @@ -141,7 +142,12 @@ impl CommitteeLengths { } } -/// Provides information relevant to producing an attestation. +/// Provides the following information for some epoch: +/// +/// - The `state.current_justified_checkpoint` value. +/// - The committee lengths for all indices and slots. +/// +/// These values are used during attestation production. pub struct AttesterCacheValue { current_justified_checkpoint: Checkpoint, committee_lengths: CommitteeLengths, @@ -171,8 +177,8 @@ impl AttesterCacheValue { } } -/// The `AttesterCacheKey` is fundamentally the same thing as the shuffling decision roots, however -/// it provides a unique key for both of the following values: +/// The `AttesterCacheKey` is fundamentally the same thing as the proposer shuffling decision root, +/// however here we use it as an identity for both of the following values: /// /// 1. The `state.current_justified_checkpoint`. /// 2. The attester shuffling. @@ -186,6 +192,8 @@ impl AttesterCacheValue { #[derive(PartialEq, Hash, Clone, Copy)] pub struct AttesterCacheKey { /// The epoch from which the justified checkpoint should be observed. + /// + /// Attestations which use `self.epoch` as `target.epoch` should use this key. epoch: Epoch, /// The root of the block at the last slot of `self.epoch - 1`. decision_root: Hash256, @@ -278,9 +286,9 @@ impl AttesterCache { /// /// ## Notes /// - /// This function takes a write-lock on the internal cache. It is generally advise to try - /// getting the value using `Self::get` before running this function. `Self::get` only takes a - /// read-lock and is therefore less likely to create head contention. + /// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call + /// before running this function as `Self::get` only takes a read-lock and is therefore less + /// likely to create head contention. pub fn load_and_cache_state( &self, state_root: Hash256, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 5413824a306..bb13dab56f0 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -562,6 +562,16 @@ where .head() .map_err(|e| format!("Failed to get head: {:?}", e))?; + // Prime the attester cache with the head state. + beacon_chain + .attester_cache + .maybe_cache_state( + &head.beacon_state, + head.beacon_block_root, + &beacon_chain.spec, + ) + .map_err(|e| format!("Failed to prime attester cache: {:?}", e))?; + // Only perform the check if it was configured. if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint { if let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint( From 5bedecfa34224159a688b4605da6872324b5dc2a Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 07:53:26 +1000 Subject: [PATCH 25/31] Add metrics --- beacon_node/beacon_chain/src/beacon_chain.rs | 9 ++++++++ beacon_node/beacon_chain/src/metrics.rs | 22 ++++++++++++-------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2dde6b31ba5..3716388c8db 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1229,6 +1229,8 @@ impl BeaconChain { request_slot: Slot, request_index: CommitteeIndex, ) -> Result, Error> { + let _total_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_SECONDS); + let slots_per_epoch = T::EthSpec::slots_per_epoch(); let request_epoch = request_slot.epoch(slots_per_epoch); @@ -1247,6 +1249,7 @@ impl BeaconChain { let target; let current_epoch_attesting_info: Option<(Checkpoint, usize)>; let attester_cache_key; + let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS); if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { let head_state = &head.beacon_state; head_state_slot = head_state.slot(); @@ -1325,6 +1328,7 @@ impl BeaconChain { } else { return Err(Error::CanonicalHeadLockTimeout); } + drop(head_timer); /* * Phase 2/2: @@ -1334,6 +1338,8 @@ impl BeaconChain { * from disk and prime the cache with it. */ + let cache_timer = + metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS); let (justified_checkpoint, committee_len) = if let Some((justified_checkpoint, committee_len)) = current_epoch_attesting_info { // The head state is in the same epoch as the attestation, so there is no more @@ -1362,6 +1368,8 @@ impl BeaconChain { // Neither the head state, nor the attester cache was able to produce the required // information to attest in this epoch. So, load a `BeaconState` from disk and use // it to fulfil the request (and prime the cache to avoid this next time). + let _cache_build_timer = + metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS); self.attester_cache.load_and_cache_state( beacon_state_root, attester_cache_key, @@ -1370,6 +1378,7 @@ impl BeaconChain { &self, )? }; + drop(cache_timer); Ok(Attestation { aggregation_bits: BitList::with_capacity(committee_len)?, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index e0d7052f8e4..535c717bc37 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -210,18 +210,22 @@ lazy_static! { /* * Attestation Production */ - pub static ref ATTESTATION_PRODUCTION_REQUESTS: Result = try_create_int_counter( - "beacon_attestation_production_requests_total", - "Count of all attestation production requests" - ); - pub static ref ATTESTATION_PRODUCTION_SUCCESSES: Result = try_create_int_counter( - "beacon_attestation_production_successes_total", - "Count of attestations processed without error" - ); - pub static ref ATTESTATION_PRODUCTION_TIMES: Result = try_create_histogram( + pub static ref ATTESTATION_PRODUCTION_SECONDS: Result = try_create_histogram( "beacon_attestation_production_seconds", "Full runtime of attestation production" ); + pub static ref ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS: Result = try_create_histogram( + "attestation_production_head_scrape_seconds", + "Time taken to read the head state" + ); + pub static ref ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS: Result = try_create_histogram( + "attestation_production_cache_interaction_seconds", + "Time spent interacting with the attester cache" + ); + pub static ref ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS: Result = try_create_histogram( + "attestation_production_cache_prime_seconds", + "Time spent loading a new state from the disk due to a cache miss" + ); } // Second lazy-static block is used to account for macro recursion limit. From 0cdbe3af2441e7617d01fc23873f0a5e5142475e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 15:39:11 +1000 Subject: [PATCH 26/31] Apply suggestions from code review Co-authored-by: Michael Sproul --- beacon_node/beacon_chain/src/attester_cache.rs | 8 ++++---- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 492ae1254e0..8b878493a29 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -3,7 +3,7 @@ //! //! This cache is required *as well as* the `ShufflingCache` since the `ShufflingCache` does not //! provide any information about the `state.current_justified_checkpoint`. It is not trivial to add -//! the justified checkpoint to the `ShufflingCache` since that cache keyed by shuffling decision +//! the justified checkpoint to the `ShufflingCache` since that cache is keyed by shuffling decision //! root, which is not suitable for the justified checkpoint. Whilst we can know the shuffling for //! epoch `n` during `n - 1`, we *cannot* know the justified checkpoint. Instead, we *must* perform //! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards @@ -73,7 +73,7 @@ impl From for Error { } } -/// Stores the minimal amount of data required compute the committee length for any committee at any +/// Stores the minimal amount of data required to compute the committee length for any committee at any /// slot in a given `epoch`. struct CommitteeLengths { /// The `epoch` to which the lengths pertain. @@ -204,7 +204,7 @@ impl AttesterCacheKey { /// /// The `latest_block_root` should be the latest block that has been applied to `state`. This /// parameter is required since the state does not store the block root for any block with the - /// same slot as `slot.slot()`. + /// same slot as `state.slot()`. /// /// ## Errors /// @@ -288,7 +288,7 @@ impl AttesterCache { /// /// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call /// before running this function as `Self::get` only takes a read-lock and is therefore less - /// likely to create head contention. + /// likely to create contention. pub fn load_and_cache_state( &self, state_root: Hash256, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3716388c8db..7cc11768a6a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1360,7 +1360,7 @@ impl BeaconChain { warn!( self.log, "Attester cache miss"; - "beacon_block_root" => %beacon_block_root, + "beacon_block_root" => ?beacon_block_root, "head_state_slot" => %head_state_slot, "request_slot" => %request_slot, ); From 625a69e24ac8b63e1bad85465cc809500a3e5ea1 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 15:57:48 +1000 Subject: [PATCH 27/31] Partially address review comments --- beacon_node/beacon_chain/src/attester_cache.rs | 15 ++++++--------- beacon_node/beacon_chain/src/beacon_chain.rs | 9 +++------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 8b878493a29..01662efc135 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -189,7 +189,7 @@ impl AttesterCacheValue { /// /// It is also safe, but not maximally efficient, to key the attester shuffling with the same /// strategy. For better shuffling keying strategies, see the `ShufflingCache`. -#[derive(PartialEq, Hash, Clone, Copy)] +#[derive(Eq, PartialEq, Hash, Clone, Copy)] pub struct AttesterCacheKey { /// The epoch from which the justified checkpoint should be observed. /// @@ -236,8 +236,6 @@ impl AttesterCacheKey { } } -impl Eq for AttesterCacheKey {} - /// Provides a cache for the justified checkpoint and committee length when producing an /// attestation. /// @@ -355,16 +353,15 @@ impl AttesterCache { key: AttesterCacheKey, value: AttesterCacheValue, ) { - if cache.len() >= MAX_CACHE_LEN { - while let Some(oldest) = cache + while cache.len() >= MAX_CACHE_LEN { + if let Some(oldest) = cache .iter() - .map(|(key, _)| key) + .map(|(key, _)| *key) .min_by_key(|key| key.epoch) - // Only return values whilst the cache is full. - .filter(|_| cache.len() >= MAX_CACHE_LEN) - .copied() { cache.remove(&oldest); + } else { + break; } } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7cc11768a6a..da21e264295 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1354,10 +1354,7 @@ impl BeaconChain { // The suitable values were already cached. Return them. cached_values } else { - // This scenario is likely to happen occasionally. It is reasonable to drop this to - // a `debug!` if it turns out to be frequent and unavoidable. At least whilst the - // feature is new it is nice to see if the cache is getting a lot of misses. - warn!( + debug!( self.log, "Attester cache miss"; "beacon_block_root" => ?beacon_block_root, @@ -2223,12 +2220,12 @@ impl BeaconChain { } } - // Apply the state to the attester cache, only if it is from the previous epoch or earlier. + // Apply the state to the attester cache, only if it is from the previous epoch or later. // // In a perfect scenario there should be no need to add previous-epoch states to the cache. // However, latency between the VC and the BN might cause the VC to produce attestations at // a previous slot. - if state.current_epoch().saturating_add(2_u64) >= current_epoch { + if state.current_epoch().saturating_add(1_u64) >= current_epoch { self.attester_cache .maybe_cache_state(&state, block_root, &self.spec) .map_err(BeaconChainError::from)?; From 8cb488efe5cd9fdd628bc6b7edb0cffd2ed980b7 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 16:32:02 +1000 Subject: [PATCH 28/31] Clone the head state --- .../beacon_chain/src/attester_cache.rs | 17 ++++-- beacon_node/beacon_chain/src/beacon_chain.rs | 52 +++++++++++++------ 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 01662efc135..5ce0f591ccb 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -278,10 +278,14 @@ impl AttesterCache { Ok(()) } - /// Read the state identified by `state_root` from the database, advance it to the required + /// Read the state identified by `state_root` from the database*, advance it to the required /// slot, use it to prime the cache and return the values for the provided `slot` and /// `committee_index`. /// + /// *: The database read is avoided if `state_opt.is_some()`. + /// + /// If `state_opt.is_some()`, the `state_root` *must* match that state. + /// /// ## Notes /// /// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call @@ -290,6 +294,7 @@ impl AttesterCache { pub fn load_and_cache_state( &self, state_root: Hash256, + state_opt: Option>, key: AttesterCacheKey, slot: Slot, committee_index: CommitteeIndex, @@ -316,9 +321,13 @@ impl AttesterCache { return Ok(value); } - let mut state: BeaconState = chain - .get_state(&state_root, None)? - .ok_or(Error::MissingBeaconState(state_root))?; + let mut state = if let Some(state) = state_opt { + state + } else { + chain + .get_state(&state_root, None)? + .ok_or(Error::MissingBeaconState(state_root))? + }; if state.slot() > slot { // This indicates an internal inconsistency. diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index da21e264295..c25ba2bd94c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1248,6 +1248,7 @@ impl BeaconChain { let beacon_state_root; let target; let current_epoch_attesting_info: Option<(Checkpoint, usize)>; + let head_state_clone: Option>>; let attester_cache_key; let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS); if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { @@ -1305,21 +1306,41 @@ impl BeaconChain { root: target_root, }; - current_epoch_attesting_info = if head_state.current_epoch() == request_epoch { - // When the head state is in the same epoch as the request, all the information - // required to attest is available on the head state. - Some(( - head_state.current_justified_checkpoint(), - head_state - .get_beacon_committee(request_slot, request_index)? - .committee - .len(), - )) - } else { - // If the head state is in a *different* epoch to the request, more work is required - // to determine the justified checkpoint and committee length. - None - }; + match request_epoch.cmp(&head_state.current_epoch()) { + // The request is in the same epoch as the head state. + Ordering::Equal => { + // When the head state is in the same epoch as the request, all the information + // required to attest is available on the head state. + current_epoch_attesting_info = Some(( + head_state.current_justified_checkpoint(), + head_state + .get_beacon_committee(request_slot, request_index)? + .committee + .len(), + )); + // There is no need to clone the head state, all required information has + // already been obtained. + head_state_clone = None; + } + // The request is in a *later* epoch than the head state. + Ordering::Greater => { + // The justified checkpoint in the head state is not useful in this scenario. + current_epoch_attesting_info = None; + // The head state *is* useful in this this scenario because we can advance it + // into the required epoch. + head_state_clone = Some(Box::new( + head_state.clone_with(CloneConfig::committee_caches_only()), + )); + } + // The request is in a *earlier* epoch than the head state. + Ordering::Less => { + // The justified checkpoint in the head state is not useful in this scenario. + current_epoch_attesting_info = None; + // The head state is not useful in this scenario, we must load an older one from + // disk. + head_state_clone = None; + } + } // Determine the key for `self.attester_cache`, in case it is required later in this // routine. @@ -1369,6 +1390,7 @@ impl BeaconChain { metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS); self.attester_cache.load_and_cache_state( beacon_state_root, + head_state_clone.map(|boxed| *boxed), attester_cache_key, request_slot, request_index, From cefab045e0618d46d0bc8a2319ad80ace49b8d56 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 16:48:26 +1000 Subject: [PATCH 29/31] Fix state root, load target --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c25ba2bd94c..e7390e49138 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1285,12 +1285,10 @@ impl BeaconChain { if request_slot >= head_state.slot() { // When attesting to the head slot or later, always use the head of the chain. beacon_block_root = head.beacon_block_root; - beacon_state_root = head.beacon_state_root(); } else { // Permit attesting to slots *prior* to the current head. This is desirable when // the VC and BN are out-of-sync due to time issues or overloading. beacon_block_root = *head_state.get_block_root(request_slot)?; - beacon_state_root = *head_state.get_state_root(request_slot)?; }; let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); @@ -1321,6 +1319,8 @@ impl BeaconChain { // There is no need to clone the head state, all required information has // already been obtained. head_state_clone = None; + // This value is not required, yet we set it to something sensible nonetheless. + beacon_state_root = *head_state.get_state_root(request_slot)?; } // The request is in a *later* epoch than the head state. Ordering::Greater => { @@ -1331,6 +1331,8 @@ impl BeaconChain { head_state_clone = Some(Box::new( head_state.clone_with(CloneConfig::committee_caches_only()), )); + // The beacon state being used is that of the head state. + beacon_state_root = head.beacon_state_root(); } // The request is in a *earlier* epoch than the head state. Ordering::Less => { @@ -1339,6 +1341,11 @@ impl BeaconChain { // The head state is not useful in this scenario, we must load an older one from // disk. head_state_clone = None; + // This state root will be loaded from the database. + // + // Use the `target_slot` here instead of the `slot` to avoid replaying blocks + // during the database read. + beacon_state_root = *head_state.get_state_root(target_slot)?; } } From 9e23360d1104fe2e32667c098b92fc3c30a9610f Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 19:18:54 +1000 Subject: [PATCH 30/31] Revert "Fix state root, load target" This reverts commit cefab045e0618d46d0bc8a2319ad80ace49b8d56. --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e7390e49138..c25ba2bd94c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1285,10 +1285,12 @@ impl BeaconChain { if request_slot >= head_state.slot() { // When attesting to the head slot or later, always use the head of the chain. beacon_block_root = head.beacon_block_root; + beacon_state_root = head.beacon_state_root(); } else { // Permit attesting to slots *prior* to the current head. This is desirable when // the VC and BN are out-of-sync due to time issues or overloading. beacon_block_root = *head_state.get_block_root(request_slot)?; + beacon_state_root = *head_state.get_state_root(request_slot)?; }; let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); @@ -1319,8 +1321,6 @@ impl BeaconChain { // There is no need to clone the head state, all required information has // already been obtained. head_state_clone = None; - // This value is not required, yet we set it to something sensible nonetheless. - beacon_state_root = *head_state.get_state_root(request_slot)?; } // The request is in a *later* epoch than the head state. Ordering::Greater => { @@ -1331,8 +1331,6 @@ impl BeaconChain { head_state_clone = Some(Box::new( head_state.clone_with(CloneConfig::committee_caches_only()), )); - // The beacon state being used is that of the head state. - beacon_state_root = head.beacon_state_root(); } // The request is in a *earlier* epoch than the head state. Ordering::Less => { @@ -1341,11 +1339,6 @@ impl BeaconChain { // The head state is not useful in this scenario, we must load an older one from // disk. head_state_clone = None; - // This state root will be loaded from the database. - // - // Use the `target_slot` here instead of the `slot` to avoid replaying blocks - // during the database read. - beacon_state_root = *head_state.get_state_root(target_slot)?; } } From e7c3008124db18f28a03f0f9a981b258173cd7a9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 28 Jul 2021 19:19:07 +1000 Subject: [PATCH 31/31] Revert "Clone the head state" This reverts commit 8cb488efe5cd9fdd628bc6b7edb0cffd2ed980b7. --- .../beacon_chain/src/attester_cache.rs | 17 ++---- beacon_node/beacon_chain/src/beacon_chain.rs | 52 ++++++------------- 2 files changed, 19 insertions(+), 50 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 5ce0f591ccb..01662efc135 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -278,14 +278,10 @@ impl AttesterCache { Ok(()) } - /// Read the state identified by `state_root` from the database*, advance it to the required + /// Read the state identified by `state_root` from the database, advance it to the required /// slot, use it to prime the cache and return the values for the provided `slot` and /// `committee_index`. /// - /// *: The database read is avoided if `state_opt.is_some()`. - /// - /// If `state_opt.is_some()`, the `state_root` *must* match that state. - /// /// ## Notes /// /// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call @@ -294,7 +290,6 @@ impl AttesterCache { pub fn load_and_cache_state( &self, state_root: Hash256, - state_opt: Option>, key: AttesterCacheKey, slot: Slot, committee_index: CommitteeIndex, @@ -321,13 +316,9 @@ impl AttesterCache { return Ok(value); } - let mut state = if let Some(state) = state_opt { - state - } else { - chain - .get_state(&state_root, None)? - .ok_or(Error::MissingBeaconState(state_root))? - }; + let mut state: BeaconState = chain + .get_state(&state_root, None)? + .ok_or(Error::MissingBeaconState(state_root))?; if state.slot() > slot { // This indicates an internal inconsistency. diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c25ba2bd94c..da21e264295 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1248,7 +1248,6 @@ impl BeaconChain { let beacon_state_root; let target; let current_epoch_attesting_info: Option<(Checkpoint, usize)>; - let head_state_clone: Option>>; let attester_cache_key; let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS); if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) { @@ -1306,41 +1305,21 @@ impl BeaconChain { root: target_root, }; - match request_epoch.cmp(&head_state.current_epoch()) { - // The request is in the same epoch as the head state. - Ordering::Equal => { - // When the head state is in the same epoch as the request, all the information - // required to attest is available on the head state. - current_epoch_attesting_info = Some(( - head_state.current_justified_checkpoint(), - head_state - .get_beacon_committee(request_slot, request_index)? - .committee - .len(), - )); - // There is no need to clone the head state, all required information has - // already been obtained. - head_state_clone = None; - } - // The request is in a *later* epoch than the head state. - Ordering::Greater => { - // The justified checkpoint in the head state is not useful in this scenario. - current_epoch_attesting_info = None; - // The head state *is* useful in this this scenario because we can advance it - // into the required epoch. - head_state_clone = Some(Box::new( - head_state.clone_with(CloneConfig::committee_caches_only()), - )); - } - // The request is in a *earlier* epoch than the head state. - Ordering::Less => { - // The justified checkpoint in the head state is not useful in this scenario. - current_epoch_attesting_info = None; - // The head state is not useful in this scenario, we must load an older one from - // disk. - head_state_clone = None; - } - } + current_epoch_attesting_info = if head_state.current_epoch() == request_epoch { + // When the head state is in the same epoch as the request, all the information + // required to attest is available on the head state. + Some(( + head_state.current_justified_checkpoint(), + head_state + .get_beacon_committee(request_slot, request_index)? + .committee + .len(), + )) + } else { + // If the head state is in a *different* epoch to the request, more work is required + // to determine the justified checkpoint and committee length. + None + }; // Determine the key for `self.attester_cache`, in case it is required later in this // routine. @@ -1390,7 +1369,6 @@ impl BeaconChain { metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS); self.attester_cache.load_and_cache_state( beacon_state_root, - head_state_clone.map(|boxed| *boxed), attester_cache_key, request_slot, request_index,