Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify and lower state caches #5313

Closed
wants to merge 10 commits into from
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ merkle_proof = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }
promise_cache = { path = "../../common/promise_cache" }
proto_array = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
Expand Down
241 changes: 69 additions & 172 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars;
use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::parallel_state_cache::ParallelStateCache;
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand Down Expand Up @@ -130,10 +128,6 @@ pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
/// Alias to appease clippy.
type HashBlockTuple<E> = (Hash256, RpcBlock<E>);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -445,8 +439,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
pub eth1_finalization_cache: TimeoutRwLock<Eth1FinalizationCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
Expand All @@ -461,10 +453,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// A cache used to de-duplicate HTTP state requests.
///
/// The cache is keyed by `state_root`.
pub parallel_state_cache: Arc<RwLock<ParallelStateCache<T::EthSpec>>>,
/// A cache used to produce light_client server messages
pub light_client_server_cache: LightClientServerCache<T>,
/// Sender to signal the light_client server to produce new updates
Expand Down Expand Up @@ -3398,7 +3386,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// corrupt the node's database permanently.
// -----------------------------------------------------------------------------------------

self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations(
block,
&state,
Expand Down Expand Up @@ -3885,48 +3872,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Best-effort priming of the shuffling cache during block import.
///
/// For the current and next epoch of `state`, ensures the shuffling derived from
/// this block is present in the cache by delegating to
/// `import_block_update_shuffling_cache_fallible`.
///
/// Failures are deliberately not propagated to the caller: any error is logged
/// at `warn` level and then discarded.
fn import_block_update_shuffling_cache(
    &self,
    block_root: Hash256,
    state: &mut BeaconState<T::EthSpec>,
) {
    match self.import_block_update_shuffling_cache_fallible(block_root, state) {
        Ok(()) => {}
        Err(e) => {
            warn!(
                self.log,
                "Failed to prime shuffling cache";
                "error" => ?e
            );
        }
    }
}

fn import_block_update_shuffling_cache_fallible(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);

if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_value(shuffling_id, committee_cache);
}
}
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn import_block_update_deposit_contract_finalization(
&self,
Expand Down Expand Up @@ -6069,7 +6014,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_block(&head_block_root)
.ok_or(Error::MissingBeaconBlock(head_block_root))?;

let shuffling_id = BlockShufflingIds {
let shuffling_id = shuffling_id::BlockShufflingIds {
current: head_block.current_epoch_shuffling_id.clone(),
next: head_block.next_epoch_shuffling_id.clone(),
previous: None,
Expand All @@ -6081,135 +6026,87 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()),
})?;

// Obtain the shuffling cache, timing how long we wait.
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);

let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?;

metrics::stop_timer(cache_wait_timer);

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
// access.
drop(shuffling_cache);

let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
// This avoids the case where multiple threads attempt to produce the same value at the
// same time.
//
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
// promise from being created twice.
let sender = shuffling_cache
.create_promise(shuffling_id.clone())
.map_err(Error::ShufflingCacheError)?;

// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);

debug!(
self.log,
"Committee cache miss";
"shuffling_id" => ?shuffling_epoch,
"head_block_root" => head_block_root.to_string(),
);

// If the block's state will be so far ahead of `shuffling_epoch` that even its
// previous epoch committee cache will be too new, then error. Callers of this function
// shouldn't be requesting such old shufflings for this `head_block_root`.
let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch());
if head_block_epoch > shuffling_epoch + 1 {
return Err(Error::InvalidStateForShuffling {
state_epoch: head_block_epoch,
shuffling_epoch,
});
}

let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

