diff --git a/Cargo.lock b/Cargo.lock index 02871ed79c6..259c9a22934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2562,6 +2562,7 @@ name = "eth2" version = "0.1.0" dependencies = [ "derivative", + "either", "eth2_keystore", "ethereum_serde_utils", "ethereum_ssz", @@ -9690,6 +9691,7 @@ dependencies = [ "beacon_node_fallback", "bls", "doppelganger_service", + "either", "environment", "eth2", "futures", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index ffaf61e41af..a69eb99a512 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -327,8 +327,8 @@ impl VerifiedUnaggregatedAttestation<'_, T> { pub fn single_attestation(&self) -> Option { Some(SingleAttestation { - committee_index: self.attestation.committee_index()? as usize, - attester_index: self.validator_index, + committee_index: self.attestation.committee_index()?, + attester_index: self.validator_index as u64, data: self.attestation.data().clone(), signature: self.attestation.signature().clone(), }) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index e88ce71a7b7..4526b2b3607 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1131,15 +1131,15 @@ where .unwrap(); let single_attestation = - attestation.to_single_attestation_with_attester_index(attester_index)?; + attestation.to_single_attestation_with_attester_index(attester_index as u64)?; let attestation: Attestation = single_attestation.to_attestation(committee.committee)?; assert_eq!( single_attestation.committee_index, - attestation.committee_index().unwrap() as usize + attestation.committee_index().unwrap() ); - assert_eq!(single_attestation.attester_index, validator_index); + assert_eq!(single_attestation.attester_index, validator_index as u64); Ok(single_attestation) } diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 61f3370c702..2fb3ec06bf9 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -32,6 +32,7 @@ rand = { workspace = true } safe_arith = { workspace = true } sensitive_url = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } slog = { workspace = true } slot_clock = { workspace = true } state_processing = { workspace = true } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 29c27198c02..77c9bcc34f3 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -30,7 +30,6 @@ mod validator; mod validator_inclusion; mod validators; mod version; - use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; use crate::version::fork_versioned_response; @@ -63,6 +62,7 @@ pub use publish_blocks::{ publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock, }; use serde::{Deserialize, Serialize}; +use serde_json::Value; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; @@ -83,14 +83,13 @@ use tokio_stream::{ wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, StreamExt, }; -use types::ChainSpec; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, - ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, - SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, - SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData, + AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, + ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, + RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -1279,6 +1278,9 @@ pub fn serve( let consensus_version_header_filter = warp::header::header::(CONSENSUS_VERSION_HEADER); + let optional_consensus_version_header_filter = + warp::header::optional::(CONSENSUS_VERSION_HEADER); + // POST beacon/blocks let post_beacon_blocks = eth_v1 .and(warp::path("beacon")) @@ -1829,20 +1831,19 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); - let beacon_pool_path_any = any_version + let beacon_pool_path_v2 = eth_v2 .and(warp::path("beacon")) .and(warp::path("pool")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()); - let beacon_pool_path_v2 = eth_v2 + let beacon_pool_path_any = any_version .and(warp::path("beacon")) .and(warp::path("pool")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()); - // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path + let post_beacon_pool_attestations_v1 = beacon_pool_path .clone() .and(warp::path("attestations")) .and(warp::path::end()) @@ -1851,9 +1852,6 @@ pub fn serve( .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( - // V1 and V2 are identical except V2 has a consensus version header in the request. - // We only require this header for SSZ deserialization, which isn't supported for - // this endpoint presently. |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, @@ -1879,18 +1877,40 @@ pub fn serve( .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and(warp_utils::json::json()) + .and(warp_utils::json::json::()) + .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) - .and(reprocess_send_filter) + .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, - attestations: Vec, + payload: Value, + fork_name: Option, network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { - let attestations = attestations.into_iter().map(Either::Right).collect(); + let attestations = + match crate::publish_attestations::deserialize_attestation_payload::( + payload, fork_name, &log, + ) { + Ok(attestations) => attestations, + Err(err) => { + warn!( + log, + "Unable to deserialize attestation POST request"; + "error" => ?err + ); + return warp::reply::with_status( + warp::reply::json( + &"Unable to deserialize request body".to_string(), + ), + eth2::StatusCode::BAD_REQUEST, + ) + .into_response(); + } + }; + let result = crate::publish_attestations::publish_attestations( task_spawner, chain, @@ -4765,7 +4785,7 @@ pub fn serve( .uor(post_beacon_blinded_blocks) .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) - .uor(post_beacon_pool_attestations) + .uor(post_beacon_pool_attestations_v1) .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 111dee3cffb..1b9949d4d55 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -44,6 +44,7 @@ use either::Either; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; +use serde_json::Value; use slog::{debug, error, warn, Logger}; use std::borrow::Cow; use std::sync::Arc; @@ -52,11 +53,11 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::{Attestation, EthSpec, SingleAttestation}; +use types::{Attestation, EthSpec, ForkName, SingleAttestation}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] -enum Error { +pub enum Error { Validation(AttestationError), Publication, ForkChoice(#[allow(dead_code)] BeaconChainError), @@ -64,6 +65,7 @@ enum Error { ReprocessDisabled, ReprocessFull, ReprocessTimeout, + InvalidJson(#[allow(dead_code)] serde_json::Error), FailedConversion(#[allow(dead_code)] BeaconChainError), } @@ -74,6 +76,36 @@ enum PublishAttestationResult { Failure(Error), } +#[allow(clippy::type_complexity)] +pub fn deserialize_attestation_payload( + payload: Value, + fork_name: Option, + log: &Logger, +) -> Result, SingleAttestation>>, Error> { + if fork_name.is_some_and(|fork_name| fork_name.electra_enabled()) || fork_name.is_none() { + if fork_name.is_none() { + warn!( + log, + "No Consensus Version header specified."; + ); + } + + Ok(serde_json::from_value::>(payload) + .map_err(Error::InvalidJson)? + .into_iter() + .map(Either::Right) + .collect()) + } else { + Ok( + serde_json::from_value::>>(payload) + .map_err(Error::InvalidJson)? + .into_iter() + .map(Either::Left) + .collect(), + ) + } +} + fn verify_and_publish_attestation( chain: &Arc>, either_attestation: &Either, SingleAttestation>, @@ -163,12 +195,12 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>( |committee_cache, _| { let Some(committee) = committee_cache.get_beacon_committee( single_attestation.data.slot, - single_attestation.committee_index as u64, + single_attestation.committee_index, ) else { return Err(BeaconChainError::AttestationError( types::AttestationError::NoCommitteeForSlotAndIndex { slot: single_attestation.data.slot, - index: single_attestation.committee_index as u64, + index: single_attestation.committee_index, }, )); }; @@ -199,7 +231,7 @@ pub async fn publish_attestations( .iter() .map(|att| match att { Either::Left(att) => (att.data().slot, att.committee_index()), - Either::Right(att) => (att.data.slot, Some(att.committee_index as u64)), + Either::Right(att) => (att.data.slot, Some(att.committee_index)), }) .collect::>(); diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 60a4c507832..bb3086945bc 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -5,6 +5,7 @@ use beacon_chain::{ ChainConfig, }; use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use either::Either; use eth2::types::ProduceBlockV3Response; use eth2::types::{DepositContractData, StateId}; use execution_layer::{ForkchoiceState, PayloadAttributes}; @@ -906,9 +907,11 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); + let attestations = Either::Right(single_attestations); + tokio::spawn(async move { client - .post_beacon_pool_attestations_v2(&single_attestations, fork_name) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .expect("attestations should be processed successfully") }) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 99b76966106..bc3159e0743 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3,6 +3,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped, }; +use either::Either; use eth2::{ mixin::{RequestAccept, ResponseForkName, ResponseOptional}, reqwest::RequestBuilder, @@ -1810,12 +1811,25 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self { + pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { self.client .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + + let attestations = Either::Left(self.attestations.clone()); + + self.client + .post_beacon_pool_attestations_v2::(attestations, fork_name) + .await + .unwrap(); + assert!( self.network_rx.network_recv.recv().await.is_some(), "valid attestation should be sent to network" @@ -1833,8 +1847,10 @@ impl ApiTester { .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data.slot)) .unwrap(); + + let attestations = Either::Right(self.single_attestations.clone()); self.client - .post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); assert!( @@ -1900,10 +1916,10 @@ impl ApiTester { .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); - + let attestations = Either::Right(attestations); let err_v2 = self .client - .post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap_err(); @@ -6054,9 +6070,9 @@ impl ApiTester { .chain .spec .fork_name_at_slot::(self.chain.slot().unwrap()); - + let attestations = Either::Right(self.single_attestations.clone()); self.client - .post_beacon_pool_attestations_v2(&self.single_attestations, fork_name) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -6375,10 +6391,10 @@ async fn post_beacon_blocks_duplicate() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attestations_valid_v1() { +async fn beacon_pools_post_attestations_valid() { ApiTester::new() .await - .test_post_beacon_pool_attestations_valid_v1() + .test_post_beacon_pool_attestations_valid() .await; } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 8d07ef1a129..5c1d4f24e54 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -103,7 +103,7 @@ impl NetworkBeaconProcessor { |committee_cache, _| { let Some(committee) = committee_cache.get_beacon_committee( single_attestation.data.slot, - single_attestation.committee_index as u64, + single_attestation.committee_index, ) else { warn!( self.log, diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index ca7fa7ccdbe..a1bc9d025bd 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -7,6 +7,7 @@ edition = { workspace = true } [dependencies] derivative = { workspace = true } +either = { workspace = true } eth2_keystore = { workspace = true } ethereum_serde_utils = { workspace = true } ethereum_ssz = { workspace = true } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index af8573a5789..b86aa627657 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -17,6 +17,7 @@ pub mod types; use self::mixin::{RequestAccept, ResponseOptional}; use self::types::{Error as ResponseError, *}; use derivative::Derivative; +use either::Either; use futures::Stream; use futures_util::StreamExt; use lighthouse_network::PeerId; @@ -1324,9 +1325,9 @@ impl BeaconNodeHttpClient { } /// `POST v2/beacon/pool/attestations` - pub async fn post_beacon_pool_attestations_v2( + pub async fn post_beacon_pool_attestations_v2( &self, - attestations: &[SingleAttestation], + attestations: Either>, Vec>, fork_name: ForkName, ) -> Result<(), Error> { let mut path = self.eth_path(V2)?; @@ -1337,13 +1338,26 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); - self.post_with_timeout_and_consensus_header( - path, - &attestations, - self.timeouts.attestation, - fork_name, - ) - .await?; + match attestations { + Either::Right(attestations) => { + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; + } + Either::Left(attestations) => { + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; + } + }; Ok(()) } diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 47e41acb5b1..276b27b0f8a 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -24,7 +24,7 @@ pub enum Error { IncorrectStateVariant, InvalidCommitteeLength, InvalidCommitteeIndex, - AttesterNotInCommittee(usize), + AttesterNotInCommittee(u64), InvalidCommittee, MissingCommittee, NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex }, @@ -238,7 +238,7 @@ impl Attestation { pub fn to_single_attestation_with_attester_index( &self, - attester_index: usize, + attester_index: u64, ) -> Result { match self { Self::Base(_) => Err(Error::IncorrectStateVariant), @@ -375,14 +375,14 @@ impl AttestationElectra { pub fn to_single_attestation_with_attester_index( &self, - attester_index: usize, + attester_index: u64, ) -> Result { let Some(committee_index) = self.committee_index() else { return Err(Error::InvalidCommitteeIndex); }; Ok(SingleAttestation { - committee_index: committee_index as usize, + committee_index, attester_index, data: self.data.clone(), signature: self.signature.clone(), @@ -579,8 +579,10 @@ impl ForkVersionDeserialize for Vec> { PartialEq, )] pub struct SingleAttestation { - pub committee_index: usize, - pub attester_index: usize, + #[serde(with = "serde_utils::quoted_u64")] + pub committee_index: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub attester_index: u64, pub data: AttestationData, pub signature: AggregateSignature, } @@ -591,7 +593,7 @@ impl SingleAttestation { .iter() .enumerate() .find_map(|(i, &validator_index)| { - if self.attester_index == validator_index { + if self.attester_index as usize == validator_index { return Some(i); } None @@ -600,7 +602,7 @@ impl SingleAttestation { let mut committee_bits: BitVector = BitVector::default(); committee_bits - .set(self.committee_index, true) + .set(self.committee_index as usize, true) .map_err(|_| Error::InvalidCommitteeIndex)?; let mut aggregation_bits = diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 981d6d5653d..7a5357c6cc5 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -67,7 +67,7 @@ impl SubnetId { ) -> Result { Self::compute_subnet::( attestation.data.slot, - attestation.committee_index as u64, + attestation.committee_index, committee_count_per_slot, spec, ) diff --git a/validator_client/validator_services/Cargo.toml b/validator_client/validator_services/Cargo.toml index 21f0ae2d776..b4495a7c818 100644 --- a/validator_client/validator_services/Cargo.toml +++ b/validator_client/validator_services/Cargo.toml @@ -8,6 +8,7 @@ authors = ["Sigma Prime "] beacon_node_fallback = { workspace = true } bls = { workspace = true } doppelganger_service = { workspace = true } +either = { workspace = true } environment = { workspace = true } eth2 = { workspace = true } futures = { workspace = true } diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 58c6ea32988..9a6f94d52bc 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,5 +1,6 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use either::Either; use environment::RuntimeContext; use futures::future::join_all; use slog::{crit, debug, error, info, trace, warn}; @@ -461,7 +462,7 @@ impl AttestationService { .iter() .zip(validator_indices) .filter_map(|(a, i)| { - match a.to_single_attestation_with_attester_index(*i as usize) { + match a.to_single_attestation_with_attester_index(*i) { Ok(a) => Some(a), Err(e) => { // This shouldn't happen unless BN and VC are out of sync with @@ -479,8 +480,12 @@ impl AttestationService { } }) .collect::>(); + beacon_node - .post_beacon_pool_attestations_v2(&single_attestations, fork_name) + .post_beacon_pool_attestations_v2::( + Either::Right(single_attestations), + fork_name, + ) .await } else { beacon_node