Skip to content

Commit

Permalink
Use a generic get_message_delay_ms function
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Aug 11, 2021
1 parent a7ef51a commit 1613888
Showing 1 changed file with 43 additions and 72 deletions.
115 changes: 43 additions & 72 deletions beacon_node/beacon_chain/src/validator_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::marker::PhantomData;
use std::str::Utf8Error;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use types::{
AttestationData, AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec,
Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit,
};

Expand Down Expand Up @@ -633,22 +633,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}

/// Returns the duration between when the attestation `data` could be produced (1/3rd through
/// the slot) and `seen_timestamp`.
fn get_unaggregated_attestation_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &AttestationData,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.unagg_attestation_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}

/// Register an attestation seen on the gossip network.
pub fn register_gossip_unaggregated_attestation<S: SlotClock>(
&self,
Expand Down Expand Up @@ -688,7 +672,12 @@ impl<T: EthSpec> ValidatorMonitor<T> {
) {
let data = &indexed_attestation.data;
let epoch = data.slot.epoch(T::slots_per_epoch());
let delay = Self::get_unaggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);
let delay = get_message_delay_ms(
seen_timestamp,
data.slot,
slot_clock.unagg_attestation_production_delay(),
slot_clock,
);

indexed_attestation.attesting_indices.iter().for_each(|i| {
if let Some(validator) = self.get_validator(*i) {
Expand Down Expand Up @@ -723,22 +712,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
})
}

/// Returns the duration between when a `AggregateAndproof` with `data` could be produced (2/3rd
/// through the slot) and `seen_timestamp`.
fn get_aggregated_attestation_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &AttestationData,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.agg_attestation_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}

/// Register a `signed_aggregate_and_proof` seen on the gossip network.
pub fn register_gossip_aggregated_attestation<S: SlotClock>(
&self,
Expand Down Expand Up @@ -783,7 +756,12 @@ impl<T: EthSpec> ValidatorMonitor<T> {
) {
let data = &indexed_attestation.data;
let epoch = data.slot.epoch(T::slots_per_epoch());
let delay = Self::get_aggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);
let delay = get_message_delay_ms(
seen_timestamp,
data.slot,
slot_clock.agg_attestation_production_delay(),
slot_clock,
);

let aggregator_index = signed_aggregate_and_proof.message.aggregator_index;
if let Some(validator) = self.get_validator(aggregator_index) {
Expand Down Expand Up @@ -892,22 +870,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
})
}

/// Returns the duration between when the sync_committee message could be produced (1/3rd through
/// the slot) and `seen_timestamp`.
fn get_sync_committee_message_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &SyncCommitteeMessage,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.sync_committee_message_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}

/// Register a sync committee message received over gossip.
pub fn register_gossip_sync_committee_message<S: SlotClock>(
&self,
Expand Down Expand Up @@ -950,9 +912,10 @@ impl<T: EthSpec> ValidatorMonitor<T> {
let id = &validator.id;

let epoch = sync_committee_message.slot.epoch(T::slots_per_epoch());
let delay = Self::get_sync_committee_message_delay_ms(
let delay = get_message_delay_ms(
seen_timestamp,
sync_committee_message,
sync_committee_message.slot,
slot_clock.sync_committee_message_production_delay(),
slot_clock,
);

Expand Down Expand Up @@ -983,22 +946,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}

/// Returns the duration between when the sync_committee message could be produced (1/3rd through
/// the slot) and `seen_timestamp`.
fn get_sync_contribution_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &SignedContributionAndProof<T>,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.message.contribution.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.sync_committee_contribution_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}

/// Register a sync committee contribution received over gossip.
pub fn register_gossip_sync_committee_contribution<S: SlotClock>(
&self,
Expand Down Expand Up @@ -1045,8 +992,12 @@ impl<T: EthSpec> ValidatorMonitor<T> {
let slot = sync_contribution.message.contribution.slot;
let epoch = slot.epoch(T::slots_per_epoch());
let beacon_block_root = sync_contribution.message.contribution.beacon_block_root;
let delay =
Self::get_sync_contribution_delay_ms(seen_timestamp, sync_contribution, slot_clock);
let delay = get_message_delay_ms(
seen_timestamp,
slot,
slot_clock.sync_committee_contribution_production_delay(),
slot_clock,
);

let aggregator_index = sync_contribution.message.aggregator_index;
if let Some(validator) = self.get_validator(aggregator_index) {
Expand Down Expand Up @@ -1461,3 +1412,23 @@ pub fn get_slot_delay_ms<S: SlotClock>(
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.unwrap_or_else(|| Duration::from_secs(0))
}

/// Returns the duration between when any message could be produced and the `seen_timestamp`.
///
/// `message_production_delay` is the duration from the beginning of the slot when the message
/// should be produced.
/// e.g. for unagg attestations, `message_production_delay = slot_duration / 3`.
///
/// `slot` is the slot for which the message was produced.
fn get_message_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
slot: Slot,
message_production_delay: Duration,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| gross_delay.checked_sub(message_production_delay))
.unwrap_or_else(|| Duration::from_secs(0))
}

0 comments on commit 1613888

Please sign in to comment.