Skip to content

Commit

Permalink
Remove duplicate checks for gossiped aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Oct 28, 2021
1 parent 2dc6163 commit 03f16e4
Show file tree
Hide file tree
Showing 18 changed files with 153 additions and 747 deletions.
57 changes: 10 additions & 47 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ mod batch;
use crate::{
beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
metrics,
observed_aggregates::ObserveOutcome,
observed_attesters::Error as ObservedAttestersError,
BeaconChain, BeaconChainError, BeaconChainTypes,
};
Expand All @@ -50,7 +49,6 @@ use state_processing::{
};
use std::borrow::Cow;
use strum::AsRefStr;
use tree_hash::TreeHash;
use types::{
Attestation, BeaconCommittee, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation,
SelectionProof, SignedAggregateAndProof, Slot, SubnetId,
Expand Down Expand Up @@ -112,14 +110,6 @@ pub enum Error {
///
/// The peer has sent an invalid message.
AggregatorPubkeyUnknown(u64),
/// The attestation has been seen before; either in a block, on the gossip network or from a
/// local validator.
///
/// ## Peer scoring
///
/// It's unclear if this attestation is valid, however we have already observed it and do not
/// need to observe it again.
AttestationAlreadyKnown(Hash256),
/// There has already been an aggregation observed for this validator, we refuse to process a
/// second.
///
Expand Down Expand Up @@ -273,7 +263,6 @@ enum CheckAttestationSignature {
struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> {
signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
attestation_root: Hash256,
}

/// Wraps a `Attestation` that has been verified up until the point that an `IndexedAttestation` can
Expand Down Expand Up @@ -387,8 +376,10 @@ fn process_slash_info<T: BeaconChainTypes>(
debug!(
chain.log,
"Unable to obtain indexed form of attestation for slasher";
"attestation_root" => format!("{:?}", attestation.tree_hash_root()),
"error" => format!("{:?}", e)
"slot" => attestation.data.slot,
"committee_index" => attestation.data.index,
"target_root" => ?attestation.data.target.root,
"error" => ?e
);
return err;
}
Expand Down Expand Up @@ -445,7 +436,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
fn verify_early_checks(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Hash256, Error> {
) -> Result<(), Error> {
let attestation = &signed_aggregate.message.aggregate;

// Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a
Expand All @@ -464,17 +455,6 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
});
}

// Ensure the valid aggregated attestation has not already been seen locally.
let attestation_root = attestation.tree_hash_root();
if chain
.observed_attestations
.write()
.is_known(attestation, attestation_root)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
}

let aggregator_index = signed_aggregate.message.aggregator_index;

// Ensure there has been no other observed aggregate for the given `aggregator_index`.
Expand Down Expand Up @@ -518,7 +498,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
if attestation.aggregation_bits.is_zero() {
Err(Error::EmptyAggregationBitfield)
} else {
Ok(attestation_root)
Ok(())
}
}

Expand All @@ -531,10 +511,9 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {

let attestation = &signed_aggregate.message.aggregate;
let aggregator_index = signed_aggregate.message.aggregator_index;
let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) {
Ok(root) => root,
Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)),
};
if let Err(e) = Self::verify_early_checks(signed_aggregate, chain) {
return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e));
}

let indexed_attestation =
match map_attestation_committee(chain, attestation, |(committee, _)| {
Expand Down Expand Up @@ -566,7 +545,6 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
Ok(IndexedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
attestation_root,
})
}
}
Expand All @@ -575,25 +553,11 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
/// Run the checks that happen after the indexed attestation and signature have been checked.
fn verify_late_checks(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
attestation_root: Hash256,
chain: &BeaconChain<T>,
) -> Result<(), Error> {
let attestation = &signed_aggregate.message.aggregate;
let aggregator_index = signed_aggregate.message.aggregator_index;

// Observe the valid attestation so we do not re-process it.
//
// It's important to double check that the attestation is not already known, otherwise two
// attestations processed at the same time could be published.
if let ObserveOutcome::AlreadyKnown = chain
.observed_attestations
.write()
.observe_item(attestation, Some(attestation_root))
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
}

// Observe the aggregator so we don't process another aggregate from them.
//
// It's important to double check that the attestation is not already known, otherwise two
Expand Down Expand Up @@ -651,7 +615,6 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
let IndexedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
attestation_root,
} = signed_aggregate;

match check_signature {
Expand All @@ -675,7 +638,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
CheckAttestationSignature::No => (),
};

