Skip to content

Commit

Permalink
Optimize process_attestation with active balance cache (#2560)
Browse files Browse the repository at this point in the history
## Proposed Changes

Cache the total active balance for the current epoch in the `BeaconState`. Computing this value takes around 1ms, and this was negatively impacting block processing times on Prater, particularly when reconstructing states.

With a large number of attestations in each block, I saw the `process_attestations` function taking 150ms, which means that reconstructing hot states can take up to 4.65s (31 * 150ms), and reconstructing freezer states can take up to 307s (2047 * 150ms).

I opted to add the cache to the beacon state rather than computing the total active balance at the start of state processing and threading it through. Although this would be simpler in a way, it would waste time, particularly during block replay, as the total active balance doesn't change for the duration of an epoch. So we save ~32ms for hot states, and up to 8.1s for freezer states (using `--slots-per-restore-point 8192`).
  • Loading branch information
michaelsproul committed Sep 3, 2021
1 parent f4aa1d8 commit 9c785a9
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 24 deletions.
14 changes: 4 additions & 10 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use std::ptr;
use types::{
sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing,
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256,
ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, Slot, SyncAggregate,
SyncCommitteeContribution, Validator,
ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution,
Validator,
};

type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeContribution<T>>>>;
Expand Down Expand Up @@ -259,11 +259,8 @@ impl<T: EthSpec> OperationPool<T> {
let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch();
let all_attestations = self.attestations.read();
let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let total_active_balance = state
.get_total_balance(active_indices, spec)
.get_total_active_balance()
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;

// Split attestations for the previous & current epochs, so that we
Expand Down Expand Up @@ -1143,10 +1140,7 @@ mod release_tests {
.expect("should have valid best attestations");
assert_eq!(best_attestations.len(), max_attestations);

let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.unwrap();
let total_active_balance = state.get_total_balance(active_indices, spec).unwrap();
let total_active_balance = state.get_total_active_balance().unwrap();

// Set of indices covered by previous attestations in `best_attestations`.
let mut seen_indices = BTreeSet::new();
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/partial_beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ macro_rules! impl_try_into_beacon_state {
finalized_checkpoint: $inner.finalized_checkpoint,

// Caching
total_active_balance: <_>::default(),
committee_caches: <_>::default(),
pubkey_cache: <_>::default(),
exit_cache: <_>::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn process_sync_aggregate<T: EthSpec>(
}

// Compute participant and proposer rewards
let total_active_balance = state.get_total_active_balance(spec)?;
let total_active_balance = state.get_total_active_balance()?;
let total_active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
let total_base_rewards = get_base_reward_per_increment(total_active_balance, spec)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub mod altair {
get_attestation_participation_flag_indices(state, data, inclusion_delay, spec)?;

// Update epoch participation flags.
let total_active_balance = state.get_total_active_balance(spec)?;
let total_active_balance = state.get_total_active_balance()?;
let mut proposer_reward_numerator = 0;
for index in &indexed_attestation.attesting_indices {
let index = *index as usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn process_epoch<T: EthSpec>(
process_sync_committee_updates(state, spec)?;

// Rotate the epoch caches to suit the epoch transition.
state.advance_caches()?;
state.advance_caches(spec)?;

Ok(EpochProcessingSummary::Altair {
participation_cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn process_epoch<T: EthSpec>(
process_participation_record_updates(state)?;

// Rotate the epoch caches to suit the epoch transition.
state.advance_caches()?;
state.advance_caches(spec)?;

Ok(EpochProcessingSummary::Base {
total_balances: validator_statuses.total_balances,
Expand Down
1 change: 1 addition & 0 deletions consensus/state_processing/src/upgrade/altair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub fn upgrade_to_altair<E: EthSpec>(
current_sync_committee: temp_sync_committee.clone(), // not read
next_sync_committee: temp_sync_committee, // not read
// Caches
total_active_balance: pre.total_active_balance,
committee_caches: mem::take(&mut pre.committee_caches),
pubkey_cache: mem::take(&mut pre.pubkey_cache),
exit_cache: mem::take(&mut pre.exit_cache),
Expand Down
95 changes: 86 additions & 9 deletions consensus/types/src/beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ pub enum Error {
},
PreviousCommitteeCacheUninitialized,
CurrentCommitteeCacheUninitialized,
TotalActiveBalanceCacheUninitialized,
TotalActiveBalanceCacheInconsistent {
initialized_epoch: Epoch,
current_epoch: Epoch,
},
RelativeEpochError(RelativeEpochError),
ExitCacheUninitialized,
CommitteeCacheUninitialized(Option<RelativeEpoch>),
Expand Down Expand Up @@ -275,6 +280,13 @@ where
#[tree_hash(skip_hashing)]
#[test_random(default)]
#[derivative(Clone(clone_with = "clone_default"))]
pub total_active_balance: Option<(Epoch, u64)>,
#[serde(skip_serializing, skip_deserializing)]
#[ssz(skip_serializing)]
#[ssz(skip_deserializing)]
#[tree_hash(skip_hashing)]
#[test_random(default)]
#[derivative(Clone(clone_with = "clone_default"))]
pub committee_caches: [CommitteeCache; CACHED_EPOCHS],
#[serde(skip_serializing, skip_deserializing)]
#[ssz(skip_serializing)]
Expand Down Expand Up @@ -353,6 +365,7 @@ impl<T: EthSpec> BeaconState<T> {
finalized_checkpoint: Checkpoint::default(),

// Caching (not in spec)
total_active_balance: None,
committee_caches: [
CommitteeCache::default(),
CommitteeCache::default(),
Expand Down Expand Up @@ -1226,12 +1239,45 @@ impl<T: EthSpec> BeaconState<T> {
}

/// Implementation of `get_total_active_balance`, matching the spec.
pub fn get_total_active_balance(&self, spec: &ChainSpec) -> Result<u64, Error> {
///
/// Requires the total active balance cache to be initialised, which is initialised whenever
/// the current committee cache is.
///
/// Returns minimum `EFFECTIVE_BALANCE_INCREMENT`, to avoid div by 0.
pub fn get_total_active_balance(&self) -> Result<u64, Error> {
let (initialized_epoch, balance) = self
.total_active_balance()
.ok_or(Error::TotalActiveBalanceCacheUninitialized)?;

let current_epoch = self.current_epoch();
if initialized_epoch == current_epoch {
Ok(balance)
} else {
Err(Error::TotalActiveBalanceCacheInconsistent {
initialized_epoch,
current_epoch,
})
}
}

/// Build the total active balance cache.
///
/// This function requires the current committee cache to be already built. It is called
/// automatically when `build_committee_cache` is called for the current epoch.
fn build_total_active_balance_cache(&mut self, spec: &ChainSpec) -> Result<(), Error> {
// Order is irrelevant, so use the cached indices.
self.get_total_balance(
let current_epoch = self.current_epoch();
let total_active_balance = self.get_total_balance(
self.get_cached_active_validator_indices(RelativeEpoch::Current)?,
spec,
)
)?;
*self.total_active_balance_mut() = Some((current_epoch, total_active_balance));
Ok(())
}

/// Set the cached total active balance to `None`, representing no known value.
pub fn drop_total_active_balance_cache(&mut self) {
*self.total_active_balance_mut() = None;
}

/// Get a mutable reference to the epoch participation flags for `epoch`.
Expand Down Expand Up @@ -1294,6 +1340,7 @@ impl<T: EthSpec> BeaconState<T> {

/// Drop all caches on the state.
pub fn drop_all_caches(&mut self) -> Result<(), Error> {
self.drop_total_active_balance_cache();
self.drop_committee_cache(RelativeEpoch::Previous)?;
self.drop_committee_cache(RelativeEpoch::Current)?;
self.drop_committee_cache(RelativeEpoch::Next)?;
Expand Down Expand Up @@ -1323,11 +1370,14 @@ impl<T: EthSpec> BeaconState<T> {
.committee_cache_at_index(i)?
.is_initialized_at(relative_epoch.into_epoch(self.current_epoch()));

if is_initialized {
Ok(())
} else {
self.force_build_committee_cache(relative_epoch, spec)
if !is_initialized {
self.force_build_committee_cache(relative_epoch, spec)?;
}

if self.total_active_balance().is_none() && relative_epoch == RelativeEpoch::Current {
self.build_total_active_balance_cache(spec)?;
}
Ok(())
}

/// Always builds the previous epoch cache, even if it is already initialized.
Expand Down Expand Up @@ -1359,10 +1409,36 @@ impl<T: EthSpec> BeaconState<T> {
///
/// This should be used if the `slot` of this state is advanced beyond an epoch boundary.
///
/// Note: whilst this function will preserve already-built caches, it will not build any.
pub fn advance_caches(&mut self) -> Result<(), Error> {
/// Note: this function will not build any new committee caches, but will build the total
/// balance cache if the (new) current epoch cache is initialized.
pub fn advance_caches(&mut self, spec: &ChainSpec) -> Result<(), Error> {
self.committee_caches_mut().rotate_left(1);

// Re-compute total active balance for current epoch.
//
// This can only be computed once the state's effective balances have been updated
// for the current epoch. I.e. it is not possible to know this value with the same
// lookahead as the committee shuffling.
let curr = Self::committee_cache_index(RelativeEpoch::Current);
let curr_cache = mem::take(self.committee_cache_at_index_mut(curr)?);

// If current epoch cache is initialized, compute the total active balance from its
// indices. We check that the cache is initialized at the _next_ epoch because the slot has
// not yet been advanced.
let new_current_epoch = self.next_epoch()?;
if curr_cache.is_initialized_at(new_current_epoch) {
*self.total_active_balance_mut() = Some((
new_current_epoch,
self.get_total_balance(curr_cache.active_validator_indices(), spec)?,
));
}
// If the cache is not initialized, then the previous cached value for the total balance is
// wrong, so delete it.
else {
self.drop_total_active_balance_cache();
}
*self.committee_cache_at_index_mut(curr)? = curr_cache;

let next = Self::committee_cache_index(RelativeEpoch::Next);
*self.committee_cache_at_index_mut(next)? = CommitteeCache::default();
Ok(())
Expand Down Expand Up @@ -1504,6 +1580,7 @@ impl<T: EthSpec> BeaconState<T> {
};
if config.committee_caches {
*res.committee_caches_mut() = self.committee_caches().clone();
*res.total_active_balance_mut() = *self.total_active_balance();
}
if config.pubkey_cache {
*res.pubkey_cache_mut() = self.pubkey_cache().clone();
Expand Down
3 changes: 3 additions & 0 deletions consensus/types/src/beacon_state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ fn test_clone_config<E: EthSpec>(base_state: &BeaconState<E>, clone_config: Clon
state
.committee_cache(RelativeEpoch::Next)
.expect("committee cache exists");
state
.total_active_balance()
.expect("total active balance exists");
} else {
state
.committee_cache(RelativeEpoch::Previous)
Expand Down
2 changes: 1 addition & 1 deletion testing/ef_tests/src/cases/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<E: EthSpec> Case for RewardsTest<E> {

Ok(convert_all_base_deltas(&deltas))
} else {
let total_active_balance = state.get_total_active_balance(spec)?;
let total_active_balance = state.get_total_active_balance()?;

let source_deltas = compute_altair_flag_deltas(
&state,
Expand Down

0 comments on commit 9c785a9

Please sign in to comment.