Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POST /eth/v2/beacon/pool/attestations bugfixes #6867

Merged
merged 12 commits into from
Jan 31, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {

pub fn single_attestation(&self) -> Option<SingleAttestation> {
Some(SingleAttestation {
committee_index: self.attestation.committee_index()? as usize,
attester_index: self.validator_index,
committee_index: self.attestation.committee_index()?,
attester_index: self.validator_index as u64,
data: self.attestation.data().clone(),
signature: self.attestation.signature().clone(),
})
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,15 +1131,15 @@ where
.unwrap();

let single_attestation =
attestation.to_single_attestation_with_attester_index(attester_index)?;
attestation.to_single_attestation_with_attester_index(attester_index as u64)?;

let attestation: Attestation<E> = single_attestation.to_attestation(committee.committee)?;

assert_eq!(
single_attestation.committee_index,
attestation.committee_index().unwrap() as usize
attestation.committee_index().unwrap()
);
assert_eq!(single_attestation.attester_index, validator_index);
assert_eq!(single_attestation.attester_index, validator_index as u64);
Ok(single_attestation)
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rand = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slog = { workspace = true }
slot_clock = { workspace = true }
state_processing = { workspace = true }
Expand Down
58 changes: 39 additions & 19 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ mod validator;
mod validator_inclusion;
mod validators;
mod version;

use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
Expand Down Expand Up @@ -63,6 +62,7 @@ pub use publish_blocks::{
publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
Expand All @@ -83,14 +83,13 @@ use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::ChainSpec;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit,
SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData,
AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -1279,6 +1278,9 @@ pub fn serve<T: BeaconChainTypes>(
let consensus_version_header_filter =
warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER);

let optional_consensus_version_header_filter =
warp::header::optional::<ForkName>(CONSENSUS_VERSION_HEADER);

// POST beacon/blocks
let post_beacon_blocks = eth_v1
.and(warp::path("beacon"))
Expand Down Expand Up @@ -1829,20 +1831,19 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_any = any_version
let beacon_pool_path_v2 = eth_v2
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_v2 = eth_v2
let beacon_pool_path_any = any_version
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

// POST beacon/pool/attestations
let post_beacon_pool_attestations = beacon_pool_path
let post_beacon_pool_attestations_v1 = beacon_pool_path
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
Expand All @@ -1851,9 +1852,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
Expand All @@ -1879,18 +1877,40 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp_utils::json::json::<Value>())
.and(optional_consensus_version_header_filter)
.and(network_tx_filter.clone())
.and(reprocess_send_filter)
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
payload: Value,
fork_name: Option<ForkName>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
let attestations = attestations.into_iter().map(Either::Right).collect();
let attestations =
match crate::publish_attestations::deserialize_attestation_payload::<T>(
payload, fork_name, &log,
) {
Ok(attestations) => attestations,
Err(err) => {
warn!(
log,
"Unable to deserialize attestation POST request";
"error" => ?err
);
return warp::reply::with_status(
warp::reply::json(
&"Unable to deserialize request body".to_string(),
),
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};

let result = crate::publish_attestations::publish_attestations(
task_spawner,
chain,
Expand Down Expand Up @@ -4765,7 +4785,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_blinded_blocks)
.uor(post_beacon_blocks_v2)
.uor(post_beacon_blinded_blocks_v2)
.uor(post_beacon_pool_attestations)
.uor(post_beacon_pool_attestations_v1)
.uor(post_beacon_pool_attestations_v2)
.uor(post_beacon_pool_attester_slashings)
.uor(post_beacon_pool_proposer_slashings)
Expand Down
42 changes: 37 additions & 5 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use either::Either;
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use serde_json::Value;
use slog::{debug, error, warn, Logger};
use std::borrow::Cow;
use std::sync::Arc;
Expand All @@ -52,18 +53,19 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use types::{Attestation, EthSpec, SingleAttestation};
use types::{Attestation, EthSpec, ForkName, SingleAttestation};

// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
#[derive(Debug)]
enum Error {
pub enum Error {
Validation(AttestationError),
Publication,
ForkChoice(#[allow(dead_code)] BeaconChainError),
AggregationPool(#[allow(dead_code)] AttestationError),
ReprocessDisabled,
ReprocessFull,
ReprocessTimeout,
InvalidJson(#[allow(dead_code)] serde_json::Error),
FailedConversion(#[allow(dead_code)] BeaconChainError),
}

Expand All @@ -74,6 +76,36 @@ enum PublishAttestationResult {
Failure(Error),
}

#[allow(clippy::type_complexity)]
pub fn deserialize_attestation_payload<T: BeaconChainTypes>(
payload: Value,
fork_name: Option<ForkName>,
log: &Logger,
) -> Result<Vec<Either<Attestation<T::EthSpec>, SingleAttestation>>, Error> {
if fork_name.is_some_and(|fork_name| fork_name.electra_enabled()) || fork_name.is_none() {
if fork_name.is_none() {
warn!(
log,
"No Consensus Version header specified.";
);
}

Ok(serde_json::from_value::<Vec<SingleAttestation>>(payload)
.map_err(Error::InvalidJson)?
.into_iter()
.map(Either::Right)
.collect())
} else {
Ok(
serde_json::from_value::<Vec<Attestation<T::EthSpec>>>(payload)
.map_err(Error::InvalidJson)?
.into_iter()
.map(Either::Left)
.collect(),
)
}
}

fn verify_and_publish_attestation<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
either_attestation: &Either<Attestation<T::EthSpec>, SingleAttestation>,
Expand Down Expand Up @@ -163,12 +195,12 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
return Err(BeaconChainError::AttestationError(
types::AttestationError::NoCommitteeForSlotAndIndex {
slot: single_attestation.data.slot,
index: single_attestation.committee_index as u64,
index: single_attestation.committee_index,
},
));
};
Expand Down Expand Up @@ -199,7 +231,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
.iter()
.map(|att| match att {
Either::Left(att) => (att.data().slot, att.committee_index()),
Either::Right(att) => (att.data.slot, Some(att.committee_index as u64)),
Either::Right(att) => (att.data.slot, Some(att.committee_index)),
})
.collect::<Vec<_>>();

Expand Down
5 changes: 4 additions & 1 deletion beacon_node/http_api/tests/interactive_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use beacon_chain::{
ChainConfig,
};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use either::Either;
use eth2::types::ProduceBlockV3Response;
use eth2::types::{DepositContractData, StateId};
use execution_layer::{ForkchoiceState, PayloadAttributes};
Expand Down Expand Up @@ -906,9 +907,11 @@ async fn queue_attestations_from_http() {
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();

let attestations = Either::Right(single_attestations);

tokio::spawn(async move {
client
.post_beacon_pool_attestations_v2(&single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.expect("attestations should be processed successfully")
})
Expand Down
32 changes: 24 additions & 8 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped,
};
use either::Either;
use eth2::{
mixin::{RequestAccept, ResponseForkName, ResponseOptional},
reqwest::RequestBuilder,
Expand Down Expand Up @@ -1810,12 +1811,25 @@ impl ApiTester {
self
}

pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self {
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
self.client
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();

let fork_name = self
.attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Left(self.attestations.clone());

self.client
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid attestation should be sent to network"
Expand All @@ -1833,8 +1847,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data.slot))
.unwrap();

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();
assert!(
Expand Down Expand Up @@ -1900,10 +1916,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Right(attestations);
let err_v2 = self
.client
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap_err();

Expand Down Expand Up @@ -6054,9 +6070,9 @@ impl ApiTester {
.chain
.spec
.fork_name_at_slot::<E>(self.chain.slot().unwrap());

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(&self.single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

Expand Down Expand Up @@ -6375,10 +6391,10 @@ async fn post_beacon_blocks_duplicate() {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attestations_valid_v1() {
async fn beacon_pools_post_attestations_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_attestations_valid_v1()
.test_post_beacon_pool_attestations_valid()
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
warn!(
self.log,
Expand Down
Loading