if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) {
if let Err(e) = Self::verify_late_checks(signed_aggregate, chain) {
return Err(SignatureValid(indexed_attestation, e));
}

Expand Down
25 changes: 0 additions & 25 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use crate::naive_aggregation_pool::{
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
SyncContributionAggregateMap,
};
use crate::observed_aggregates::{
Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions,
};
use crate::observed_attesters::{
ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors,
};
Expand Down Expand Up @@ -246,10 +243,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// a method to get an aggregated `SyncCommitteeContribution` for some `SyncCommitteeContributionData`.
pub naive_sync_aggregation_pool:
RwLock<NaiveAggregationPool<SyncContributionAggregateMap<T::EthSpec>>>,
/// Contains a store of attestations which have been observed by the beacon chain.
pub(crate) observed_attestations: RwLock<ObservedAggregateAttestations<T::EthSpec>>,
/// Contains a store of sync contributions which have been observed by the beacon chain.
pub(crate) observed_sync_contributions: RwLock<ObservedSyncContributions<T::EthSpec>>,
/// Maintains a record of which validators have been seen to publish gossip attestations in
/// recent epochs.
pub observed_gossip_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
Expand Down Expand Up @@ -2289,24 +2282,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let mut ops = fully_verified_block.confirmation_db_batch;

let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);

// Iterate through the attestations in the block and register them as an "observed
// attestation". This will stop us from propagating them on the gossip network.
for a in signed_block.message().body().attestations() {
match self.observed_attestations.write().observe_item(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => return Err(BlockError::BeaconChainError(e.into())),
}
}

metrics::stop_timer(attestation_observation_timer);

// If a slasher is configured, provide the attestations from the block.
if let Some(slasher) = self.slasher.as_ref() {
for attestation in signed_block.message().body().attestations() {
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,10 +717,6 @@ where
// TODO: allow for persisting and loading the pool from disk.
naive_sync_aggregation_pool: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_attestations: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_contributions: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_gossip_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_attesters: <_>::default(),
Expand Down
3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::eth1_chain::Error as Eth1ChainError;
use crate::historical_blocks::HistoricalBlockError;
use crate::migrate::PruningError;
use crate::naive_aggregation_pool::Error as NaiveAggregationError;
use crate::observed_aggregates::Error as ObservedAttestationsError;
use crate::observed_attesters::Error as ObservedAttestersError;
use crate::observed_block_producers::Error as ObservedBlockProducersError;
use futures::channel::mpsc::TrySendError;
Expand Down Expand Up @@ -91,7 +90,6 @@ pub enum BeaconChainError {
ValidatorPubkeyUnknown(PublicKeyBytes),
OpPoolError(OpPoolError),
NaiveAggregationError(NaiveAggregationError),
ObservedAttestationsError(ObservedAttestationsError),
ObservedAttestersError(ObservedAttestersError),
ObservedBlockProducersError(ObservedBlockProducersError),
AttesterCacheError(AttesterCacheError),
Expand Down Expand Up @@ -144,7 +142,6 @@ easy_from_to!(AttesterSlashingValidationError, BeaconChainError);
easy_from_to!(SszTypesError, BeaconChainError);
easy_from_to!(OpPoolError, BeaconChainError);
easy_from_to!(NaiveAggregationError, BeaconChainError);
easy_from_to!(ObservedAttestationsError, BeaconChainError);
easy_from_to!(ObservedAttestersError, BeaconChainError);
easy_from_to!(ObservedBlockProducersError, BeaconChainError);
easy_from_to!(AttesterCacheError, BeaconChainError);
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub mod historical_blocks;
mod metrics;
pub mod migrate;
mod naive_aggregation_pool;
mod observed_aggregates;
mod observed_attesters;
mod observed_block_producers;
pub mod observed_operations;
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ lazy_static! {
"beacon_block_processing_db_write_seconds",
"Time spent writing a newly processed block and state to DB"
);
pub static ref BLOCK_PROCESSING_ATTESTATION_OBSERVATION: Result<Histogram> = try_create_histogram(
"beacon_block_processing_attestation_observation_seconds",
"Time spent hashing and remembering all the attestations in the block"
);
pub static ref BLOCK_SYNC_AGGREGATE_SET_BITS: Result<IntGauge> = try_create_int_gauge(
"block_sync_aggregate_set_bits",
"The number of true bits in the last sync aggregate in a block"
Expand Down
Loading

0 comments on commit 03f16e4

Please sign in to comment.