Skip to content

Commit

Permalink
Resolve TODO for observed times with data columns
Browse files — browse the repository at this point in the history
  • Loading branch information
dapplion committed Jan 23, 2025
1 parent a1b7d61 commit 7153ecf
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 48 deletions.
32 changes: 22 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3011,6 +3011,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
self.observe_data_for_block_import(block_root, blob.slot());

let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
Expand Down Expand Up @@ -3044,6 +3045,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

self.observe_data_for_block_import(block_root, slot);

let r = self
.check_gossip_data_columns_availability_and_import(
slot,
Expand Down Expand Up @@ -3091,6 +3094,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
self.observe_data_for_block_import(block_root, slot);

let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
Expand Down Expand Up @@ -3122,6 +3126,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));

self.observe_data_for_block_import(block_root, slot);

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
.await;
Expand Down Expand Up @@ -3149,6 +3155,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Record seeing block data (blob / data column) required for block import (excludes data
/// columns for sampling).
///
/// Stamps the current wall-clock time (`timestamp_now()`) into the block times cache. The cache
/// entry keeps only the *latest* such observation, so after all data has arrived this records
/// the arrival time of the last piece of data needed to import the block.
fn observe_data_for_block_import(&self, block_root: Hash256, slot: Slot) {
self.block_times_cache
.write()
.set_time_data_for_import_observed(block_root, slot, timestamp_now());
}

/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_custody_columns(
Expand Down Expand Up @@ -3189,6 +3203,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.observe_data_for_block_import(block_root, slot);

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
Expand Down Expand Up @@ -3681,16 +3697,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
if let Some(blobs_available) = block.blobs_available_timestamp() {
self.block_times_cache.write().set_time_blob_observed(
block_root,
block.slot(),
blobs_available,
);
}

// TODO(das) record custody column available timestamp
// Record the time at which this block is considered available.
self.block_times_cache.write().set_time_available(
block_root,
block.slot(),
block.available_timestamp(),
);

// import
let chain = self.clone();
Expand Down
52 changes: 30 additions & 22 deletions beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type BlockRoot = Hash256;
#[derive(Clone, Default)]
pub struct Timestamps {
pub observed: Option<Duration>,
pub all_blobs_observed: Option<Duration>,
pub all_data_for_import_observed: Option<Duration>,
pub consensus_verified: Option<Duration>,
pub started_execution: Option<Duration>,
pub available: Option<Duration>,
pub executed: Option<Duration>,
pub attestable: Option<Duration>,
pub imported: Option<Duration>,
Expand All @@ -32,15 +33,13 @@ pub struct Timestamps {
pub struct BlockDelays {
/// Time after start of slot we saw the block.
pub observed: Option<Duration>,
/// The time after the start of the slot we saw all blobs.
pub all_blobs_observed: Option<Duration>,
/// The time after the start of the slot we saw all blobs / data columns.
pub all_data_for_import_observed: Option<Duration>,
/// The time it took to complete consensus verification of the block.
pub consensus_verification_time: Option<Duration>,
/// The time it took to complete execution verification of the block.
pub execution_time: Option<Duration>,
/// The delay from the start of the slot before the block became available
///
/// Equal to max(`observed + execution_time`, `all_blobs_observed`).
pub available: Option<Duration>,
/// Time after `available`.
pub attestable: Option<Duration>,
Expand All @@ -59,34 +58,34 @@ impl BlockDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let all_blobs_observed = times
.all_blobs_observed
.and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time));
let all_data_for_import_observed =
times
.all_data_for_import_observed
.and_then(|all_data_for_import_observed| {
all_data_for_import_observed.checked_sub(slot_start_time)
});
let consensus_verification_time = times
.consensus_verified
.and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?));
let execution_time = times
.executed
.and_then(|executed| executed.checked_sub(times.started_execution?));
// Duration since UNIX epoch at which block became available.
let available_time = times
.executed
.map(|executed| std::cmp::max(executed, times.all_blobs_observed.unwrap_or_default()));
// Duration from the start of the slot until the block became available.
let available_delay =
available_time.and_then(|available_time| available_time.checked_sub(slot_start_time));
let available_delay = times
.available
.and_then(|available_time| available_time.checked_sub(slot_start_time));
let attestable = times
.attestable
.and_then(|attestable_time| attestable_time.checked_sub(slot_start_time));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(available_time?));
.and_then(|imported_time| imported_time.checked_sub(times.available?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
BlockDelays {
observed,
all_blobs_observed,
all_data_for_import_observed,
consensus_verification_time,
execution_time,
available: available_delay,
Expand Down Expand Up @@ -157,25 +156,25 @@ impl BlockTimesCache {
}
}

pub fn set_time_blob_observed(
pub fn set_time_data_for_import_observed(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
// Unlike other functions in this file, we update the blob observed time only if it is
// *greater* than existing blob observation times. This allows us to know the observation
// time of the last blob to arrive.
// Unlike other functions in this file, we update the data observed time only if it is
// *greater* than existing data observation times. This allows us to know the observation
// time of the last data to arrive.
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.all_blobs_observed
.all_data_for_import_observed
.map_or(true, |prev| timestamp > prev)
{
block_times.timestamps.all_blobs_observed = Some(timestamp);
block_times.timestamps.all_data_for_import_observed = Some(timestamp);
}
}

Expand Down Expand Up @@ -237,6 +236,15 @@ impl BlockTimesCache {
)
}

/// Record the UNIX-epoch timestamp at which the block identified by `block_root` is
/// considered available (all required data observed / availability check satisfied).
///
/// Delegates to `set_time_if_less`, which — presumably, per its name — retains only the
/// earliest recorded timestamp; NOTE(review): confirm against `set_time_if_less`'s definition.
pub fn set_time_available(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.available,
timestamp,
)
}

pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,12 +1361,12 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
.as_millis() as i64,
);

