Merge of #6862
mergify[bot] authored Feb 11, 2025
2 parents ec2fe38 + 10f9f10 commit 7592df1
Showing 10 changed files with 68 additions and 55 deletions.
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2974,10 +2974,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice
// TODO(das): update fork-choice, act on sampling result, adjust log level
// NOTE: It is possible that sampling completes before the block is imported into fork choice;
// in that case we may need to update the availability cache.
// TODO(das): These log levels are too high, reduce once DAS matures
info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

@@ -110,15 +110,6 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}

/// Checks if a data column of a given index exists in the cache.
///
/// Returns:
/// - `true` if a data column for the given index exists.
/// - `false` otherwise.
fn data_column_exists(&self, data_column_index: u64) -> bool {
self.get_cached_data_column(data_column_index).is_some()
}

/// Returns the number of data columns that have been received and are stored in the cache.
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
@@ -182,8 +173,7 @@ impl<E: EthSpec> PendingComponents<E> {
kzg_verified_data_columns: I,
) -> Result<(), AvailabilityCheckError> {
for data_column in kzg_verified_data_columns {
// TODO(das): Add equivalent checks for data columns if necessary
if !self.data_column_exists(data_column.index()) {
if self.get_cached_data_column(data_column.index()).is_none() {
self.verified_data_columns.push(data_column);
}
}
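
For context, the simplified check above keeps at most one column per index. A minimal, self-contained sketch of the same dedup-on-insert pattern (`VerifiedColumn` and the `Vec`-backed cache are hypothetical stand-ins, not Lighthouse's real types):

```rust
#[derive(Debug)]
struct VerifiedColumn {
    index: u64,
}

#[derive(Default)]
struct PendingComponents {
    verified_data_columns: Vec<VerifiedColumn>,
}

impl PendingComponents {
    /// Returns the cached column with `index`, if any.
    fn get_cached_data_column(&self, index: u64) -> Option<&VerifiedColumn> {
        self.verified_data_columns.iter().find(|c| c.index == index)
    }

    /// Merges columns, skipping any index that is already cached.
    fn merge_data_columns(&mut self, columns: impl IntoIterator<Item = VerifiedColumn>) {
        for data_column in columns {
            if self.get_cached_data_column(data_column.index).is_none() {
                self.verified_data_columns.push(data_column);
            }
        }
    }
}

fn main() {
    let mut pending = PendingComponents::default();
    pending.merge_data_columns([VerifiedColumn { index: 3 }, VerifiedColumn { index: 3 }]);
    // The duplicate index is dropped.
    assert_eq!(pending.verified_data_columns.len(), 1);
}
```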
20 changes: 6 additions & 14 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -1477,22 +1477,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::InternalError(_)) => {
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
error!(self.log, "Internal block gossip validation error";
"error" => %e
);
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
"error" => %e,
"peer_id" => %peer_id,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
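
Folding `BlobNotRequired` into the internal-error arm relies on an or-pattern with an `@` binding, so a single arm logs both variants. A standalone sketch (the error enum below is a hypothetical stand-in for the real `BlockError`):

```rust
#[derive(Debug)]
enum BlockError {
    InternalError(String),
    BlobNotRequired(String),
}

fn handle(result: Result<(), BlockError>) -> Option<()> {
    match result {
        Ok(()) => Some(()),
        // One arm covers both variants; `e` binds the full error either way.
        Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
            eprintln!("internal block gossip validation error: {e:?}");
            None
        }
    }
}

fn main() {
    assert!(handle(Err(BlockError::BlobNotRequired("late blob".into()))).is_none());
    let _ = handle(Err(BlockError::InternalError("oops".into())));
}
```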
@@ -1603,9 +1594,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

// TODO(das) Might be too early to issue a request here. We haven't checked that the block
// actually includes blob transactions and thus has data. A peer could send a block with
// garbage commitments, and make us trigger sampling for a block that does not have data.
// Note: okay to issue sampling request before the block is execution verified. If the
// proposer sends us a block with invalid blob transactions it can trigger us to issue
// sampling queries that will never resolve. This attack is equivalent to withholding data.
// Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492
if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
24 changes: 23 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -336,9 +336,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
_seen_timestamp: Duration,
seen_timestamp: Duration,
process_type: BlockProcessType,
) {
// custody_columns must always have at least one element
let Some(slot) = custody_columns.first().map(|d| d.slot()) else {
return;
};

if let Ok(current_slot) = self.chain.slot() {
if current_slot == slot {
let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
}
}

let mut indices = custody_columns.iter().map(|d| d.index).collect::<Vec<_>>();
indices.sort_unstable();
debug!(
self.log,
"RPC custody data columns received";
"indices" => ?indices,
"block_root" => %block_root,
"slot" => %slot,
);

let mut result = self
.chain
.process_rpc_custody_columns(custody_columns)
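
The new metric records how far into its slot a column arrived, and only fires when the columns land within their own slot. A rough sketch of the arithmetic, assuming `get_slot_delay_ms` subtracts the slot's start time from the seen timestamp (the 12-second slot and genesis at t = 0 are assumptions of the sketch):

```rust
use std::time::Duration;

const SECONDS_PER_SLOT: u64 = 12; // hypothetical; mainnet value
const GENESIS_OFFSET: Duration = Duration::ZERO; // assume genesis at t = 0 for the sketch

/// Start of `slot` relative to genesis.
fn slot_start(slot: u64) -> Duration {
    GENESIS_OFFSET + Duration::from_secs(slot * SECONDS_PER_SLOT)
}

/// How far into `slot` the data was first seen (saturates to zero if seen "early").
fn slot_delay(seen_timestamp: Duration, slot: u64) -> Duration {
    seen_timestamp.saturating_sub(slot_start(slot))
}

fn main() {
    // Seen 3.5s after the start of slot 100.
    let seen = slot_start(100) + Duration::from_millis(3500);
    assert_eq!(slot_delay(seen, 100), Duration::from_millis(3500));
}
```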
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -479,8 +479,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// continue_request will send for processing as the request state is AwaitingProcessing
}
Err(e) => {
// TODO(das): is it okay to not log the peer source of request failures? Then we
// should log individual requests failures in the SyncNetworkContext
// No need to log peer source here. When sending a DataColumnsByRoot request we log
// the peer and the request ID which is linked to this `id` value here.
debug!(self.log,
"Received lookup download failure";
"block_root" => ?block_root,
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
@@ -1217,12 +1217,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
requester: CustodyRequester,
response: CustodyByRootResult<T::EthSpec>,
) {
// TODO(das): get proper timestamp
let seen_timestamp = timestamp_now();
self.block_lookups
.on_download_response::<CustodyRequestState<T::EthSpec>>(
requester.0,
response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
response,
&mut self.network,
);
}
18 changes: 10 additions & 8 deletions beacon_node/network/src/sync/network_context.rs
@@ -68,7 +68,9 @@ impl<T> RpcEvent<T> {

pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;

pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
/// Duration = latest seen timestamp of all received data columns
pub type CustodyByRootResult<T> =
Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;

#[derive(Debug)]
pub enum RpcResponseError {
@@ -1190,7 +1192,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((columns, peer_group))) => {
Some(Ok((columns, peer_group, _))) => {
debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
}
Some(Err(e)) => {
@@ -1208,7 +1210,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: Id,
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
@@ -1221,7 +1223,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send_rpc_beacon_block(
block_root,
block,
duration,
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(|e| {
@@ -1239,7 +1241,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: Id,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
@@ -1252,7 +1254,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send_rpc_blobs(
block_root,
blobs,
duration,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(|e| {
@@ -1270,7 +1272,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
_id: Id,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
@@ -1280,7 +1282,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type);

beacon_processor
.send_rpc_custody_columns(block_root, custody_columns, duration, process_type)
.send_rpc_custody_columns(block_root, custody_columns, seen_timestamp, process_type)
.map_err(|e| {
error!(
self.log,
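
With the widened alias, every consumer of a custody-by-root result destructures one extra `Duration`. Schematically, with simplified stand-in types rather than the real Lighthouse definitions:

```rust
use std::time::Duration;

// Simplified stand-ins for this sketch only.
type DataColumnSidecarList = Vec<u64>;
#[derive(Debug)]
struct PeerGroup;
#[derive(Debug)]
struct RpcResponseError;

/// Duration = latest seen timestamp of all received data columns.
type CustodyByRootResult = Result<(DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>;

fn on_custody_by_root_result(result: CustodyByRootResult) {
    match result {
        Ok((columns, peer_group, seen_timestamp)) => println!(
            "{} columns from {:?}, last column seen at {:?}",
            columns.len(),
            peer_group,
            seen_timestamp
        ),
        Err(e) => println!("custody request failed: {e:?}"),
    }
}

fn main() {
    on_custody_by_root_result(Ok((vec![0, 1], PeerGroup, Duration::from_secs(7))));
}
```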
34 changes: 22 additions & 12 deletions beacon_node/network/src/sync/network_context/custody.rs
@@ -1,7 +1,7 @@
use crate::sync::network_context::{
DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest,
};

use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
@@ -61,7 +61,8 @@ struct ActiveBatchColumnsRequest {
indices: Vec<ColumnIndex>,
}

pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
pub type CustodyRequestResult<E> =
Result<Option<(DataColumnSidecarList<E>, PeerGroup, Duration)>, Error>;

impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
pub(crate) fn new(
@@ -102,8 +103,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
cx: &mut SyncNetworkContext<T>,
) -> CustodyRequestResult<T::EthSpec> {
// TODO(das): Should downscore peers for verify errors here

let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else {
warn!(self.log,
"Received custody column response for unrequested index";
@@ -115,7 +114,7 @@
};

match resp {
Ok((data_columns, _seen_timestamp)) => {
Ok((data_columns, seen_timestamp)) => {
debug!(self.log,
"Custody column download success";
"id" => ?self.custody_id,
@@ -141,7 +140,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.ok_or(Error::BadState("unknown column_index".to_owned()))?;

if let Some(data_column) = data_columns.remove(column_index) {
column_request.on_download_success(req_id, peer_id, data_column)?;
column_request.on_download_success(
req_id,
peer_id,
data_column,
seen_timestamp,
)?;
} else {
// Peer does not have the requested data.
// TODO(das) do not consider this case a success. We know for sure the block has
@@ -204,20 +208,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
if self.column_requests.values().all(|r| r.is_downloaded()) {
// All requests have completed successfully.
let mut peers = HashMap::<PeerId, Vec<usize>>::new();
let mut seen_timestamps = vec![];
let columns = std::mem::take(&mut self.column_requests)
.into_values()
.map(|request| {
let (peer, data_column) = request.complete()?;
let (peer, data_column, seen_timestamp) = request.complete()?;
peers
.entry(peer)
.or_default()
.push(data_column.index as usize);
seen_timestamps.push(seen_timestamp);
Ok(data_column)
})
.collect::<Result<Vec<_>, _>>()?;

let peer_group = PeerGroup::from_set(peers);
return Ok(Some((columns, peer_group)));
let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or(timestamp_now());
return Ok(Some((columns, peer_group, max_seen_timestamp)));
}

let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
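
The completion path above reports the latest seen timestamp across all downloaded columns, falling back to "now" only if none was recorded. The aggregation in isolation (with a stand-in for `timestamp_now()`):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Stand-in for `timestamp_now()`: duration since the Unix epoch.
fn timestamp_now() -> Duration {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO)
}

fn main() {
    let seen_timestamps = vec![
        Duration::from_millis(1_200),
        Duration::from_millis(3_400), // the latest timestamp wins
        Duration::from_millis(2_100),
    ];
    // Falls back to "now" only if, somehow, no column carried a timestamp.
    let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or_else(timestamp_now);
    assert_eq!(max_seen_timestamp, Duration::from_millis(3_400));
}
```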
@@ -335,7 +342,7 @@ struct ColumnRequest<E: EthSpec> {
enum Status<E: EthSpec> {
NotStarted(Instant),
Downloading(DataColumnsByRootRequestId),
Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
Downloaded(PeerId, Arc<DataColumnSidecar<E>>, Duration),
}

impl<E: EthSpec> ColumnRequest<E> {
@@ -404,6 +411,7 @@ impl<E: EthSpec> ColumnRequest<E> {
req_id: DataColumnsByRootRequestId,
peer_id: PeerId,
data_column: Arc<DataColumnSidecar<E>>,
seen_timestamp: Duration,
) -> Result<(), Error> {
match &self.status {
Status::Downloading(expected_req_id) => {
@@ -413,7 +421,7 @@ impl<E: EthSpec> ColumnRequest<E> {
req_id,
});
}
self.status = Status::Downloaded(peer_id, data_column);
self.status = Status::Downloaded(peer_id, data_column, seen_timestamp);
Ok(())
}
other => Err(Error::BadState(format!(
@@ -422,9 +430,11 @@ impl<E: EthSpec> ColumnRequest<E> {
}
}

fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>), Error> {
fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>, Duration), Error> {
match self.status {
Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)),
Status::Downloaded(peer_id, data_column, seen_timestamp) => {
Ok((peer_id, data_column, seen_timestamp))
}
other => Err(Error::BadState(format!(
"bad state complete expected Downloaded got {other:?}"
))),
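
The per-column state machine now threads the seen timestamp from `on_download_success` through `Status::Downloaded` to `complete`. A compact sketch of that pattern with hypothetical payload types:

```rust
use std::time::Duration;

#[derive(Debug)]
enum Status {
    NotStarted,
    Downloading(u32),                      // request id
    Downloaded(String, Vec<u8>, Duration), // peer, data, seen timestamp
}

#[derive(Debug)]
struct ColumnRequest {
    status: Status,
}

impl ColumnRequest {
    fn on_download_success(&mut self, peer: String, data: Vec<u8>, seen: Duration) {
        // Whatever is recorded here is exactly what `complete` later returns.
        self.status = Status::Downloaded(peer, data, seen);
    }

    fn complete(self) -> Result<(String, Vec<u8>, Duration), String> {
        match self.status {
            Status::Downloaded(peer, data, seen) => Ok((peer, data, seen)),
            other => Err(format!("bad state: expected Downloaded, got {other:?}")),
        }
    }
}

fn main() {
    let mut req = ColumnRequest { status: Status::Downloading(1) };
    req.on_download_success("peer-a".into(), vec![1, 2], Duration::from_secs(5));
    assert!(req.complete().is_ok());
    let _ = Status::NotStarted; // keep the unused variant from warning in the sketch
}
```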
2 changes: 0 additions & 2 deletions beacon_node/network/src/sync/tests/lookups.rs
@@ -713,7 +713,6 @@ impl TestRig {
self.complete_data_columns_by_root_request(id, data_columns);

// Expect work event
// TODO(das): worth it to append sender id to the work event for stricter assertion?
self.expect_rpc_sample_verify_work_event();

// Respond with valid result
@@ -755,7 +754,6 @@ impl TestRig {
}

// Expect work event
// TODO(das): worth it to append sender id to the work event for stricter assertion?
self.expect_rpc_custody_column_work_event();

// Respond with valid result
2 changes: 2 additions & 0 deletions consensus/types/src/data_column_custody_group.rs
@@ -17,6 +17,8 @@ pub enum DataColumnCustodyGroupError {
/// The `get_custody_groups` function is used to determine the custody groups that a node is
/// assigned to.
///
/// Note: `get_custody_groups(node_id, x)` is a subset of `get_custody_groups(node_id, y)` if `x < y`.
///
/// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups
pub fn get_custody_groups(
raw_node_id: [u8; 32],
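
The added doc comment asserts that custody groups grow monotonically with the requested count. A toy property check of that subset claim (the `get_custody_groups` body below is a monotone-by-construction stand-in, not the spec's sha256-based sampling):

```rust
use std::collections::HashSet;

/// Hypothetical stand-in: derive `count` pseudo-random group ids from a node id.
/// Monotone by construction, since each `i` maps to the same value regardless of `count`.
fn get_custody_groups(raw_node_id: [u8; 32], count: usize) -> HashSet<u64> {
    (0..count as u64)
        .map(|i| {
            // Toy deterministic "hash" of (node_id, i); the real spec uses sha256.
            let seed = raw_node_id[(i as usize) % 32] as u64;
            (seed.wrapping_mul(31).wrapping_add(i)) % 128
        })
        .collect()
}

fn main() {
    let node_id = [7u8; 32];
    let (x, y) = (4, 16);
    let small = get_custody_groups(node_id, x);
    let large = get_custody_groups(node_id, y);
    // get_custody_groups(node_id, x) ⊆ get_custody_groups(node_id, y) for x < y.
    assert!(small.is_subset(&large));
}
```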