// If the head of the chain can serve this request, use it.
//
// This code is a little awkward because we need to ensure that the head we read and
// the head we copy is identical. Taking one lock to read the head values and another
// to copy the head is liable to race-conditions.
let head_state_opt = self.with_head(|head| {
if head.beacon_block_root == head_block_root {
Ok(Some((head.beacon_state.clone(), head.beacon_state_root())))
} else {
Ok::<_, Error>(None)
}
})?;
// If the block's state will be so far ahead of `shuffling_epoch` that even its
// previous epoch committee cache will be too new, then error. Callers of this function
// shouldn't be requesting such old shufflings for this `head_block_root`.
let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch());
if head_block_epoch > shuffling_epoch + 1 {
return Err(Error::InvalidStateForShuffling {
state_epoch: head_block_epoch,
shuffling_epoch,
});
}

// Compute the `target_slot` to advance the block's state to.
//
// Since there's a one-epoch look-ahead on the attester shuffling, it suffices to
// only advance into the first slot of the epoch prior to `shuffling_epoch`.
//
// If the `head_block` is already ahead of that slot, then we should load the state
// at that slot, as we've determined above that the `shuffling_epoch` cache will
// not be too far in the past.
let target_slot = std::cmp::max(
shuffling_epoch
.saturating_sub(1_u64)
.start_slot(T::EthSpec::slots_per_epoch()),
head_block.slot,
);
let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

// If the head state is useful for this request, use it. Otherwise, read a state from
// disk that is advanced as close as possible to `target_slot`.
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
// If the head of the chain can serve this request, use it.
//
// This code is a little awkward because we need to ensure that the head we read and
// the head we copy is identical. Taking one lock to read the head values and another
// to copy the head is liable to race-conditions.
let head_state_opt = self.with_head(|head| {
if head.beacon_block_root == head_block_root {
Ok(Some((head.beacon_state.clone(), head.beacon_state_root())))
} else {
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};
Ok::<_, Error>(None)
}
})?;

metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);
// Compute the `target_slot` to advance the block's state to.
//
// Since there's a one-epoch look-ahead on the attester shuffling, it suffices to
// only advance into the first slot of the epoch prior to `shuffling_epoch`.
//
// If the `head_block` is already ahead of that slot, then we should load the state
// at that slot, as we've determined above that the `shuffling_epoch` cache will
// not be too far in the past.
let target_slot = std::cmp::max(
shuffling_epoch
.saturating_sub(1_u64)
.start_slot(T::EthSpec::slots_per_epoch()),
head_block.slot,
);

// If the state is still in an earlier epoch, advance it to the `target_slot` so
// that its next epoch committee cache matches the `shuffling_epoch`.
if state.current_epoch() + 1 < shuffling_epoch {
// Advance the state into the required slot, using the "partial" method since the
// state roots are not relevant for the shuffling.
partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?;
}
metrics::stop_timer(state_skip_timer);
// If the head state is useful for this request, use it. Otherwise, read a state from
// disk that is advanced as close as possible to `target_slot`.
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
} else {
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};

let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);
metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);

let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch)
.map_err(Error::IncorrectStateForAttestation)?;
// If the state is still in an earlier epoch, advance it to the `target_slot` so
// that its next epoch committee cache matches the `shuffling_epoch`.
if state.current_epoch() + 1 < shuffling_epoch {
// Advance the state into the required slot, using the "partial" method since the
// state roots are not relevant for the shuffling.
partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?;
}
metrics::stop_timer(state_skip_timer);

state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);

let committee_cache = state.committee_cache(relative_epoch)?.clone();
let shuffling_decision_block = shuffling_id.shuffling_decision_block;
let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch)
.map_err(Error::IncorrectStateForAttestation)?;

self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_value(shuffling_id, &committee_cache);
state.build_committee_cache(relative_epoch, &self.spec)?;

metrics::stop_timer(committee_building_timer);
let committee_cache = state.committee_cache(relative_epoch)?.clone();
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

sender.send(committee_cache.clone());
metrics::stop_timer(committee_building_timer);

map_fn(&committee_cache, shuffling_decision_block)
}
map_fn(&committee_cache, shuffling_decision_block)
}

/// Dumps the entire canonical chain, from the head to genesis to a vector for analysis.
Expand Down
Loading
Loading