From a152c27351cd5a26d45cef6736f763cef39dbf17 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 13 Feb 2024 19:09:25 +0100 Subject: [PATCH 1/7] Introduce ComputationCache to avoid redundant state computation --- Cargo.lock | 2 + beacon_node/beacon_chain/src/beacon_chain.rs | 5 -- beacon_node/beacon_chain/src/builder.rs | 7 -- beacon_node/beacon_chain/src/lib.rs | 1 - .../beacon_chain/src/parallel_state_cache.rs | 22 ----- beacon_node/http_api/src/state_id.rs | 49 ---------- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/errors.rs | 11 +++ beacon_node/store/src/hot_cold_store.rs | 89 +++++++++++-------- common/oneshot_broadcast/src/lib.rs | 4 +- common/promise_cache/Cargo.toml | 1 + common/promise_cache/src/computation_cache.rs | 84 +++++++++++++++++ common/promise_cache/src/lib.rs | 2 + 13 files changed, 154 insertions(+), 124 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/parallel_state_cache.rs create mode 100644 common/promise_cache/src/computation_cache.rs diff --git a/Cargo.lock b/Cargo.lock index e680abbe13d..f34e5c6a7d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6198,6 +6198,7 @@ dependencies = [ "derivative", "itertools", "oneshot_broadcast", + "parking_lot 0.12.1", "slog", ] @@ -7767,6 +7768,7 @@ dependencies = [ "logging", "lru", "parking_lot 0.12.1", + "promise_cache", "safe_arith", "serde", "slog", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2f0da3abc53..3200650601f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -53,7 +53,6 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; -use crate::parallel_state_cache::ParallelStateCache; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; @@ -461,10 +460,6 @@ pub struct BeaconChain { pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, - /// A cache used to de-duplicate HTTP state requests. - /// - /// The cache is keyed by `state_root`. - pub parallel_state_cache: Arc>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. 
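Note: the pattern this commit introduces — the first caller for a key runs the computation while concurrent callers block until the result is broadcast — can be sketched in isolation. The following stand-alone model uses only std::sync primitives instead of the repo's oneshot_broadcast crate; all names in it are illustrative and it omits the error/panic propagation the real ComputationCache (added in full below) performs.

    use std::collections::HashMap;
    use std::sync::{Arc, Condvar, Mutex};

    /// One in-flight computation: a slot for the result plus a condvar to wake waiters.
    struct Pending<V> {
        slot: Mutex<Option<V>>,
        ready: Condvar,
    }

    /// De-duplicates concurrent computations of the same key.
    struct DedupCache<K, V> {
        pending: Mutex<HashMap<K, Arc<Pending<V>>>>,
    }

    impl<K: std::hash::Hash + Eq + Clone, V: Clone> DedupCache<K, V> {
        fn new() -> Self {
            Self { pending: Mutex::new(HashMap::new()) }
        }

        fn get_or_compute(&self, key: &K, compute: impl FnOnce() -> V) -> V {
            let entry = {
                let mut pending = self.pending.lock().unwrap();
                if let Some(entry) = pending.get(key) {
                    // Someone else is already computing this value: wait for it.
                    let entry = Arc::clone(entry);
                    drop(pending);
                    let mut slot = entry.slot.lock().unwrap();
                    while slot.is_none() {
                        slot = entry.ready.wait(slot).unwrap();
                    }
                    return slot.clone().expect("loop exits only once the value is set");
                }
                let entry = Arc::new(Pending { slot: Mutex::new(None), ready: Condvar::new() });
                pending.insert(key.clone(), Arc::clone(&entry));
                entry
            };
            // We are the first caller: do the work without holding the map lock.
            let value = compute();
            *entry.slot.lock().unwrap() = Some(value.clone());
            entry.ready.notify_all();
            // The entry only lives while the computation is in flight.
            self.pending.lock().unwrap().remove(key);
            value
        }
    }

    fn main() {
        let cache = DedupCache::new();
        let v = cache.get_or_compute(&42u64, || "expensive".to_string());
        assert_eq!(v, "expensive");
    }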
pub shutdown_sender: Sender, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 59a276cdc66..2dc746e3ca4 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -23,7 +23,6 @@ use futures::channel::mpsc::Sender; use kzg::{Kzg, TrustedSetup}; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; -use promise_cache::PromiseCache; use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; use slasher::Slasher; use slog::{crit, debug, error, info, o, Logger}; @@ -857,7 +856,6 @@ where let genesis_time = head_snapshot.beacon_state.genesis_time(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let shuffling_cache_size = self.chain_config.shuffling_cache_size; - let parallel_state_cache_size = self.chain_config.parallel_state_cache_size; // Calculate the weak subjectivity point in which to backfill blocks to. let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -940,11 +938,6 @@ where beacon_proposer_cache, block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), - parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new( - parallel_state_cache_size, - Default::default(), - log.clone(), - ))), validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4c6a22d0167..ccc7f412441 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -43,7 +43,6 @@ pub mod observed_block_producers; pub mod observed_operations; mod observed_slashable; pub mod otb_verification_service; -mod parallel_state_cache; mod persisted_beacon_chain; mod persisted_fork_choice; mod pre_finalization_cache; diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs deleted file mode 100644 index d568d3248cd..00000000000 --- a/beacon_node/beacon_chain/src/parallel_state_cache.rs +++ /dev/null @@ -1,22 +0,0 @@ -use promise_cache::{PromiseCache, Protect}; -use types::{BeaconState, Hash256}; - -#[derive(Debug, Default)] -pub struct ParallelStateProtector; - -impl Protect for ParallelStateProtector { - type SortKey = usize; - - /// Evict in arbitrary (hashmap) order by using the same key for every value. - fn sort_key(&self, _: &Hash256) -> Self::SortKey { - 0 - } - - /// We don't care too much about preventing evictions of particular states here. All the states - /// in this cache should be different from the head state. - fn protect_from_eviction(&self, _: &Hash256) -> bool { - false - } -} - -pub type ParallelStateCache = PromiseCache, ParallelStateProtector>; diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index c4b721f0411..fdc99fa954e 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,7 +1,6 @@ use crate::ExecutionOptimistic; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; -use slog::{debug, warn}; use std::fmt; use std::str::FromStr; use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot}; @@ -188,49 +187,6 @@ impl StateId { _ => (self.root(chain)?, None), }; - let mut opt_state_cache = Some(chain.parallel_state_cache.write()); - - // Try the cache. 
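Note: with the HTTP-level parallel_state_cache removed here, handlers simply call chain.get_state and rely on the store layer to de-duplicate concurrent loads. The observable contract is that N concurrent requests for one state root trigger a single disk load. A std-only model of that contract (the key 42 and the returned string are stand-ins, not real types):

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex, OnceLock};
    use std::thread;

    fn main() {
        let loads = Arc::new(AtomicUsize::new(0));
        let cache: Arc<Mutex<HashMap<u64, Arc<OnceLock<String>>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let (cache, loads) = (Arc::clone(&cache), Arc::clone(&loads));
                thread::spawn(move || {
                    let cell = {
                        let mut map = cache.lock().unwrap();
                        Arc::clone(map.entry(42).or_insert_with(|| Arc::new(OnceLock::new())))
                    };
                    // Only one thread runs the closure; the others block until it is set.
                    cell.get_or_init(|| {
                        loads.fetch_add(1, Ordering::SeqCst);
                        "state-for-root-42".to_string()
                    })
                    .clone()
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(loads.load(Ordering::SeqCst), 1); // de-duplicated
    }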
- if let Some(cache_item) = opt_state_cache - .as_mut() - .and_then(|cache| cache.get(&state_root)) - { - drop(opt_state_cache.take()); - match cache_item.wait() { - Ok(state) => { - debug!( - chain.logger(), - "HTTP state cache hit"; - "state_root" => ?state_root, - "slot" => state.slot(), - ); - return Ok(((*state).clone(), execution_optimistic, finalized)); - } - Err(e) => { - warn!( - chain.logger(), - "State promise failed"; - "state_root" => ?state_root, - "outcome" => "re-computing", - "error" => ?e, - ); - } - } - } - - // Re-lock only in case of failed promise. - debug!( - chain.logger(), - "HTTP state cache miss"; - "state_root" => ?state_root - ); - let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.parallel_state_cache.write()); - - let sender = state_cache.create_promise(state_root).map_err(|e| { - warp_utils::reject::custom_server_error(format!("too many concurrent requests: {e:?}")) - })?; - drop(state_cache); - let state = chain .get_state(&state_root, slot_opt) .map_err(warp_utils::reject::beacon_chain_error) @@ -243,11 +199,6 @@ impl StateId { }) })?; - // Fulfil promise (and re-lock again). - let mut state_cache = chain.parallel_state_cache.write(); - state_cache.resolve_promise(sender, state_root, &state); - drop(state_cache); - Ok((state, execution_optimistic, finalized)) } diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 288d167b419..4bacaef7800 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -31,3 +31,4 @@ safe_arith = { workspace = true } bls = { workspace = true } smallvec = { workspace = true } logging = { workspace = true } +promise_cache = { path = "../../common/promise_cache" } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index b7eaac054f6..63c11d8deab 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -2,6 +2,7 @@ use crate::config::StoreConfigError; use crate::hdiff; use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; +use promise_cache::computation_cache::ComputationCacheError; use state_processing::BlockReplayError; use types::{milhouse, BeaconStateError, Epoch, EpochCacheError, Hash256, InconsistentFork, Slot}; @@ -75,6 +76,7 @@ pub enum Error { InconsistentFork(InconsistentFork), ZeroCacheSize, CacheBuildError(EpochCacheError), + CachedComputationError, } pub trait HandleUnavailable { @@ -151,6 +153,15 @@ impl From for Error { } } +impl From> for Error { + fn from(e: ComputationCacheError) -> Error { + match e { + ComputationCacheError::Error(Some(e)) => e, + _ => Error::CachedComputationError, + } + } +} + #[derive(Debug)] pub struct DBError { pub message: String, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 03d280fb030..2996a433e2b 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -44,6 +44,7 @@ use std::time::Duration; use types::blob_sidecar::BlobSidecarList; use types::*; use zstd::{Decoder, Encoder}; +use promise_cache::computation_cache::ComputationCache; pub const MAX_PARENT_STATES_TO_CACHE: u64 = 1; @@ -78,6 +79,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. state_cache: Mutex>, + hot_computation_cache: ComputationCache>>, /// Immutable validator cache. pub immutable_validators: Arc>>, /// LRU cache of replayed states. 
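Note: the From impl added to errors.rs above encodes a subtlety of the new cache: a failed computation is broadcast to waiters as Err(()) because the error type E is not required to be Clone. Only the caller that actually ran the computation still owns the error (Error(Some(e))); waiters see Error(None) and fall back to the generic variant. A self-contained sketch of that mapping, with StoreError standing in for the store's Error type:

    #[allow(dead_code)]
    #[derive(Debug)]
    enum CacheError<E> {
        /// The computation failed. Only the initiating caller gets the value back;
        /// concurrent waiters receive `None`, since `E` need not be `Clone`.
        Error(Option<E>),
        /// The thread running the computation panicked (its sender was dropped).
        Panic,
    }

    #[derive(Debug)]
    enum StoreError {
        MissingSnapshot,
        CachedComputation,
    }

    impl From<CacheError<StoreError>> for StoreError {
        fn from(e: CacheError<StoreError>) -> StoreError {
            match e {
                // Keep the original error when we still have it...
                CacheError::Error(Some(e)) => e,
                // ...otherwise collapse to a generic "cached computation failed".
                _ => StoreError::CachedComputation,
            }
        }
    }

    fn main() {
        let e: StoreError = CacheError::Error(Some(StoreError::MissingSnapshot)).into();
        println!("{e:?}"); // MissingSnapshot
        let e: StoreError = CacheError::<StoreError>::Error(None).into();
        println!("{e:?}"); // CachedComputation
    }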
@@ -86,6 +88,7 @@ pub struct HotColdDB, Cold: ItemStore> { historic_state_cache: Mutex>>, /// Cache of hierarchical diff buffers. diff_buffer_cache: Mutex>, + diff_buffer_computation_cache: ComputationCache, /// Chain spec. pub(crate) spec: ChainSpec, /// Logger. @@ -211,9 +214,11 @@ impl HotColdDB, MemoryStore> { hot_db: MemoryStore::open(), block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), + hot_computation_cache: ComputationCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), + diff_buffer_computation_cache: ComputationCache::new(), config, hierarchy, spec, @@ -257,9 +262,11 @@ impl HotColdDB, LevelDB> { hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), + hot_computation_cache: ComputationCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), + diff_buffer_computation_cache: ComputationCache::new(), config, hierarchy, spec, @@ -1326,16 +1333,18 @@ impl, Cold: ItemStore> HotColdDB "state_root" => ?state_root, ); - let state_from_disk = self.load_hot_state(state_root)?; + Ok(self.hot_computation_cache.get_or_compute(state_root, || { + let state_from_disk = self.load_hot_state(state_root)?; - if let Some((state, block_root)) = state_from_disk { - self.state_cache - .lock() - .put_state(*state_root, block_root, &state)?; - Ok(Some(state)) - } else { - Ok(None) - } + if let Some((state, block_root)) = state_from_disk { + self.state_cache + .lock() + .put_state(*state_root, block_root, &state)?; + Ok(Some(state)) + } else { + Ok(None) + } + })?) } /// Load a post-finalization state from the hot database. @@ -1858,45 +1867,47 @@ impl, Cold: ItemStore> HotColdDB return Ok((slot, buffer.clone())); } - // Load buffer for the previous state. - // This amount of recursion (<10 levels) should be OK. - let t = std::time::Instant::now(); - let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { - // Base case. - StorageStrategy::Snapshot => { - let state = self - .load_cold_state_as_snapshot(slot)? - .ok_or(Error::MissingSnapshot(slot))?; - let buffer = HDiffBuffer::from_state(state); - - self.diff_buffer_cache.lock().put(slot, buffer.clone()); - debug!( + Ok(self.diff_buffer_computation_cache.get_or_compute(&slot, || { + // Load buffer for the previous state. + // This amount of recursion (<10 levels) should be OK. + let t = std::time::Instant::now(); + let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { + // Base case. + StorageStrategy::Snapshot => { + let state = self + .load_cold_state_as_snapshot(slot)? + .ok_or(Error::MissingSnapshot(slot))?; + let buffer = HDiffBuffer::from_state(state); + + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( self.log, "Added diff buffer to cache"; "load_time_ms" => t.elapsed().as_millis(), "slot" => slot ); - return Ok((slot, buffer)); - } - // Recursive case. - StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, - StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from), - }; + return Ok((slot, buffer)); + } + // Recursive case. 
+            StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?,
+            StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from),
+        };
 
-        // Load diff and apply it to buffer.
-        let diff = self.load_hdiff_for_slot(slot)?;
-        diff.apply(&mut buffer)?;
+            // Load diff and apply it to buffer.
+            let diff = self.load_hdiff_for_slot(slot)?;
+            diff.apply(&mut buffer)?;
 
-        self.diff_buffer_cache.lock().put(slot, buffer.clone());
-        debug!(
-            self.log,
-            "Added diff buffer to cache";
-            "load_time_ms" => t.elapsed().as_millis(),
-            "slot" => slot
-        );
+            self.diff_buffer_cache.lock().put(slot, buffer.clone());
+            debug!(
+                self.log,
+                "Added diff buffer to cache";
+                "load_time_ms" => t.elapsed().as_millis(),
+                "slot" => slot
+            );
 
-        Ok((slot, buffer))
+            Ok((slot, buffer))
+        })?)
     }
 
     /// Load cold blocks between `start_slot` and `end_slot` inclusive.
diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs
index 2c616b3bb38..829dddad88a 100644
--- a/common/oneshot_broadcast/src/lib.rs
+++ b/common/oneshot_broadcast/src/lib.rs
@@ -10,6 +10,7 @@ pub enum Error {
     SenderDropped,
 }
 
+#[derive(Debug)]
 enum Future<T> {
     /// The future is ready and the item may be consumed.
     Ready(T),
@@ -19,6 +20,7 @@ enum Future<T> {
     SenderDropped,
 }
 
+#[derive(Debug)]
 struct MutexCondvar<T> {
     mutex: Mutex<Future<T>>,
     condvar: Condvar,
@@ -50,7 +52,7 @@ impl<T> Drop for Sender<T> {
 
 /// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender`
 /// (if any).
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Receiver<T>(Arc<MutexCondvar<T>>);
 
 impl<T: Clone> Receiver<T> {
diff --git a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml
index b5fa42bd438..d1946d0886b 100644
--- a/common/promise_cache/Cargo.toml
+++ b/common/promise_cache/Cargo.toml
@@ -8,3 +8,4 @@ derivative = { workspace = true }
 oneshot_broadcast = { path = "../oneshot_broadcast" }
 itertools = { workspace = true }
 slog = { workspace = true }
+parking_lot = { workspace = true }
diff --git a/common/promise_cache/src/computation_cache.rs b/common/promise_cache/src/computation_cache.rs
new file mode 100644
index 00000000000..6d9833dd370
--- /dev/null
+++ b/common/promise_cache/src/computation_cache.rs
@@ -0,0 +1,84 @@
+use oneshot_broadcast::{oneshot, Receiver};
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::hash::Hash;
+
+#[derive(Debug)]
+pub struct ComputationCache<K, V>
+where
+    K: Hash + Eq + Clone,
+    V: Clone,
+{
+    cache: Mutex<HashMap<K, Receiver<Result<V, ()>>>>,
+}
+
+pub enum ComputationCacheError<E> {
+    Error(Option<E>),
+    Panic,
+}
+
+impl<K, V> ComputationCache<K, V>
+where
+    K: Hash + Eq + Clone,
+    V: Clone,
+{
+    pub fn new() -> Self {
+        Self {
+            cache: Mutex::new(HashMap::new()),
+        }
+    }
+
+    #[allow(clippy::await_holding_lock)] // https://github.com/rust-lang/rust-clippy/issues/6446
+    pub fn get_or_compute<F, E>(
+        &self,
+        key: &K,
+        computation: F,
+    ) -> Result<V, ComputationCacheError<E>>
+    where
+        F: FnOnce() -> Result<V, E>,
+    {
+        let mut cache = self.cache.lock();
+        match cache.get(key) {
+            Some(item) => {
+                let item = item.clone();
+                drop(cache);
+                item.recv()
+                    .map_err(|_| ComputationCacheError::Panic)
+                    .and_then(|res| res.map_err(|_| ComputationCacheError::Error(None)))
+            }
+            None => {
+                let (sender, receiver) = oneshot();
+                cache.insert(key.clone(), receiver);
+                drop(cache);
+                match computation() {
+                    Ok(value) => {
+                        sender.send(Ok(value));
+                        Ok(self
+                            .cache
+                            .lock()
+                            .remove(key)
+                            .expect("value has vanished")
+                            .recv()
+                            .expect("we sent the value")
+                            .expect("we sent a success"))
+                    }
+                    Err(err) => {
+                        sender.send(Err(()));
+                        self.cache.lock().remove(key);
+
Err(ComputationCacheError::Error(Some(err))) + } + } + } + } + } +} + +impl Default for ComputationCache +where + K: Hash + Eq + Clone, + V: Clone, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index 36b6bd984f5..c738ca2b9be 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -1,3 +1,5 @@ +pub mod computation_cache; + use derivative::Derivative; use itertools::Itertools; use oneshot_broadcast::{oneshot, Receiver, Sender}; From 5046979fd5f3cc37c6b6e4b48c60c73779654965 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 16 Feb 2024 16:31:21 +0100 Subject: [PATCH 2/7] experimentally remove shuffling cache, rename ComputationCache to PromiseCache --- beacon_node/beacon_chain/src/beacon_chain.rs | 237 +++++---------- beacon_node/beacon_chain/src/builder.rs | 9 - .../beacon_chain/src/canonical_head.rs | 29 -- beacon_node/beacon_chain/src/chain_config.rs | 3 - beacon_node/beacon_chain/src/errors.rs | 1 - .../beacon_chain/src/shuffling_cache.rs | 30 +- .../beacon_chain/src/state_advance_timer.rs | 12 +- beacon_node/http_api/src/lib.rs | 135 ++------- .../gossip_methods.rs | 13 - beacon_node/src/config.rs | 4 - beacon_node/store/src/errors.rs | 8 +- beacon_node/store/src/hot_cold_store.rs | 14 +- common/promise_cache/src/computation_cache.rs | 84 ------ common/promise_cache/src/lib.rs | 272 +++++------------- 14 files changed, 178 insertions(+), 673 deletions(-) delete mode 100644 common/promise_cache/src/computation_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3200650601f..a0944218cd9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -56,7 +56,7 @@ use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; -use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; +use crate::shuffling_cache::BlockShufflingIds; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -127,10 +127,6 @@ pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. type HashBlockTuple = (Hash256, RpcBlock); -/// The time-out before failure during an operation to take a read/write RwLock on the -/// attestation cache. -pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); - /// The time-out before failure during an operation to take a read/write RwLock on the /// validator pubkey cache. pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); @@ -444,8 +440,6 @@ pub struct BeaconChain { pub event_handler: Option>, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: Arc, - /// Caches the attester shuffling for a given epoch and shuffling key root. - pub shuffling_cache: TimeoutRwLock, /// A cache of eth1 deposit data at epoch boundaries for deposit finalization pub eth1_finalization_cache: TimeoutRwLock, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. 
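Note: among the deletions in this commit is ATTESTATION_CACHE_LOCK_TIMEOUT, used with the TimeoutRwLock that guarded the shuffling cache. For context, the retired pattern looked roughly like this — a sketch assuming parking_lot's timed lock API, which the repo already depends on; the old call sites mapped a timeout to Error::AttestationCacheLockTimeout:

    use parking_lot::RwLock;
    use std::time::Duration;

    fn main() {
        let shuffling_cache = RwLock::new(Vec::<u64>::new());
        // The old code did `.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)` (one
        // second) and treated a `None` result as a lock-timeout error.
        match shuffling_cache.try_write_for(Duration::from_secs(1)) {
            Some(mut cache) => cache.push(42),
            None => eprintln!("lock timeout: would return AttestationCacheLockTimeout"),
        }
    }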
@@ -3245,7 +3239,7 @@ impl BeaconChain { &self, signed_block: AvailableBlock, block_root: Hash256, - mut state: BeaconState, + state: BeaconState, confirmed_state_roots: Vec, payload_verification_status: PayloadVerificationStatus, parent_block: SignedBlindedBeaconBlock, @@ -3392,7 +3386,6 @@ impl BeaconChain { // corrupt the node's database permanently. // ----------------------------------------------------------------------------------------- - self.import_block_update_shuffling_cache(block_root, &mut state); self.import_block_observe_attestations( block, &state, @@ -3843,48 +3836,6 @@ impl BeaconChain { } } - // For the current and next epoch of this state, ensure we have the shuffling from this - // block in our cache. - fn import_block_update_shuffling_cache( - &self, - block_root: Hash256, - state: &mut BeaconState, - ) { - if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) { - warn!( - self.log, - "Failed to prime shuffling cache"; - "error" => ?e - ); - } - } - - fn import_block_update_shuffling_cache_fallible( - &self, - block_root: Hash256, - state: &mut BeaconState, - ) -> Result<(), BlockError> { - for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { - let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; - - let shuffling_is_cached = self - .shuffling_cache - .try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)? - .contains(&shuffling_id); - - if !shuffling_is_cached { - state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; - self.shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)? - .insert_value(shuffling_id, committee_cache); - } - } - Ok(()) - } - #[allow(clippy::too_many_arguments)] fn import_block_update_deposit_contract_finalization( &self, @@ -6036,135 +5987,87 @@ impl BeaconChain { head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), })?; - // Obtain the shuffling cache, timing how long we wait. - let cache_wait_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); - - let mut shuffling_cache = self - .shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)?; - - metrics::stop_timer(cache_wait_timer); - - if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { - // The shuffling cache is no longer required, drop the write-lock to allow concurrent - // access. - drop(shuffling_cache); - - let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?; - map_fn(&committee_cache, shuffling_id.shuffling_decision_block) - } else { - // Create an entry in the cache that "promises" this value will eventually be computed. - // This avoids the case where multiple threads attempt to produce the same value at the - // same time. - // - // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same - // promise from being created twice. - let sender = shuffling_cache - .create_promise(shuffling_id.clone()) - .map_err(Error::ShufflingCacheError)?; - - // Drop the shuffling cache to avoid holding the lock for any longer than - // required. 
- drop(shuffling_cache); - - debug!( - self.log, - "Committee cache miss"; - "shuffling_id" => ?shuffling_epoch, - "head_block_root" => head_block_root.to_string(), - ); - - // If the block's state will be so far ahead of `shuffling_epoch` that even its - // previous epoch committee cache will be too new, then error. Callers of this function - // shouldn't be requesting such old shufflings for this `head_block_root`. - let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); - if head_block_epoch > shuffling_epoch + 1 { - return Err(Error::InvalidStateForShuffling { - state_epoch: head_block_epoch, - shuffling_epoch, - }); - } - - let state_read_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); - - // If the head of the chain can serve this request, use it. - // - // This code is a little awkward because we need to ensure that the head we read and - // the head we copy is identical. Taking one lock to read the head values and another - // to copy the head is liable to race-conditions. - let head_state_opt = self.with_head(|head| { - if head.beacon_block_root == head_block_root { - Ok(Some((head.beacon_state.clone(), head.beacon_state_root()))) - } else { - Ok::<_, Error>(None) - } - })?; + // If the block's state will be so far ahead of `shuffling_epoch` that even its + // previous epoch committee cache will be too new, then error. Callers of this function + // shouldn't be requesting such old shufflings for this `head_block_root`. + let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); + if head_block_epoch > shuffling_epoch + 1 { + return Err(Error::InvalidStateForShuffling { + state_epoch: head_block_epoch, + shuffling_epoch, + }); + } - // Compute the `target_slot` to advance the block's state to. - // - // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to - // only advance into the first slot of the epoch prior to `shuffling_epoch`. - // - // If the `head_block` is already ahead of that slot, then we should load the state - // at that slot, as we've determined above that the `shuffling_epoch` cache will - // not be too far in the past. - let target_slot = std::cmp::max( - shuffling_epoch - .saturating_sub(1_u64) - .start_slot(T::EthSpec::slots_per_epoch()), - head_block.slot, - ); + let state_read_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); - // If the head state is useful for this request, use it. Otherwise, read a state from - // disk that is advanced as close as possible to `target_slot`. - let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { - (state, state_root) + // If the head of the chain can serve this request, use it. + // + // This code is a little awkward because we need to ensure that the head we read and + // the head we copy is identical. Taking one lock to read the head values and another + // to copy the head is liable to race-conditions. + let head_state_opt = self.with_head(|head| { + if head.beacon_block_root == head_block_root { + Ok(Some((head.beacon_state.clone(), head.beacon_state_root()))) } else { - let (state_root, state) = self - .store - .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? 
- .ok_or(Error::MissingBeaconState(head_block.state_root))?; - (state, state_root) - }; + Ok::<_, Error>(None) + } + })?; - metrics::stop_timer(state_read_timer); - let state_skip_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); + // Compute the `target_slot` to advance the block's state to. + // + // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to + // only advance into the first slot of the epoch prior to `shuffling_epoch`. + // + // If the `head_block` is already ahead of that slot, then we should load the state + // at that slot, as we've determined above that the `shuffling_epoch` cache will + // not be too far in the past. + let target_slot = std::cmp::max( + shuffling_epoch + .saturating_sub(1_u64) + .start_slot(T::EthSpec::slots_per_epoch()), + head_block.slot, + ); - // If the state is still in an earlier epoch, advance it to the `target_slot` so - // that its next epoch committee cache matches the `shuffling_epoch`. - if state.current_epoch() + 1 < shuffling_epoch { - // Advance the state into the required slot, using the "partial" method since the - // state roots are not relevant for the shuffling. - partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?; - } - metrics::stop_timer(state_skip_timer); + // If the head state is useful for this request, use it. Otherwise, read a state from + // disk that is advanced as close as possible to `target_slot`. + let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { + (state, state_root) + } else { + let (state_root, state) = self + .store + .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? + .ok_or(Error::MissingBeaconState(head_block.state_root))?; + (state, state_root) + }; - let committee_building_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); + metrics::stop_timer(state_read_timer); + let state_skip_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); - let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) - .map_err(Error::IncorrectStateForAttestation)?; + // If the state is still in an earlier epoch, advance it to the `target_slot` so + // that its next epoch committee cache matches the `shuffling_epoch`. + if state.current_epoch() + 1 < shuffling_epoch { + // Advance the state into the required slot, using the "partial" method since the + // state roots are not relevant for the shuffling. + partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?; + } + metrics::stop_timer(state_skip_timer); - state.build_committee_cache(relative_epoch, &self.spec)?; + let committee_building_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); - let committee_cache = state.committee_cache(relative_epoch)?.clone(); - let shuffling_decision_block = shuffling_id.shuffling_decision_block; + let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) + .map_err(Error::IncorrectStateForAttestation)?; - self.shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)? 
- .insert_value(shuffling_id, &committee_cache); + state.build_committee_cache(relative_epoch, &self.spec)?; - metrics::stop_timer(committee_building_timer); + let committee_cache = state.committee_cache(relative_epoch)?.clone(); + let shuffling_decision_block = shuffling_id.shuffling_decision_block; - sender.send(committee_cache.clone()); + metrics::stop_timer(committee_building_timer); - map_fn(&committee_cache, shuffling_decision_block) - } + map_fn(&committee_cache, shuffling_decision_block) } /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2dc746e3ca4..cabea6daa91 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -8,7 +8,6 @@ use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_bound use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::ChainConfig; @@ -763,8 +762,6 @@ where )?; } - let head_shuffling_ids = BlockShufflingIds::try_from_head(head_block_root, &head_state)?; - let mut head_snapshot = BeaconSnapshot { beacon_block_root: head_block_root, beacon_block: Arc::new(head_block), @@ -855,7 +852,6 @@ where let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root(); let genesis_time = head_snapshot.beacon_state.genesis_time(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); - let shuffling_cache_size = self.chain_config.shuffling_cache_size; // Calculate the weak subjectivity point in which to backfill blocks to. let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -929,11 +925,6 @@ where fork_choice_signal_rx, event_handler: self.event_handler, head_tracker, - shuffling_cache: TimeoutRwLock::new(ShufflingCache::new( - shuffling_cache_size, - head_shuffling_ids, - log.clone(), - )), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache, block_times_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index e54ea6d49ca..d7ba105e8fc 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -31,9 +31,7 @@ //! the head block root. This is unacceptable for fast-responding functions like the networking //! stack. 
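Note: the target_slot logic retained in the refactor above is worth restating: because the attester shuffling has one epoch of look-ahead, advancing a state to the first slot of shuffling_epoch - 1 is always sufficient. A stand-alone sketch, assuming mainnet's 32 slots per epoch and with plain u64 slots and epochs standing in for the real types:

    const SLOTS_PER_EPOCH: u64 = 32;

    fn epoch_start_slot(epoch: u64) -> u64 {
        epoch * SLOTS_PER_EPOCH
    }

    /// Advance only as far as the first slot of the epoch prior to
    /// `shuffling_epoch`, unless the head block is already past that point.
    fn target_slot(shuffling_epoch: u64, head_block_slot: u64) -> u64 {
        std::cmp::max(
            epoch_start_slot(shuffling_epoch.saturating_sub(1)),
            head_block_slot,
        )
    }

    fn main() {
        // Head in epoch 5, shuffling requested for epoch 7: advance to epoch 6's start.
        assert_eq!(target_slot(7, 5 * SLOTS_PER_EPOCH), 6 * SLOTS_PER_EPOCH);
        // Head already past that slot: stay at the head's slot.
        assert_eq!(target_slot(7, 6 * SLOTS_PER_EPOCH + 3), 6 * SLOTS_PER_EPOCH + 3);
    }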
-use crate::beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT; use crate::persisted_fork_choice::PersistedForkChoice; -use crate::shuffling_cache::BlockShufflingIds; use crate::{ beacon_chain::{BeaconForkChoice, BeaconStore, OverrideForkchoiceUpdate, FORK_CHOICE_DB_KEY}, block_times_cache::BlockTimesCache, @@ -813,33 +811,6 @@ impl BeaconChain { .beacon_state .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current); - match BlockShufflingIds::try_from_head( - new_snapshot.beacon_block_root, - &new_snapshot.beacon_state, - ) { - Ok(head_shuffling_ids) => { - self.shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids)) - .unwrap_or_else(|| { - error!( - self.log, - "Failed to obtain cache write lock"; - "lock" => "shuffling_cache", - "task" => "update head shuffling decision root" - ); - }); - } - Err(e) => { - error!( - self.log, - "Failed to get head shuffling ids"; - "error" => ?e, - "head_block_root" => ?new_snapshot.beacon_block_root - ); - } - } - observe_head_block_delays( &mut self.block_times_cache.write(), &new_head_proto_block, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 79d8f2a4419..123b3b9a584 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -73,8 +73,6 @@ pub struct ChainConfig { pub prepare_payload_lookahead: Duration, /// Use EL-free optimistic sync for the finalized part of the chain. pub optimistic_finalized_sync: bool, - /// The size of the shuffling cache, - pub shuffling_cache_size: usize, /// If using a weak-subjectivity sync, whether we should download blocks all the way back to /// genesis. pub genesis_backfill: bool, @@ -114,7 +112,6 @@ impl Default for ChainConfig { prepare_payload_lookahead: Duration::from_secs(4), // This value isn't actually read except in tests. optimistic_finalized_sync: true, - shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, genesis_backfill: false, always_prepare_payload: false, progressive_balances_mode: ProgressiveBalancesMode::Fast, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 6d2a178faa7..d2ab4ceb30b 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -211,7 +211,6 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, - ShufflingCacheError(promise_cache::PromiseCacheError), BlsToExecutionPriorToCapella, BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 64b24ccd043..b4dcf75d912 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,9 +1,4 @@ -use promise_cache::{PromiseCache, Protect}; -use slog::{debug, Logger}; -use types::{ - beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, - RelativeEpoch, -}; +use types::{AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch}; /// The size of the cache that stores committee caches for quicker verification. /// @@ -23,29 +18,6 @@ use types::{ /// better than low-resource nodes going OOM. 
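Note: the Protect implementation deleted below shows what the old PromiseCache protector did: order eviction candidates by epoch and exempt the head's own shuffling IDs. A simplified stand-alone sketch of that trait — signatures reduced, and the real trait (seen in the deletions above) also has a notify_eviction hook taking a logger:

    trait Protect<K> {
        type SortKey: Ord;
        fn sort_key(&self, k: &K) -> Self::SortKey;
        fn protect_from_eviction(&self, k: &K) -> bool;
    }

    struct ProtectHead {
        head_key: u64,
    }

    impl Protect<u64> for ProtectHead {
        type SortKey = u64;
        // Evict smaller keys (older epochs) first.
        fn sort_key(&self, k: &u64) -> u64 {
            *k
        }
        // Never evict the entry belonging to the current head.
        fn protect_from_eviction(&self, k: &u64) -> bool {
            *k == self.head_key
        }
    }

    fn main() {
        let p = ProtectHead { head_key: 7 };
        let mut keys = vec![3u64, 7, 5];
        keys.retain(|k| !p.protect_from_eviction(k));
        keys.sort_by_key(|k| p.sort_key(k));
        println!("eviction order: {keys:?}"); // [3, 5]
    }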
pub const DEFAULT_CACHE_SIZE: usize = 16; -impl Protect for BlockShufflingIds { - type SortKey = Epoch; - - fn sort_key(&self, k: &AttestationShufflingId) -> Epoch { - k.shuffling_epoch - } - - fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool { - Some(shuffling_id) == self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() - } - - fn notify_eviction(&self, shuffling_id: &AttestationShufflingId, logger: &Logger) { - debug!( - logger, - "Removing old shuffling from cache"; - "shuffling_epoch" => shuffling_id.shuffling_epoch, - "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block - ); - } -} - -pub type ShufflingCache = PromiseCache; - /// Contains the shuffling IDs for a beacon block. #[derive(Debug, Clone)] pub struct BlockShufflingIds { diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index d686e6950de..5f4aec15a2a 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -15,8 +15,7 @@ //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, - BeaconChain, BeaconChainError, BeaconChainTypes, + chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, BeaconChain, BeaconChainError, BeaconChainTypes, }; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; @@ -397,18 +396,9 @@ fn advance_head( ) .map_err(BeaconChainError::from)?; - // Update the attester cache. let shuffling_id = AttestationShufflingId::new(head_block_root, &state, RelativeEpoch::Next) .map_err(BeaconChainError::from)?; - let committee_cache = state - .committee_cache(RelativeEpoch::Next) - .map_err(BeaconChainError::from)?; - beacon_chain - .shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(BeaconChainError::AttestationCacheLockTimeout)? 
- .insert_value(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4655ada9454..00e6ec3acda 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -77,8 +77,8 @@ use tokio_stream::{ StreamExt, }; use types::{ - fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, + fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttesterSlashing, + BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, @@ -833,111 +833,38 @@ pub fn serve( let current_epoch = state.current_epoch(); let epoch = query.epoch.unwrap_or(current_epoch); - // Attempt to obtain the committee_cache from the beacon chain - let decision_slot = (epoch.saturating_sub(2u64)) - .end_slot(T::EthSpec::slots_per_epoch()); - // Find the decision block and skip to another method on any kind - // of failure - let shuffling_id = if let Ok(Some(shuffling_decision_block)) = - chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) - { - Some(AttestationShufflingId { - shuffling_epoch: epoch, - shuffling_decision_block, - }) - } else { - None - }; - - // Attempt to read from the chain cache if there exists a - // shuffling_id - let maybe_cached_shuffling = if let Some(shuffling_id) = - shuffling_id.as_ref() - { - chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - .and_then(|mut cache_write| cache_write.get(shuffling_id)) - .and_then(|cache_item| cache_item.wait().ok()) - } else { - None - }; - - let committee_cache = if let Some(shuffling) = - maybe_cached_shuffling - { - shuffling - } else { - let possibly_built_cache = - match RelativeEpoch::from_epoch(current_epoch, epoch) { - Ok(relative_epoch) - if state.committee_cache_is_initialized( - relative_epoch, - ) => - { - state - .committee_cache(relative_epoch) - .map(Arc::clone) - } - _ => CommitteeCache::initialized( - state, - epoch, - &chain.spec, - ), + let committee_cache = + match RelativeEpoch::from_epoch(current_epoch, epoch) { + Ok(relative_epoch) + if state + .committee_cache_is_initialized(relative_epoch) => + { + state.committee_cache(relative_epoch).map(Arc::clone) } - .map_err(|e| { - match e { - BeaconStateError::EpochOutOfBounds => { - let max_sprp = - T::EthSpec::slots_per_historical_root() - as u64; - let first_subsequent_restore_point_slot = - ((epoch.start_slot( - T::EthSpec::slots_per_epoch(), - ) / max_sprp) - + 1) - * max_sprp; - if epoch < current_epoch { - warp_utils::reject::custom_bad_request( - format!( - "epoch out of bounds, \ - try state at slot {}", - first_subsequent_restore_point_slot, - ), - ) - } else { - warp_utils::reject::custom_bad_request( - "epoch out of bounds, \ - too far in future" - .into(), - ) - } - } - _ => { - warp_utils::reject::beacon_chain_error(e.into()) - } - } - })?; - - // Attempt to write to the beacon cache (only if the cache - // size is not the default value). 
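Note: the deleted HTTP code above first derived the shuffling decision block from decision_slot = (epoch - 2).end_slot(...): the attester shuffling for an epoch is decided by the last block two epochs prior. A stand-alone sketch of that slot arithmetic, assuming 32 slots per epoch:

    const SLOTS_PER_EPOCH: u64 = 32;

    /// The slot whose block decides the attester shuffling for `epoch`:
    /// the end slot (last slot) of `epoch - 2`.
    fn shuffling_decision_slot(epoch: u64) -> u64 {
        let decision_epoch = epoch.saturating_sub(2);
        // end_slot() of an epoch: first slot of the next epoch, minus one.
        (decision_epoch + 1) * SLOTS_PER_EPOCH - 1
    }

    fn main() {
        // Epoch 10's shuffling is decided by the last slot of epoch 8.
        assert_eq!(shuffling_decision_slot(10), 8 * SLOTS_PER_EPOCH + 31);
        println!("epoch 10 decision slot: {}", shuffling_decision_slot(10));
    }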
- if chain.config.shuffling_cache_size - != beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE - { - if let Some(shuffling_id) = shuffling_id { - if let Some(mut cache_write) = chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - { - cache_write.insert_value( - shuffling_id, - &possibly_built_cache, - ); + _ => CommitteeCache::initialized(state, epoch, &chain.spec), + } + .map_err(|e| match e { + BeaconStateError::EpochOutOfBounds => { + let max_sprp = + T::EthSpec::slots_per_historical_root() as u64; + let first_subsequent_restore_point_slot = ((epoch + .start_slot(T::EthSpec::slots_per_epoch()) + / max_sprp) + + 1) + * max_sprp; + if epoch < current_epoch { + warp_utils::reject::custom_bad_request(format!( + "epoch out of bounds, try state at slot {}", + first_subsequent_restore_point_slot, + )) + } else { + warp_utils::reject::custom_bad_request( + "epoch out of bounds, too far in future".into(), + ) } } - } - possibly_built_cache - }; + _ => warp_utils::reject::beacon_chain_error(e.into()), + })?; // Use either the supplied slot or all slots in the epoch. let slots = diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 957d539c87e..4f5bfa28f44 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2315,19 +2315,6 @@ impl NetworkBeaconProcessor { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } - AttnError::BeaconChainError(BeaconChainError::ShufflingCacheError(e)) => { - debug!( - self.log, - "Dropping attestation"; - "target_root" => ?failed_att.attestation().data.target.root, - "beacon_block_root" => ?beacon_block_root, - "slot" => ?failed_att.attestation().data.slot, - "type" => ?attestation_type, - "error" => ?e, - "peer_id" => % peer_id - ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - } AttnError::BeaconChainError(e) => { /* * Lighthouse hit an unexpected error whilst processing the attestation. It diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 879d1d22b9c..2d1d1d5a371 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -163,10 +163,6 @@ pub fn get_config( cli_args.is_present("light-client-server"); } - if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? 
{ - client_config.chain.shuffling_cache_size = cache_size; - } - /* * Prometheus metrics HTTP server */ diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 63c11d8deab..6ed68d55f9d 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -2,7 +2,7 @@ use crate::config::StoreConfigError; use crate::hdiff; use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; -use promise_cache::computation_cache::ComputationCacheError; +use promise_cache::PromiseCacheError; use state_processing::BlockReplayError; use types::{milhouse, BeaconStateError, Epoch, EpochCacheError, Hash256, InconsistentFork, Slot}; @@ -153,10 +153,10 @@ impl From for Error { } } -impl From> for Error { - fn from(e: ComputationCacheError) -> Error { +impl From> for Error { + fn from(e: PromiseCacheError) -> Error { match e { - ComputationCacheError::Error(Some(e)) => e, + PromiseCacheError::Error(Some(e)) => e, _ => Error::CachedComputationError, } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 2996a433e2b..481d4760c9a 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -44,7 +44,7 @@ use std::time::Duration; use types::blob_sidecar::BlobSidecarList; use types::*; use zstd::{Decoder, Encoder}; -use promise_cache::computation_cache::ComputationCache; +use promise_cache::PromiseCache; pub const MAX_PARENT_STATES_TO_CACHE: u64 = 1; @@ -79,7 +79,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. state_cache: Mutex>, - hot_computation_cache: ComputationCache>>, + hot_computation_cache: PromiseCache>>, /// Immutable validator cache. pub immutable_validators: Arc>>, /// LRU cache of replayed states. @@ -88,7 +88,7 @@ pub struct HotColdDB, Cold: ItemStore> { historic_state_cache: Mutex>>, /// Cache of hierarchical diff buffers. diff_buffer_cache: Mutex>, - diff_buffer_computation_cache: ComputationCache, + diff_buffer_computation_cache: PromiseCache, /// Chain spec. pub(crate) spec: ChainSpec, /// Logger. 
@@ -214,11 +214,11 @@ impl HotColdDB, MemoryStore> { hot_db: MemoryStore::open(), block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), - hot_computation_cache: ComputationCache::new(), + hot_computation_cache: PromiseCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), - diff_buffer_computation_cache: ComputationCache::new(), + diff_buffer_computation_cache: PromiseCache::new(), config, hierarchy, spec, @@ -262,11 +262,11 @@ impl HotColdDB, LevelDB> { hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), - hot_computation_cache: ComputationCache::new(), + hot_computation_cache: PromiseCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), - diff_buffer_computation_cache: ComputationCache::new(), + diff_buffer_computation_cache: PromiseCache::new(), config, hierarchy, spec, diff --git a/common/promise_cache/src/computation_cache.rs b/common/promise_cache/src/computation_cache.rs deleted file mode 100644 index 6d9833dd370..00000000000 --- a/common/promise_cache/src/computation_cache.rs +++ /dev/null @@ -1,84 +0,0 @@ -use oneshot_broadcast::{oneshot, Receiver}; -use parking_lot::Mutex; -use std::collections::HashMap; -use std::hash::Hash; - -#[derive(Debug)] -pub struct ComputationCache -where - K: Hash + Eq + Clone, - V: Clone, -{ - cache: Mutex>>>, -} - -pub enum ComputationCacheError { - Error(Option), - Panic, -} - -impl ComputationCache -where - K: Hash + Eq + Clone, - V: Clone, -{ - pub fn new() -> Self { - Self { - cache: Mutex::new(HashMap::new()), - } - } - - #[allow(clippy::await_holding_lock)] // https://github.com/rust-lang/rust-clippy/issues/6446 - pub fn get_or_compute( - &self, - key: &K, - computation: F, - ) -> Result> - where - F: FnOnce() -> Result, - { - let mut cache = self.cache.lock(); - match cache.get(key) { - Some(item) => { - let item = item.clone(); - drop(cache); - item.recv() - .map_err(|_| ComputationCacheError::Panic) - .and_then(|res| res.map_err(|_| ComputationCacheError::Error(None))) - } - None => { - let (sender, receiver) = oneshot(); - cache.insert(key.clone(), receiver); - drop(cache); - match computation() { - Ok(value) => { - sender.send(Ok(value)); - Ok(self - .cache - .lock() - .remove(key) - .expect("value has vanished") - .recv() - .expect("we sent the value") - .expect("we sent a success")) - } - Err(err) => { - sender.send(Err(())); - self.cache.lock().remove(key); - Err(ComputationCacheError::Error(Some(err))) - } - } - } - } - } -} - -impl Default for ComputationCache -where - K: Hash + Eq + Clone, - V: Clone, -{ - fn default() -> Self { - Self::new() - } -} diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index c738ca2b9be..e68fb889e5b 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -1,229 +1,85 @@ -pub mod computation_cache; - -use derivative::Derivative; -use itertools::Itertools; -use oneshot_broadcast::{oneshot, Receiver, Sender}; -use slog::Logger; +use oneshot_broadcast::{oneshot, Receiver}; +use parking_lot::Mutex; use std::collections::HashMap; use std::hash::Hash; 
-use std::sync::Arc; #[derive(Debug)] -pub struct PromiseCache -where - K: Hash + Eq + Clone, - P: Protect, +pub struct PromiseCache + where + K: Hash + Eq + Clone, + V: Clone, { - cache: HashMap>, - capacity: usize, - protector: P, - max_concurrent_promises: usize, - logger: Logger, -} - -/// A value implementing `Protect` is capable of preventing keys of type `K` from being evicted. -/// -/// It also dictates an ordering on keys which is used to prioritise evictions. -pub trait Protect { - type SortKey: Ord; - - fn sort_key(&self, k: &K) -> Self::SortKey; - - fn protect_from_eviction(&self, k: &K) -> bool; - - fn notify_eviction(&self, _k: &K, _log: &Logger) {} -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""))] -pub enum CacheItem { - Complete(Arc), - Promise(Receiver>), -} - -impl std::fmt::Debug for CacheItem { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - match self { - CacheItem::Complete(value) => value.fmt(f), - CacheItem::Promise(_) => "Promise(..)".fmt(f), - } - } -} - -#[derive(Debug)] -pub enum PromiseCacheError { - Failed(oneshot_broadcast::Error), - MaxConcurrentPromises(usize), -} - -pub trait ToArc { - fn to_arc(&self) -> Arc; -} - -impl CacheItem { - pub fn is_promise(&self) -> bool { - matches!(self, CacheItem::Promise(_)) - } - - pub fn wait(self) -> Result, PromiseCacheError> { - match self { - CacheItem::Complete(value) => Ok(value), - CacheItem::Promise(receiver) => receiver.recv().map_err(PromiseCacheError::Failed), - } - } + cache: Mutex>>>, } -impl ToArc for Arc { - fn to_arc(&self) -> Arc { - self.clone() - } -} - -impl ToArc for T -where - T: Clone, -{ - fn to_arc(&self) -> Arc { - Arc::new(self.clone()) - } +pub enum PromiseCacheError { + Error(Option), + Panic, } -impl PromiseCache -where - K: Hash + Eq + Clone, - P: Protect, +impl PromiseCache + where + K: Hash + Eq + Clone, + V: Clone, { - pub fn new(capacity: usize, protector: P, logger: Logger) -> Self { - // Making the concurrent promises directly configurable is considered overkill for now, - // so we just derive a vaguely sensible value from the cache size. - let max_concurrent_promises = std::cmp::max(2, capacity / 8); + pub fn new() -> Self { Self { - cache: HashMap::new(), - capacity, - protector, - max_concurrent_promises, - logger, + cache: Mutex::new(HashMap::new()), } } - pub fn get(&mut self, key: &K) -> Option> { - match self.cache.get(key) { - // The cache contained the value, return it. - item @ Some(CacheItem::Complete(_)) => item.cloned(), - // The cache contains a promise for the value. Check to see if the promise has already - // been resolved, without waiting for it. - item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { - // The promise has already been resolved. Replace the entry in the cache with a - // `Complete` entry and then return the value. - Ok(Some(value)) => { - let ready = CacheItem::Complete(value); - self.insert_cache_item(key.clone(), ready.clone()); - Some(ready) - } - // The promise has not yet been resolved. Return the promise so the caller can await - // it. - Ok(None) => item.cloned(), - // The sender has been dropped without sending a value. There was most likely an - // error computing the value. Drop the key from the cache and return - // `None` so the caller can recompute the value. - // - // It's worth noting that this is the only place where we removed unresolved - // promises from the cache. This means unresolved promises will only be removed if - // we try to access them again. 
This is OK, since the promises don't consume much - // memory. We expect that *all* promises should be resolved, unless there is a - // programming or database error. - Err(oneshot_broadcast::Error::SenderDropped) => { - self.cache.remove(key); - None + pub fn get_or_compute( + &self, + key: &K, + computation: F, + ) -> Result> + where + F: FnOnce() -> Result, + { + let mut cache = self.cache.lock(); + match cache.get(key) { + Some(item) => { + let item = item.clone(); + drop(cache); + println!("*********** PROMISE CACHE HIT ************"); + item.recv() + .map_err(|_| PromiseCacheError::Panic) + .and_then(|res| res.map_err(|_| PromiseCacheError::Error(None))) + } + None => { + let (sender, receiver) = oneshot(); + cache.insert(key.clone(), receiver); + drop(cache); + println!("*********** PROMISE CACHE MISS ************"); + match computation() { + Ok(value) => { + sender.send(Ok(value)); + Ok(self + .cache + .lock() + .remove(key) + .expect("value has vanished") + .recv() + .expect("we sent the value") + .expect("we sent a success")) + } + Err(err) => { + sender.send(Err(())); + self.cache.lock().remove(key); + Err(PromiseCacheError::Error(Some(err))) + } } - }, - // The cache does not have this value and it's not already promised to be computed. - None => None, - } - } - - pub fn contains(&self, key: &K) -> bool { - self.cache.contains_key(key) - } - - pub fn insert_value>(&mut self, key: K, value: &C) { - if self - .cache - .get(&key) - // Replace the value if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! - .map_or(true, CacheItem::is_promise) - { - self.insert_cache_item(key, CacheItem::Complete(value.to_arc())); - } - } - - /// Take care of resolving a promise by ensuring the value is made available: - /// - /// 1. To all waiting thread that are holding a `Receiver`. - /// 2. In the cache itself for future callers. - pub fn resolve_promise>(&mut self, sender: Sender>, key: K, value: &C) { - // Use the sender to notify all actively waiting receivers. - let arc_value = value.to_arc(); - sender.send(arc_value.clone()); - - // Re-insert the value into the cache. The promise may have been evicted in the meantime, - // but we probably want to keep this value (which resolved recently) over other older cache - // entries. - self.insert_value(key, &arc_value); - } - - /// Prunes the cache first before inserting a new item. 
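Note: the deleted get method above upgraded a stored promise to a Complete value the first time it was observed resolved, and evicted entries whose sender was dropped. The same three-way outcome can be modelled with std::sync::mpsc standing in for oneshot_broadcast (names here are illustrative):

    use std::sync::mpsc::{channel, Receiver, TryRecvError};

    enum CacheItem<T> {
        Complete(T),
        Promise(Receiver<T>),
    }

    fn upgrade<T>(item: CacheItem<T>) -> Option<CacheItem<T>> {
        match item {
            complete @ CacheItem::Complete(_) => Some(complete),
            CacheItem::Promise(rx) => match rx.try_recv() {
                // Promise resolved: replace it with the concrete value.
                Ok(value) => Some(CacheItem::Complete(value)),
                // Still pending: keep waiting on the same promise.
                Err(TryRecvError::Empty) => Some(CacheItem::Promise(rx)),
                // Sender dropped without a value: evict so callers recompute.
                Err(TryRecvError::Disconnected) => None,
            },
        }
    }

    fn main() {
        let (tx, rx) = channel();
        tx.send(42).unwrap();
        match upgrade(CacheItem::Promise(rx)) {
            Some(CacheItem::Complete(v)) => println!("resolved: {v}"),
            _ => unreachable!(),
        }
    }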
- fn insert_cache_item(&mut self, key: K, cache_item: CacheItem) { - self.prune_cache(); - self.cache.insert(key, cache_item); - } - - pub fn create_promise(&mut self, key: K) -> Result>, PromiseCacheError> { - let num_active_promises = self.cache.values().filter(|item| item.is_promise()).count(); - if num_active_promises >= self.max_concurrent_promises { - return Err(PromiseCacheError::MaxConcurrentPromises( - num_active_promises, - )); - } - - let (sender, receiver) = oneshot(); - self.insert_cache_item(key, CacheItem::Promise(receiver)); - Ok(sender) - } - - fn prune_cache(&mut self) { - let target_cache_size = self.capacity.saturating_sub(1); - if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { - let keys_to_prune = self - .cache - .keys() - .filter(|k| !self.protector.protect_from_eviction(*k)) - .sorted_by_key(|k| self.protector.sort_key(k)) - .take(prune_count) - .cloned() - .collect::>(); - - for key in &keys_to_prune { - self.protector.notify_eviction(key, &self.logger); - self.cache.remove(key); } } } +} - pub fn update_protector(&mut self, protector: P) { - self.protector = protector; - } - - pub fn len(&self) -> usize { - self.cache.len() - } - - pub fn is_empty(&self) -> bool { - self.cache.is_empty() - } - - pub fn max_concurrent_promises(&self) -> usize { - self.max_concurrent_promises +impl Default for PromiseCache + where + K: Hash + Eq + Clone, + V: Clone, +{ + fn default() -> Self { + Self::new() } } From 4c8ab44c5a64d9253a7b3d7cc24e3c61ce968278 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 16 Feb 2024 16:34:39 +0100 Subject: [PATCH 3/7] cargo fmt --- beacon_node/store/src/errors.rs | 2 +- beacon_node/store/src/hot_cold_store.rs | 76 +++++++++++++------------ common/promise_cache/src/lib.rs | 28 ++++----- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6ed68d55f9d..34f55a1fcac 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,8 +1,8 @@ use crate::config::StoreConfigError; use crate::hdiff; use crate::hot_cold_store::HotColdDBError; -use ssz::DecodeError; use promise_cache::PromiseCacheError; +use ssz::DecodeError; use state_processing::BlockReplayError; use types::{milhouse, BeaconStateError, Epoch, EpochCacheError, Hash256, InconsistentFork, Slot}; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 481d4760c9a..8885b605ec0 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -24,6 +24,7 @@ use itertools::process_results; use leveldb::iterator::LevelDBIterator; use lru::LruCache; use parking_lot::{Mutex, RwLock}; +use promise_cache::PromiseCache; use safe_arith::SafeArith; use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, warn, Logger}; @@ -44,7 +45,6 @@ use std::time::Duration; use types::blob_sidecar::BlobSidecarList; use types::*; use zstd::{Decoder, Encoder}; -use promise_cache::PromiseCache; pub const MAX_PARENT_STATES_TO_CACHE: u64 = 1; @@ -1867,47 +1867,51 @@ impl, Cold: ItemStore> HotColdDB return Ok((slot, buffer.clone())); } - Ok(self.diff_buffer_computation_cache.get_or_compute(&slot, || { - // Load buffer for the previous state. - // This amount of recursion (<10 levels) should be OK. - let t = std::time::Instant::now(); - let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { - // Base case. 
- StorageStrategy::Snapshot => { - let state = self - .load_cold_state_as_snapshot(slot)? - .ok_or(Error::MissingSnapshot(slot))?; - let buffer = HDiffBuffer::from_state(state); - - self.diff_buffer_cache.lock().put(slot, buffer.clone()); - debug!( + Ok(self + .diff_buffer_computation_cache + .get_or_compute(&slot, || { + // Load buffer for the previous state. + // This amount of recursion (<10 levels) should be OK. + let t = std::time::Instant::now(); + let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { + // Base case. + StorageStrategy::Snapshot => { + let state = self + .load_cold_state_as_snapshot(slot)? + .ok_or(Error::MissingSnapshot(slot))?; + let buffer = HDiffBuffer::from_state(state); + + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( + self.log, + "Added diff buffer to cache"; + "load_time_ms" => t.elapsed().as_millis(), + "slot" => slot + ); + + return Ok((slot, buffer)); + } + // Recursive case. + StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, + StorageStrategy::ReplayFrom(from) => { + return self.load_hdiff_buffer_for_slot(from) + } + }; + + // Load diff and apply it to buffer. + let diff = self.load_hdiff_for_slot(slot)?; + diff.apply(&mut buffer)?; + + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( self.log, "Added diff buffer to cache"; "load_time_ms" => t.elapsed().as_millis(), "slot" => slot ); - return Ok((slot, buffer)); - } - // Recursive case. - StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, - StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from), - }; - - // Load diff and apply it to buffer. - let diff = self.load_hdiff_for_slot(slot)?; - diff.apply(&mut buffer)?; - - self.diff_buffer_cache.lock().put(slot, buffer.clone()); - debug!( - self.log, - "Added diff buffer to cache"; - "load_time_ms" => t.elapsed().as_millis(), - "slot" => slot - ); - - Ok((slot, buffer)) - })?) + Ok((slot, buffer)) + })?) } /// Load cold blocks between `start_slot` and `end_slot` inclusive. 
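To make the control flow above easier to follow: `load_hdiff_buffer_for_slot` recurses towards the nearest on-disk snapshot and applies hierarchical diffs on the way back up, which is why `get_or_compute` wraps the whole recursion. The stand-alone sketch below illustrates the idea; `Store`, `Strategy` and `Buffer` are simplified stand-ins invented for illustration, not the real `HotColdDB`, `StorageStrategy` or `HDiffBuffer`:

// Illustrative sketch only: stand-in types, not Lighthouse's real ones.
use std::collections::HashMap;

#[derive(Clone)]
struct Buffer(Vec<u8>);

enum Strategy {
    Snapshot,        // a full snapshot is stored at this slot
    DiffFrom(u64),   // a diff against an earlier slot is stored
    ReplayFrom(u64), // nothing stored for this slot; reuse an earlier buffer
}

struct Store {
    strategies: HashMap<u64, Strategy>,
    snapshots: HashMap<u64, Buffer>,
    diffs: HashMap<u64, Vec<u8>>,
}

impl Store {
    // Mirrors `load_hdiff_buffer_for_slot`: recurse towards the nearest
    // snapshot, then apply diffs on the way back up. Depth is bounded by the
    // diff hierarchy, hence the "<10 levels" comment in the real code.
    fn load_buffer(&self, slot: u64) -> (u64, Buffer) {
        match self.strategies[&slot] {
            Strategy::Snapshot => (slot, self.snapshots[&slot].clone()),
            Strategy::ReplayFrom(from) => self.load_buffer(from),
            Strategy::DiffFrom(from) => {
                let (_, mut buffer) = self.load_buffer(from);
                // Stand-in for `HDiff::apply`, which patches the buffer.
                buffer.0.extend_from_slice(&self.diffs[&slot]);
                (slot, buffer)
            }
        }
    }
}

Keying the promise cache by slot means that concurrent callers needing the same buffer wait on a single recursion instead of each walking the hierarchy themselves.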
diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index e68fb889e5b..ff80415d900 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -5,9 +5,9 @@ use std::hash::Hash; #[derive(Debug)] pub struct PromiseCache - where - K: Hash + Eq + Clone, - V: Clone, +where + K: Hash + Eq + Clone, + V: Clone, { cache: Mutex>>>, } @@ -18,9 +18,9 @@ pub enum PromiseCacheError { } impl PromiseCache - where - K: Hash + Eq + Clone, - V: Clone, +where + K: Hash + Eq + Clone, + V: Clone, { pub fn new() -> Self { Self { @@ -28,13 +28,9 @@ impl PromiseCache } } - pub fn get_or_compute( - &self, - key: &K, - computation: F, - ) -> Result> - where - F: FnOnce() -> Result, + pub fn get_or_compute(&self, key: &K, computation: F) -> Result> + where + F: FnOnce() -> Result, { let mut cache = self.cache.lock(); match cache.get(key) { @@ -75,9 +71,9 @@ impl PromiseCache } impl Default for PromiseCache - where - K: Hash + Eq + Clone, - V: Clone, +where + K: Hash + Eq + Clone, + V: Clone, { fn default() -> Self { Self::new() From 02f7835a292586d235d6ca6b44c081d7fd1269d8 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Mon, 19 Feb 2024 22:59:49 +0100 Subject: [PATCH 4/7] clean up and fix tests --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +- beacon_node/beacon_chain/src/lib.rs | 1 - .../beacon_chain/src/shuffling_cache.rs | 360 ------------------ consensus/types/src/shuffling_id.rs | 36 ++ lighthouse/tests/beacon_node.rs | 20 - 5 files changed, 37 insertions(+), 383 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/shuffling_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a0944218cd9..a9bdd2e39c6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -56,7 +56,6 @@ use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; -use crate::shuffling_cache::BlockShufflingIds; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -5975,7 +5974,7 @@ impl BeaconChain { .get_block(&head_block_root) .ok_or(Error::MissingBeaconBlock(head_block_root))?; - let shuffling_id = BlockShufflingIds { + let shuffling_id = shuffling_id::BlockShufflingIds { current: head_block.current_epoch_shuffling_id.clone(), next: head_block.next_epoch_shuffling_id.clone(), previous: None, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index ccc7f412441..96963672b72 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,7 +48,6 @@ mod persisted_fork_choice; mod pre_finalization_cache; pub mod proposer_prep_service; pub mod schema_change; -pub mod shuffling_cache; pub mod state_advance_timer; pub mod sync_committee_rewards; pub mod sync_committee_verification; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs deleted file mode 100644 index b4dcf75d912..00000000000 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ /dev/null @@ -1,360 +0,0 @@ -use types::{AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch}; - -/// The size of the cache that stores committee caches for quicker verification. 
-/// -/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + -/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this -/// ignores a few extra bytes in the caches that should be insignificant compared to the indices). -/// -/// The cache size also determines the maximum number of concurrent committee cache "promises" that -/// can be issued. In effect, this limits the number of concurrent states that can be loaded into -/// memory for the committee cache. This prevents excessive memory usage at the cost of rejecting -/// some attestations. -/// -/// We set this value to 2 since states can be quite large and have a significant impact on memory -/// usage. A healthy network cannot have more than a few committee caches and those caches should -/// always be inserted during block import. Unstable networks with a high degree of forking might -/// see some attestations dropped due to this concurrency limit, however I propose that this is -/// better than low-resource nodes going OOM. -pub const DEFAULT_CACHE_SIZE: usize = 16; - -/// Contains the shuffling IDs for a beacon block. -#[derive(Debug, Clone)] -pub struct BlockShufflingIds { - pub current: AttestationShufflingId, - pub next: AttestationShufflingId, - pub previous: Option, - pub block_root: Hash256, -} - -impl BlockShufflingIds { - /// Returns the shuffling ID for the given epoch. - /// - /// Returns `None` if `epoch` is prior to `self.previous?.shuffling_epoch` or - /// `self.current.shuffling_epoch` (if `previous` is `None`). - pub fn id_for_epoch(&self, epoch: Epoch) -> Option { - if epoch == self.current.shuffling_epoch { - Some(self.current.clone()) - } else if self - .previous - .as_ref() - .map_or(false, |id| id.shuffling_epoch == epoch) - { - self.previous.clone() - } else if epoch == self.next.shuffling_epoch { - Some(self.next.clone()) - } else if epoch > self.next.shuffling_epoch { - Some(AttestationShufflingId::from_components( - epoch, - self.block_root, - )) - } else { - None - } - } - - pub fn try_from_head( - head_block_root: Hash256, - head_state: &BeaconState, - ) -> Result { - let get_shuffling_id = |relative_epoch| { - AttestationShufflingId::new(head_block_root, head_state, relative_epoch).map_err(|e| { - format!( - "Unable to get attester shuffling decision slot for the epoch {:?}: {:?}", - relative_epoch, e - ) - }) - }; - - Ok(Self { - current: get_shuffling_id(RelativeEpoch::Current)?, - next: get_shuffling_id(RelativeEpoch::Next)?, - previous: Some(get_shuffling_id(RelativeEpoch::Previous)?), - block_root: head_block_root, - }) - } -} - -// Disable tests in debug since the beacon chain harness is slow unless in release. 
-#[cfg(not(debug_assertions))] -#[cfg(test)] -mod test { - use super::*; - use crate::test_utils::EphemeralHarnessType; - use promise_cache::{CacheItem, PromiseCacheError}; - use std::sync::Arc; - use task_executor::test_utils::null_logger; - use types::*; - - type E = MinimalEthSpec; - type TestBeaconChainType = EphemeralHarnessType; - type BeaconChainHarness = crate::test_utils::BeaconChainHarness; - const TEST_CACHE_SIZE: usize = 5; - - // Creates a new shuffling cache for testing - fn new_shuffling_cache() -> ShufflingCache { - let current_epoch = 8; - let head_shuffling_ids = BlockShufflingIds { - current: shuffling_id(current_epoch), - next: shuffling_id(current_epoch + 1), - previous: Some(shuffling_id(current_epoch - 1)), - block_root: Hash256::from_low_u64_le(0), - }; - let logger = null_logger().unwrap(); - ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids, logger) - } - - /// Returns two different committee caches for testing. - fn committee_caches() -> (Arc, Arc) { - let harness = BeaconChainHarness::builder(MinimalEthSpec) - .default_spec() - .deterministic_keypairs(8) - .fresh_ephemeral_store() - .build(); - let (mut state, _) = harness.get_current_state_and_root(); - state - .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) - .unwrap(); - state - .build_committee_cache(RelativeEpoch::Next, &harness.chain.spec) - .unwrap(); - let committee_a = state - .committee_cache(RelativeEpoch::Current) - .unwrap() - .clone(); - let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone(); - assert!(committee_a != committee_b); - (committee_a, committee_b) - } - - /// Builds a deterministic but incoherent shuffling ID from a `u64`. - fn shuffling_id(id: u64) -> AttestationShufflingId { - AttestationShufflingId { - shuffling_epoch: id.into(), - shuffling_decision_block: Hash256::from_low_u64_be(id), - } - } - - #[test] - fn resolved_promise() { - let (committee_a, _) = committee_caches(); - let id_a = shuffling_id(1); - let mut cache = new_shuffling_cache(); - - // Create a promise. - let sender = cache.create_promise(id_a.clone()).unwrap(); - - // Retrieve the newly created promise. - let item = cache.get(&id_a).unwrap(); - assert!( - matches!(item, CacheItem::Promise(_)), - "the item should be a promise" - ); - - // Resolve the promise. - sender.send(committee_a.clone()); - - // Ensure the promise has been resolved. - let item = cache.get(&id_a).unwrap(); - assert!( - matches!(item, CacheItem::Complete(committee) if committee == committee_a), - "the promise should be resolved" - ); - assert_eq!(cache.len(), 1, "the cache should have one entry"); - } - - #[test] - fn unresolved_promise() { - let id_a = shuffling_id(1); - let mut cache = new_shuffling_cache(); - - // Create a promise. - let sender = cache.create_promise(id_a.clone()).unwrap(); - - // Retrieve the newly created promise. - let item = cache.get(&id_a).unwrap(); - assert!( - matches!(item, CacheItem::Promise(_)), - "the item should be a promise" - ); - - // Drop the sender without resolving the promise, simulating an error computing the - // committee. - drop(sender); - - // Ensure the key now indicates an empty slot. - assert!(cache.get(&id_a).is_none(), "the slot should be empty"); - assert!(cache.is_empty(), "the cache should be empty"); - } - - #[test] - fn two_promises() { - let (committee_a, committee_b) = committee_caches(); - let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); - let mut cache = new_shuffling_cache(); - - // Create promise A. 
- let sender_a = cache.create_promise(id_a.clone()).unwrap(); - - // Retrieve promise A. - let item = cache.get(&id_a).unwrap(); - assert!( - matches!(item, CacheItem::Promise(_)), - "item a should be a promise" - ); - - // Create promise B. - let sender_b = cache.create_promise(id_b.clone()).unwrap(); - - // Retrieve promise B. - let item = cache.get(&id_b).unwrap(); - assert!( - matches!(item, CacheItem::Promise(_)), - "item b should be a promise" - ); - - // Resolve promise A. - sender_a.send(committee_a.clone()); - // Ensure promise A has been resolved. - let item = cache.get(&id_a).unwrap(); - assert!( - matches!(item, CacheItem::Complete(committee) if committee == committee_a), - "promise A should be resolved" - ); - - // Resolve promise B. - sender_b.send(committee_b.clone()); - // Ensure promise B has been resolved. - let item = cache.get(&id_b).unwrap(); - assert!( - matches!(item, CacheItem::Complete(committee) if committee == committee_b), - "promise B should be resolved" - ); - - // Check both entries again. - assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee) if committee == committee_a), - "promise A should remain resolved" - ); - assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Complete(committee) if committee == committee_b), - "promise B should remain resolved" - ); - assert_eq!(cache.len(), 2, "the cache should have two entries"); - } - - #[test] - fn too_many_promises() { - let mut cache = new_shuffling_cache(); - - for i in 0..cache.max_concurrent_promises() { - cache.create_promise(shuffling_id(i as u64)).unwrap(); - } - - // Ensure that the next promise returns an error. It is important for the application to - // dump his ass when he can't keep his promises, you're a queen and you deserve better. 
- assert!(matches!( - cache.create_promise(shuffling_id(cache.max_concurrent_promises() as u64)), - Err(PromiseCacheError::MaxConcurrentPromises(n)) - if n == cache.max_concurrent_promises() - )); - assert_eq!( - cache.len(), - cache.max_concurrent_promises(), - "the cache should have two entries" - ); - } - - #[test] - fn should_insert_committee_cache() { - let mut cache = new_shuffling_cache(); - let id_a = shuffling_id(1); - let committee_cache_a = Arc::new(CommitteeCache::default()); - cache.insert_value(id_a.clone(), &committee_cache_a); - assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee_cache) if committee_cache == committee_cache_a), - "should insert committee cache" - ); - } - - #[test] - fn should_prune_committee_cache_with_lowest_epoch() { - let mut cache = new_shuffling_cache(); - let shuffling_id_and_committee_caches = (0..(TEST_CACHE_SIZE + 1)) - .map(|i| (shuffling_id(i as u64), Arc::new(CommitteeCache::default()))) - .collect::>(); - - for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { - cache.insert_value(shuffling_id.clone(), committee_cache); - } - - for i in 1..(TEST_CACHE_SIZE + 1) { - assert!( - cache.contains(&shuffling_id_and_committee_caches.get(i).unwrap().0), - "should contain recent epoch shuffling ids" - ); - } - - assert!( - !cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0), - "should not contain oldest epoch shuffling id" - ); - assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); - } - - #[test] - fn should_retain_head_state_shufflings() { - let mut cache = new_shuffling_cache(); - let current_epoch = 10; - let committee_cache = Arc::new(CommitteeCache::default()); - - // Insert a few entries for next the epoch with different decision roots. - for i in 0..TEST_CACHE_SIZE { - let shuffling_id = AttestationShufflingId { - shuffling_epoch: (current_epoch + 1).into(), - shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), - }; - cache.insert_value(shuffling_id, &committee_cache); - } - - // Now, update the head shuffling ids - let head_shuffling_ids = BlockShufflingIds { - current: shuffling_id(current_epoch), - next: shuffling_id(current_epoch + 1), - previous: Some(shuffling_id(current_epoch - 1)), - block_root: Hash256::from_low_u64_le(42), - }; - cache.update_protector(head_shuffling_ids.clone()); - - // Insert head state shuffling ids. Should not be overridden by other shuffling ids. - cache.insert_value(head_shuffling_ids.current.clone(), &committee_cache); - cache.insert_value(head_shuffling_ids.next.clone(), &committee_cache); - cache.insert_value( - head_shuffling_ids.previous.clone().unwrap(), - &committee_cache, - ); - - // Insert a few entries for older epochs. - for i in 0..TEST_CACHE_SIZE { - let shuffling_id = AttestationShufflingId { - shuffling_epoch: Epoch::from(i), - shuffling_decision_block: Hash256::from_low_u64_be(i as u64), - }; - cache.insert_value(shuffling_id, &committee_cache); - } - - assert!( - cache.contains(&head_shuffling_ids.current), - "should retain head shuffling id for the current epoch." - ); - assert!( - cache.contains(&head_shuffling_ids.next), - "should retain head shuffling id for the next epoch." - ); - assert!( - cache.contains(&head_shuffling_ids.previous.unwrap()), - "should retain head shuffling id for previous epoch." 
- ); - assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); - } -} diff --git a/consensus/types/src/shuffling_id.rs b/consensus/types/src/shuffling_id.rs index a5bdc866733..a4dcecfbd9c 100644 --- a/consensus/types/src/shuffling_id.rs +++ b/consensus/types/src/shuffling_id.rs @@ -51,3 +51,39 @@ impl AttestationShufflingId { } } } + +/// Contains the shuffling IDs for a beacon block. +#[derive(Debug, Clone)] +pub struct BlockShufflingIds { + pub current: AttestationShufflingId, + pub next: AttestationShufflingId, + pub previous: Option, + pub block_root: Hash256, +} + +impl BlockShufflingIds { + /// Returns the shuffling ID for the given epoch. + /// + /// Returns `None` if `epoch` is prior to `self.previous?.shuffling_epoch` or + /// `self.current.shuffling_epoch` (if `previous` is `None`). + pub fn id_for_epoch(&self, epoch: Epoch) -> Option { + if epoch == self.current.shuffling_epoch { + Some(self.current.clone()) + } else if self + .previous + .as_ref() + .map_or(false, |id| id.shuffling_epoch == epoch) + { + self.previous.clone() + } else if epoch == self.next.shuffling_epoch { + Some(self.next.clone()) + } else if epoch > self.next.shuffling_epoch { + Some(AttestationShufflingId::from_components( + epoch, + self.block_root, + )) + } else { + None + } + } +} diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 3036afb69f4..2c6e6001d1c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -148,26 +148,6 @@ fn disable_lock_timeouts_flag() { .with_config(|config| assert!(!config.chain.enable_lock_timeouts)); } -#[test] -fn shuffling_cache_default() { - CommandLineTest::new() - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.shuffling_cache_size, - beacon_node::beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE - ) - }); -} - -#[test] -fn shuffling_cache_set() { - CommandLineTest::new() - .flag("shuffling-cache-size", Some("500")) - .run_with_zero_port() - .with_config(|config| assert_eq!(config.chain.shuffling_cache_size, 500)); -} - #[test] fn fork_choice_before_proposal_timeout_default() { CommandLineTest::new() From 60bbd60f3ba32bee59bc0c07c6264fdaeeca10d9 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 23 Feb 2024 19:16:38 +0100 Subject: [PATCH 5/7] more cleanup and comments --- Cargo.lock | 4 --- beacon_node/beacon_chain/Cargo.toml | 1 - common/promise_cache/Cargo.toml | 3 -- common/promise_cache/src/lib.rs | 43 +++++++++++++++++++++++++++-- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f34e5c6a7d2..f25fea2a6bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,7 +571,6 @@ dependencies = [ "merkle_proof", "operation_pool", "parking_lot 0.12.1", - "promise_cache", "proto_array", "rand 0.8.5", "rayon", @@ -6195,11 +6194,8 @@ dependencies = [ name = "promise_cache" version = "0.1.0" dependencies = [ - "derivative", - "itertools", "oneshot_broadcast", "parking_lot 0.12.1", - "slog", ] [[package]] diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index aa169f663dc..4719a94a126 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -45,7 +45,6 @@ lru = { workspace = true } merkle_proof = { workspace = true } operation_pool = { workspace = true } parking_lot = { workspace = true } -promise_cache = { path = "../../common/promise_cache" } proto_array = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git 
a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml
index d1946d0886b..e27b193537c 100644
--- a/common/promise_cache/Cargo.toml
+++ b/common/promise_cache/Cargo.toml
@@ -4,8 +4,5 @@ version = "0.1.0"
 edition.workspace = true
 
 [dependencies]
-derivative = { workspace = true }
 oneshot_broadcast = { path = "../oneshot_broadcast" }
-itertools = { workspace = true }
-slog = { workspace = true }
 parking_lot = { workspace = true }
diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs
index ff80415d900..e5034c1e11a 100644
--- a/common/promise_cache/src/lib.rs
+++ b/common/promise_cache/src/lib.rs
@@ -1,8 +1,21 @@
+//! A cache to avoid redundant computation.
+//!
+//! Cached values (such as states) have to be reprocessed (e.g. loaded from disk) if they are not
+//! present in their cache. After that, they are added to their cache so that the computation is
+//! not repeated the next time the value is needed. However, while the computation is still in
+//! progress, other threads may also require that value and start computing it themselves,
+//! causing additional CPU load and adding unnecessary latency for those threads.
+//!
+//! This crate offers the [`PromiseCache`], which does not cache values, but computations for
+//! those values (identified by some key), allowing additional threads to simply wait for an
+//! already ongoing computation instead of needlessly running it again. Refer to [`PromiseCache`]
+//! for usage instructions.
 use oneshot_broadcast::{oneshot, Receiver};
 use parking_lot::Mutex;
 use std::collections::HashMap;
 use std::hash::Hash;
 
+/// Caches computation of a value `V` identified by a key `K`.
 #[derive(Debug)]
 pub struct PromiseCache<K, V>
 where
@@ -12,8 +25,13 @@ where
     cache: Mutex<HashMap<K, Receiver<Result<V, ()>>>>,
 }
 
+/// Returned by [`PromiseCache::get_or_compute`] when a computation fails.
 pub enum PromiseCacheError<E> {
+    /// The computation failed because the passed closure returned an error. For the first
+    /// thread, the `Option<E>` will contain the error. As errors are often not clonable, all
+    /// other threads will only receive `None` to avoid `E` having to be `Clone`.
     Error(Option<E>),
+    /// The computation failed because the passed closure panicked.
     Panic,
 }
 
 impl<K, V> PromiseCache<K, V>
@@ -28,6 +46,24 @@ where
         }
     }
 
+    /// Compute a value for the specified key, or wait for an already ongoing computation.
+    ///
+    /// If the closure is successful, the computed value is returned. Otherwise, a
+    /// [`PromiseCacheError`] is returned.
+    ///
+    /// The resulting values are not retained: as soon as the first thread has returned, new
+    /// threads will recompute the value. Therefore, you should store the resulting value in
+    /// another cache, so that threads arriving just a bit too late can still use the value
+    /// computed here.
+    ///
+    /// It is possible (and in some cases, advisable) to provide different closures at different
+    /// code locations for the same `PromiseCache`: if computation is cheaper in some contexts,
+    /// other threads may benefit from that too. However, if a thread calls `get_or_compute`
+    /// with a "fast" closure while computation is already in progress with a "slow" closure,
+    /// that thread may wait longer than it would have by simply using its "fast" closure. This
+    /// is unavoidable, as we cannot determine the complexity of closures.
+    ///
+    /// NOTE: do not hold any locks while calling this function! Acquire any necessary locks
+    /// within the passed closure instead.
    pub fn get_or_compute<F, E>(&self, key: &K, computation: F) -> Result<V, PromiseCacheError<E>>
     where
         F: FnOnce() -> Result<V, E>,
     {
@@ -37,7 +73,6 @@ where
             Some(item) => {
                 let item = item.clone();
                 drop(cache);
-                println!("*********** PROMISE CACHE HIT ************");
                 item.recv()
                     .map_err(|_| PromiseCacheError::Panic)
                     .and_then(|res| res.map_err(|_| PromiseCacheError::Error(None)))
@@ -46,7 +81,6 @@ where
                 let (sender, receiver) = oneshot();
                 cache.insert(key.clone(), receiver);
                 drop(cache);
-                println!("*********** PROMISE CACHE MISS ************");
                 match computation() {
                     Ok(value) => {
                         sender.send(Ok(value));
@@ -54,9 +88,14 @@ where
                             .cache
                             .lock()
                             .remove(key)
+                            // PANIC: should not happen, as the insert and remove are guarded so
+                            // that no two threads will end up removing the same key.
                            .expect("value has vanished")
                             .recv()
+                            // PANIC: cannot happen, as we sent the value just above instead of
+                            // dropping the sender without sending one.
                             .expect("we sent the value")
+                            // PANIC: cannot happen: the result is `Ok`, as seen above.
                             .expect("we sent a success"))
                     }
                     Err(err) => {

From 537ead4e5b9ee02dcf44347bd7a51af123754b17 Mon Sep 17 00:00:00 2001
From: Daniel Knopik
Date: Mon, 26 Feb 2024 18:59:00 +0100
Subject: [PATCH 6/7] cache HDiffBuffers Arc'd to prevent large clones

---
 beacon_node/store/src/hdiff.rs          |  6 ++
 beacon_node/store/src/hot_cold_store.rs | 98 ++++++++++++------------
 2 files changed, 53 insertions(+), 51 deletions(-)

diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs
index b831df5da04..a1f2bc48316 100644
--- a/beacon_node/store/src/hdiff.rs
+++ b/beacon_node/store/src/hdiff.rs
@@ -94,6 +94,12 @@ impl HDiffBuffer {
         *state.balances_mut() = VList::new(self.balances).unwrap();
         Ok(state)
     }
+
+    pub fn as_state<E: EthSpec>(&self, spec: &ChainSpec) -> Result<BeaconState<E>, Error> {
+        let mut state = BeaconState::from_ssz_bytes(&self.state, spec).unwrap();
+        *state.balances_mut() = VList::try_from_iter(self.balances.iter().copied()).unwrap();
+        Ok(state)
+    }
 }
 
 impl HDiff {
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 8885b605ec0..5abd4117b98 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -79,7 +79,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     ///
     /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
     state_cache: Mutex<StateCache<E>>,
-    hot_computation_cache: PromiseCache<Hash256, Arc<BeaconState<E>>>,
+    state_promise_cache: PromiseCache<Hash256, Arc<BeaconState<E>>>,
     /// Immutable validator cache.
     pub immutable_validators: Arc<RwLock<ValidatorPubkeyCache<E, Hot, Cold>>>,
     /// LRU cache of replayed states.
@@ -87,8 +87,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     #[allow(dead_code)]
     historic_state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
     /// Cache of hierarchical diff buffers.
-    diff_buffer_cache: Mutex<LruCache<Slot, HDiffBuffer>>,
-    diff_buffer_computation_cache: PromiseCache<Slot, (Slot, HDiffBuffer)>,
+    diff_buffer_cache: Mutex<LruCache<Slot, Arc<HDiffBuffer>>>,
+    diff_buffer_promise_cache: PromiseCache<Slot, (Slot, Arc<HDiffBuffer>)>,
     /// Chain spec.
     pub(crate) spec: ChainSpec,
     /// Logger.
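The `Arc<HDiffBuffer>` fields above are the heart of this commit: previously each cache hit cloned the entire buffer (a full serialized state plus balances), while cloning an `Arc` merely bumps a reference count. A minimal std-only sketch of the pattern follows; `HugeBuffer` is an invented stand-in for `HDiffBuffer`, not a real type:

use std::collections::HashMap;
use std::sync::Arc;

#[derive(Clone)]
struct HugeBuffer(Vec<u8>); // stand-in for HDiffBuffer

fn main() {
    let mut cache: HashMap<u64, Arc<HugeBuffer>> = HashMap::new();
    cache.insert(42, Arc::new(HugeBuffer(vec![0u8; 1_000_000])));

    // Cheap: clones the pointer and bumps a refcount, not the megabyte.
    let shared = Arc::clone(cache.get(&42).unwrap());

    // Copy-on-write: make_mut clones the inner buffer only because the cache
    // still holds a second Arc pointing at it.
    let mut local = shared;
    Arc::make_mut(&mut local).0.push(1);
    assert_eq!(cache.get(&42).unwrap().0.len(), 1_000_000); // cache unchanged
}

This is exactly the trade-off behind the `diff.apply(Arc::make_mut(&mut buffer))` call further down in this patch: mutation stays possible, but only pays for a deep clone when the buffer is actually shared.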
@@ -214,11 +214,11 @@ impl HotColdDB, MemoryStore> { hot_db: MemoryStore::open(), block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), - hot_computation_cache: PromiseCache::new(), + state_promise_cache: PromiseCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), - diff_buffer_computation_cache: PromiseCache::new(), + diff_buffer_promise_cache: PromiseCache::new(), config, hierarchy, spec, @@ -262,11 +262,11 @@ impl HotColdDB, LevelDB> { hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(BlockCache::new(block_cache_size)), state_cache: Mutex::new(StateCache::new(state_cache_size)), - hot_computation_cache: PromiseCache::new(), + state_promise_cache: PromiseCache::new(), immutable_validators: Arc::new(RwLock::new(Default::default())), historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size)), diff_buffer_cache: Mutex::new(LruCache::new(diff_buffer_cache_size)), - diff_buffer_computation_cache: PromiseCache::new(), + diff_buffer_promise_cache: PromiseCache::new(), config, hierarchy, spec, @@ -1333,7 +1333,7 @@ impl, Cold: ItemStore> HotColdDB "state_root" => ?state_root, ); - Ok(self.hot_computation_cache.get_or_compute(state_root, || { + Ok(self.state_promise_cache.get_or_compute(state_root, || { let state_from_disk = self.load_hot_state(state_root)?; if let Some((state, block_root)) = state_from_disk { @@ -1825,7 +1825,7 @@ impl, Cold: ItemStore> HotColdDB /// Will reconstruct the state if it lies between restore points. pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result>, Error> { let (base_slot, hdiff_buffer) = self.load_hdiff_buffer_for_slot(slot)?; - let base_state = hdiff_buffer.into_state(&self.spec)?; + let base_state = hdiff_buffer.as_state(&self.spec)?; debug_assert_eq!(base_slot, base_state.slot()); if base_state.slot() == slot { @@ -1857,7 +1857,7 @@ impl, Cold: ItemStore> HotColdDB /// Returns `HDiffBuffer` for the specified slot, or `HDiffBuffer` for the `ReplayFrom` slot if /// the diff for the specified slot is not stored. - fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, HDiffBuffer), Error> { + fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, Arc), Error> { if let Some(buffer) = self.diff_buffer_cache.lock().get(&slot) { debug!( self.log, @@ -1867,51 +1867,47 @@ impl, Cold: ItemStore> HotColdDB return Ok((slot, buffer.clone())); } - Ok(self - .diff_buffer_computation_cache - .get_or_compute(&slot, || { - // Load buffer for the previous state. - // This amount of recursion (<10 levels) should be OK. - let t = std::time::Instant::now(); - let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { - // Base case. - StorageStrategy::Snapshot => { - let state = self - .load_cold_state_as_snapshot(slot)? - .ok_or(Error::MissingSnapshot(slot))?; - let buffer = HDiffBuffer::from_state(state); - - self.diff_buffer_cache.lock().put(slot, buffer.clone()); - debug!( - self.log, - "Added diff buffer to cache"; - "load_time_ms" => t.elapsed().as_millis(), - "slot" => slot - ); + Ok(self.diff_buffer_promise_cache.get_or_compute(&slot, || { + // Load buffer for the previous state. + // This amount of recursion (<10 levels) should be OK. + let t = std::time::Instant::now(); + let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? 
{ + // Base case. + StorageStrategy::Snapshot => { + let state = self + .load_cold_state_as_snapshot(slot)? + .ok_or(Error::MissingSnapshot(slot))?; + let buffer = Arc::new(HDiffBuffer::from_state(state)); + + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( + self.log, + "Added diff buffer to cache"; + "load_time_ms" => t.elapsed().as_millis(), + "slot" => slot + ); - return Ok((slot, buffer)); - } - // Recursive case. - StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, - StorageStrategy::ReplayFrom(from) => { - return self.load_hdiff_buffer_for_slot(from) - } - }; + return Ok((slot, buffer)); + } + // Recursive case. + StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, + StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from), + }; - // Load diff and apply it to buffer. - let diff = self.load_hdiff_for_slot(slot)?; - diff.apply(&mut buffer)?; + // Load diff and apply it to buffer. + let diff = self.load_hdiff_for_slot(slot)?; + diff.apply(Arc::make_mut(&mut buffer))?; - self.diff_buffer_cache.lock().put(slot, buffer.clone()); - debug!( - self.log, - "Added diff buffer to cache"; - "load_time_ms" => t.elapsed().as_millis(), - "slot" => slot - ); + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( + self.log, + "Added diff buffer to cache"; + "load_time_ms" => t.elapsed().as_millis(), + "slot" => slot + ); - Ok((slot, buffer)) - })?) + Ok((slot, buffer)) + })?) } /// Load cold blocks between `start_slot` and `end_slot` inclusive. From a400a0f549d2c778a0e2c75b3637b4b86baec1d4 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Thu, 29 Feb 2024 20:39:30 +0100 Subject: [PATCH 7/7] code review --- beacon_node/store/src/hdiff.rs | 29 ++++++++++++++++++++----- beacon_node/store/src/hot_cold_store.rs | 4 ++-- common/promise_cache/src/lib.rs | 27 ++++------------------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index de0ec8048c1..6d722249619 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -97,9 +97,17 @@ impl HDiffBuffer { pub fn as_state(&self, spec: &ChainSpec) -> Result, Error> { let mut state = BeaconState::from_ssz_bytes(&self.state, spec).unwrap(); - *state.balances_mut() = VList::try_from_iter(self.balances.iter().copied()).unwrap(); + *state.balances_mut() = List::try_from_iter(self.balances.iter().copied()).unwrap(); Ok(state) } + + pub fn state(&self) -> &[u8] { + &self.state + } + + pub fn balances(&self) -> &[u64] { + &self.balances + } } impl HDiff { @@ -121,6 +129,21 @@ impl HDiff { Ok(()) } + pub fn apply_to_parts( + &self, + state: &[u8], + mut balances: Vec, + ) -> Result { + let mut target_state = vec![]; + self.state_diff.apply(state, &mut target_state)?; + + self.balances_diff.apply(&mut balances)?; + Ok(HDiffBuffer { + state: target_state, + balances, + }) + } + pub fn state_diff_len(&self) -> usize { self.state_diff.bytes.len() } @@ -156,10 +179,6 @@ impl BytesDiff { } pub fn apply(&self, source: &[u8], target: &mut Vec) -> Result<(), Error> { - self.apply_xdelta(source, target) - } - - pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec) -> Result<(), Error> { *target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?; Ok(()) } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 7cacd78247d..d5aef26f0f4 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ 
b/beacon_node/store/src/hot_cold_store.rs
@@ -1871,7 +1871,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             // Load buffer for the previous state.
             // This amount of recursion (<10 levels) should be OK.
             let t = std::time::Instant::now();
-            let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? {
+            let (_buffer_slot, buffer) = match self.hierarchy.storage_strategy(slot)? {
                 // Base case.
                 StorageStrategy::Snapshot => {
                     let state = self
@@ -1896,7 +1896,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
             // Load diff and apply it to buffer.
             let diff = self.load_hdiff_for_slot(slot)?;
-            diff.apply(Arc::make_mut(&mut buffer))?;
+            let buffer = Arc::new(diff.apply_to_parts(buffer.state(), buffer.balances().to_vec())?);
 
             self.diff_buffer_cache.lock().put(slot, buffer.clone());
             debug!(
diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs
index e5034c1e11a..acd93efe940 100644
--- a/common/promise_cache/src/lib.rs
+++ b/common/promise_cache/src/lib.rs
@@ -81,29 +81,10 @@ where
                 let (sender, receiver) = oneshot();
                 cache.insert(key.clone(), receiver);
                 drop(cache);
-                match computation() {
-                    Ok(value) => {
-                        sender.send(Ok(value));
-                        Ok(self
-                            .cache
-                            .lock()
-                            .remove(key)
-                            .expect("value has vanished")
-                            .recv()
-                            .expect("we sent the value")
-                            .expect("we sent a success"))
-                    }
-                    Err(err) => {
-                        sender.send(Err(()));
-                        self.cache.lock().remove(key);
-                        Err(PromiseCacheError::Error(Some(err)))
-                    }
-                }
+                let result = computation();
+                sender.send(result.as_ref().cloned().map_err(|_| ()));
+                self.cache.lock().remove(key);
+                result.map_err(|e| PromiseCacheError::Error(Some(e)))
             }
         }
     }
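With the final shape of `get_or_compute` above, the crate boils down to "the first caller computes, everyone else waits on a oneshot broadcast". A minimal usage sketch follows, assuming the two-parameter `PromiseCache<K, V>` signature reconstructed from patch 5 and that the cache is `Sync` (as its use inside `HotColdDB` implies); the `u64` key, `String` value and closure body are invented for illustration:

use promise_cache::{PromiseCache, PromiseCacheError};
use std::sync::Arc;
use std::thread;

fn main() {
    // Key: slot number; value: some expensively computed string.
    let cache: Arc<PromiseCache<u64, String>> = Arc::new(PromiseCache::default());

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                // The first thread to arrive runs the closure; the other
                // three block on the broadcast receiver and get a clone.
                cache.get_or_compute(&42, || -> Result<String, ()> {
                    Ok("expensively loaded state".to_string())
                })
            })
        })
        .collect();

    for handle in handles {
        match handle.join().unwrap() {
            Ok(value) => println!("{value}"),
            // Only the computing thread sees the original error (`Some`);
            // waiting threads receive `None`.
            Err(PromiseCacheError::Error(_)) => eprintln!("computation failed"),
            Err(PromiseCacheError::Panic) => eprintln!("a computing thread panicked"),
        }
    }
}

As the crate docs note, callers should store the returned value in a real cache (as `HotColdDB` does with `diff_buffer_cache`), since the promise entry is removed as soon as the computation finishes.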