Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POST /eth/v2/beacon/pool/attestations bugfixes #6867

Merged
merged 12 commits into from
Jan 31, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {

pub fn single_attestation(&self) -> Option<SingleAttestation> {
Some(SingleAttestation {
committee_index: self.attestation.committee_index()? as usize,
attester_index: self.validator_index,
committee_index: self.attestation.committee_index()?,
attester_index: self.validator_index as u64,
data: self.attestation.data().clone(),
signature: self.attestation.signature().clone(),
})
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,15 +1118,15 @@ where
.unwrap();

let single_attestation =
attestation.to_single_attestation_with_attester_index(attester_index)?;
attestation.to_single_attestation_with_attester_index(attester_index as u64)?;

let attestation: Attestation<E> = single_attestation.to_attestation(committee.committee)?;

assert_eq!(
single_attestation.committee_index,
attestation.committee_index().unwrap() as usize
attestation.committee_index().unwrap()
);
assert_eq!(single_attestation.attester_index, validator_index);
assert_eq!(single_attestation.attester_index, validator_index as u64);
Ok(single_attestation)
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rand = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slog = { workspace = true }
slot_clock = { workspace = true }
state_processing = { workspace = true }
Expand Down
53 changes: 37 additions & 16 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ mod validator;
mod validator_inclusion;
mod validators;
mod version;

use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
Expand Down Expand Up @@ -63,6 +62,7 @@ pub use publish_blocks::{
publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
Expand All @@ -83,15 +83,15 @@ use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::ChainSpec;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit,
SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use types::{ChainSpec, SingleAttestation};
use validator::pubkey_to_validator_index;
use version::{
add_consensus_version_header, add_ssz_content_type_header,
Expand Down Expand Up @@ -1829,37 +1829,37 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_any = any_version
let beacon_pool_path_v2 = eth_v2
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_v2 = eth_v2
let beacon_pool_path_any = any_version
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

// POST beacon/pool/attestations
let post_beacon_pool_attestations = beacon_pool_path
let post_beacon_pool_attestations_any = beacon_pool_path
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp_utils::json::json::<Vec<Attestation<T::EthSpec>>>())
.and(network_tx_filter.clone())
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
// V1 and V2 are identical except V2 can also accept `SingleAttestation`
// and has a consensus version header in the request. We only require
// this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
let attestations = attestations.into_iter().map(Either::Left).collect();
let result = crate::publish_attestations::publish_attestations(
task_spawner,
Expand All @@ -1879,18 +1879,39 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp_utils::json::json::<Value>())
.and(network_tx_filter.clone())
.and(reprocess_send_filter)
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
body: Value,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
let attestations = attestations.into_iter().map(Either::Right).collect();
// V1 and V2 are identical except V2 can also accept `SingleAttestation`
// and has a consensus version header in the request. We only require
// this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
let attestations = if let Ok(single_attestations) =
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
serde_json::from_value::<Vec<SingleAttestation>>(body.clone())
{
single_attestations.into_iter().map(Either::Right).collect()
} else if let Ok(attestations) =
serde_json::from_value::<Vec<Attestation<T::EthSpec>>>(body.clone())
{
attestations.into_iter().map(Either::Left).collect()
} else {
return warp::reply::with_status(
warp::reply::json(&format!(
"Unable to deserialize request body, {:?}",
body
)),
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
};
let result = crate::publish_attestations::publish_attestations(
task_spawner,
chain,
Expand Down Expand Up @@ -4765,7 +4786,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_blinded_blocks)
.uor(post_beacon_blocks_v2)
.uor(post_beacon_blinded_blocks_v2)
.uor(post_beacon_pool_attestations)
.uor(post_beacon_pool_attestations_any)
.uor(post_beacon_pool_attestations_v2)
.uor(post_beacon_pool_attester_slashings)
.uor(post_beacon_pool_proposer_slashings)
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
return Err(BeaconChainError::AttestationError(
types::AttestationError::NoCommitteeForSlotAndIndex {
slot: single_attestation.data.slot,
index: single_attestation.committee_index as u64,
index: single_attestation.committee_index,
},
));
};
Expand Down Expand Up @@ -199,7 +199,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
.iter()
.map(|att| match att {
Either::Left(att) => (att.data().slot, att.committee_index()),
Either::Right(att) => (att.data.slot, Some(att.committee_index as u64)),
Either::Right(att) => (att.data.slot, Some(att.committee_index)),
})
.collect::<Vec<_>>();

Expand Down
5 changes: 4 additions & 1 deletion beacon_node/http_api/tests/interactive_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use beacon_chain::{
ChainConfig,
};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use either::Either;
use eth2::types::ProduceBlockV3Response;
use eth2::types::{DepositContractData, StateId};
use execution_layer::{ForkchoiceState, PayloadAttributes};
Expand Down Expand Up @@ -906,9 +907,11 @@ async fn queue_attestations_from_http() {
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();

let attestations = Either::Right(single_attestations);

tokio::spawn(async move {
client
.post_beacon_pool_attestations_v2(&single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.expect("attestations should be processed successfully")
})
Expand Down
32 changes: 24 additions & 8 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped,
};
use either::Either;
use eth2::{
mixin::{RequestAccept, ResponseForkName, ResponseOptional},
reqwest::RequestBuilder,
Expand Down Expand Up @@ -1810,12 +1811,25 @@ impl ApiTester {
self
}

pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self {
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
self.client
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();

let fork_name = self
.attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Left(self.attestations.clone());

self.client
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid attestation should be sent to network"
Expand All @@ -1833,8 +1847,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data.slot))
.unwrap();

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();
assert!(
Expand Down Expand Up @@ -1900,10 +1916,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Right(attestations);
let err_v2 = self
.client
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap_err();

Expand Down Expand Up @@ -6054,9 +6070,9 @@ impl ApiTester {
.chain
.spec
.fork_name_at_slot::<E>(self.chain.slot().unwrap());

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(&self.single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

Expand Down Expand Up @@ -6375,10 +6391,10 @@ async fn post_beacon_blocks_duplicate() {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attestations_valid_v1() {
async fn beacon_pools_post_attestations_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_attestations_valid_v1()
.test_post_beacon_pool_attestations_valid()
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
warn!(
self.log,
Expand Down
1 change: 1 addition & 0 deletions common/eth2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = { workspace = true }

[dependencies]
derivative = { workspace = true }
either = { workspace = true }
eth2_keystore = { workspace = true }
ethereum_serde_utils = { workspace = true }
ethereum_ssz = { workspace = true }
Expand Down
32 changes: 23 additions & 9 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod types;
use self::mixin::{RequestAccept, ResponseOptional};
use self::types::{Error as ResponseError, *};
use derivative::Derivative;
use either::Either;
use futures::Stream;
use futures_util::StreamExt;
use lighthouse_network::PeerId;
Expand Down Expand Up @@ -1324,9 +1325,9 @@ impl BeaconNodeHttpClient {
}

/// `POST v2/beacon/pool/attestations`
pub async fn post_beacon_pool_attestations_v2(
pub async fn post_beacon_pool_attestations_v2<E: EthSpec>(
&self,
attestations: &[SingleAttestation],
attestations: Either<Vec<Attestation<E>>, Vec<SingleAttestation>>,
fork_name: ForkName,
) -> Result<(), Error> {
let mut path = self.eth_path(V2)?;
Expand All @@ -1337,13 +1338,26 @@ impl BeaconNodeHttpClient {
.push("pool")
.push("attestations");

self.post_with_timeout_and_consensus_header(
path,
&attestations,
self.timeouts.attestation,
fork_name,
)
.await?;
match attestations {
Either::Right(attestations) => {
self.post_with_timeout_and_consensus_header(
path,
&attestations,
self.timeouts.attestation,
fork_name,
)
.await?;
}
Either::Left(attestations) => {
self.post_with_timeout_and_consensus_header(
path,
&attestations,
self.timeouts.attestation,
fork_name,
)
.await?;
}
};

Ok(())
}
Expand Down
Loading
Loading