// The time from the start of the slot when all blobs have been observed. Technically this
// is the time we last saw a blob related to this block/slot.
// The time from the start of the slot when all blobs / custody data columns have been
// observed. Technically this is the time we last saw a blob related to this block/slot.
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_ALL_OBSERVED_SLOT_START,
block_delays
.all_blobs_observed
.all_data_for_import_observed
.unwrap_or_else(|| Duration::from_secs(0))
.as_millis() as i64,
);
Expand Down Expand Up @@ -1441,7 +1441,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"slot" => head_block_slot,
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"data_delay_ms" => format_delay(&block_delays.all_data_for_import_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
Expand All @@ -1458,7 +1458,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"slot" => head_block_slot,
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"data_delay_ms" => format_delay(&block_delays.all_data_for_import_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
Expand Down
21 changes: 11 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::block_verification_types::{
use crate::data_availability_checker::overflow_lru_cache::{
DataAvailabilityCheckerInner, ReconstructColumnsDecision,
};
use crate::validator_monitor::timestamp_now;
use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slog::{debug, error, Logger};
Expand Down Expand Up @@ -350,8 +351,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs,
blobs_available_timestamp: None,
data_columns: None,
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
}))
} else {
Expand All @@ -370,13 +371,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: None,
blobs_available_timestamp: None,
data_columns: Some(
data_column_list
.into_iter()
.map(|d| d.clone_arc())
.collect(),
),
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
}))
} else {
Expand All @@ -388,8 +389,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: None,
blobs_available_timestamp: None,
data_columns: None,
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
}))
}
Expand Down Expand Up @@ -445,8 +446,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs,
blobs_available_timestamp: None,
data_columns: None,
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
})
} else {
Expand All @@ -461,7 +462,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
data_columns: data_columns.map(|data_columns| {
data_columns.into_iter().map(|d| d.into_inner()).collect()
}),
blobs_available_timestamp: None,
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
})
} else {
Expand All @@ -473,7 +474,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block,
blobs: None,
data_columns: None,
blobs_available_timestamp: None,
available_timestamp: timestamp_now(),
spec: self.spec.clone(),
})
};
Expand Down Expand Up @@ -750,7 +751,7 @@ pub struct AvailableBlock<E: EthSpec> {
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
blobs_available_timestamp: Option<Duration>,
available_timestamp: Duration,
pub spec: Arc<ChainSpec>,
}

Expand All @@ -767,7 +768,7 @@ impl<E: EthSpec> AvailableBlock<E> {
block,
blobs,
data_columns,
blobs_available_timestamp: None,
available_timestamp: timestamp_now(),
spec,
}
}
Expand All @@ -783,8 +784,8 @@ impl<E: EthSpec> AvailableBlock<E> {
self.blobs.as_ref()
}

pub fn blobs_available_timestamp(&self) -> Option<Duration> {
self.blobs_available_timestamp
/// Returns the timestamp at which this block first became available
/// (duration since the UNIX epoch, per the field's documentation).
pub fn available_timestamp(&self) -> Duration {
self.available_timestamp
}

pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::block_verification_types::{
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::validator_monitor::timestamp_now;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
Expand Down Expand Up @@ -332,7 +333,7 @@ impl<E: EthSpec> PendingComponents<E> {
block,
blobs,
data_columns,
blobs_available_timestamp,
available_timestamp: timestamp_now(),
spec: spec.clone(),
};
Ok(Availability::Available(Box::new(
Expand Down

0 comments on commit 7153ecf

Please sign in to comment.