From 11b39ab74cf0f5c094f83b6738a03cd4e7fcd1d3 Mon Sep 17 00:00:00 2001 From: pawan Date: Sat, 21 Aug 2021 00:01:06 +0530 Subject: [PATCH 1/9] Add fork to context only if fork epoch is set --- consensus/types/src/fork_context.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/consensus/types/src/fork_context.rs b/consensus/types/src/fork_context.rs index 6da188570e8..1290d85b8a0 100644 --- a/consensus/types/src/fork_context.rs +++ b/consensus/types/src/fork_context.rs @@ -1,6 +1,6 @@ use parking_lot::RwLock; -use crate::{ChainSpec, EthSpec, ForkName, Hash256, Slot}; +use crate::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, Slot}; use std::collections::HashMap; /// Provides fork specific info like the current fork name and the fork digests corresponding to every valid fork. @@ -26,12 +26,18 @@ impl ForkContext { ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root), )]; - // Only add Altair to list of forks if it's enabled (i.e. spec.altair_fork_epoch != None) - if spec.altair_fork_epoch.is_some() { - fork_to_digest.push(( - ForkName::Altair, - ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root), - )) + // Only add Altair to list of forks if it's enabled + // Note: `altair_fork_epoch = None | Some(Epoch::max_value())` implies altair hasn't been activated yet on the config. + if let Some(altair_epoch) = spec.altair_fork_epoch { + if altair_epoch != Epoch::max_value() { + fork_to_digest.push(( + ForkName::Altair, + ChainSpec::compute_fork_digest( + spec.altair_fork_version, + genesis_validators_root, + ), + )); + } } let fork_to_digest: HashMap = fork_to_digest.into_iter().collect(); From c8b33b9b280f8f3b0d74c29ae14440d90fd98213 Mon Sep 17 00:00:00 2001 From: pawan Date: Sat, 21 Aug 2021 00:01:27 +0530 Subject: [PATCH 2/9] Reject post fork gossip before fork --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 6 +- beacon_node/eth2_libp2p/src/types/pubsub.rs | 69 +++++++++----------- beacon_node/network/src/service.rs | 30 ++++++++- 3 files changed, 65 insertions(+), 40 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 316917ac5ac..508c9f979dc 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -43,6 +43,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; +use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext, SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, @@ -103,6 +104,8 @@ pub enum BehaviourEvent { topic: TopicHash, /// The message itself. message: PubsubMessage, + /// The fork corresponding to the topic the message was received on. + fork_name: ForkName, }, /// Inform the network to send a Status to this peer. StatusPeer(PeerId), @@ -810,13 +813,14 @@ impl NetworkBehaviourEventProcess for Behaviour< warn!(self.log, "Failed to report message validation"; "message_id" => %id, "peer_id" => %propagation_source, "error" => ?e); } } - Ok(msg) => { + Ok((msg, fork_name)) => { // Notify the network self.add_event(BehaviourEvent::PubsubMessage { id, source: propagation_source, topic: gs_msg.topic, message: msg, + fork_name, }); } } diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index 75ef6e8ab26..0c7326f23c6 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -117,6 +117,8 @@ impl PubsubMessage { } /// This decodes `data` into a `PubsubMessage` given a topic. + /// Returns the decoded data along with the fork name corresponding to the topic + /// the message was received on. /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will * need to be modified. */ @@ -124,83 +126,74 @@ impl PubsubMessage { topic: &TopicHash, data: &[u8], fork_context: &ForkContext, - ) -> Result { + ) -> Result<(Self, ForkName), String> { match GossipTopic::decode(topic.as_str()) { Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)), Ok(gossip_topic) => { + let fork_name = fork_context + .from_context_bytes(gossip_topic.fork_digest) + .ok_or(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + ))?; + // All topics are currently expected to be compressed and decompressed with snappy. // This is done in the `SnappyTransform` struct. // Therefore compression has already been handled for us by the time we are // decoding the objects here. // the ssz decoders - match gossip_topic.kind() { + let msg = match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( - agg_and_proof, - ))) + PubsubMessage::AggregateAndProofAttestation(Box::new(agg_and_proof)) } GossipKind::Attestation(subnet_id) => { let attestation = Attestation::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) + PubsubMessage::Attestation(Box::new((*subnet_id, attestation))) } GossipKind::BeaconBlock => { - let beacon_block = - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(ForkName::Base) => SignedBeaconBlock::::Base( - SignedBeaconBlockBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ), - Some(ForkName::Altair) => SignedBeaconBlock::::Altair( - SignedBeaconBlockAltair::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ), - None => { - return Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )) - } - }; - Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))) + let beacon_block = match fork_name { + ForkName::Base => SignedBeaconBlock::::Base( + SignedBeaconBlockBase::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ), + ForkName::Altair => SignedBeaconBlock::::Altair( + SignedBeaconBlockAltair::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ), + }; + PubsubMessage::BeaconBlock(Box::new(beacon_block)) } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))) + PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)) } GossipKind::ProposerSlashing => { let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))) + PubsubMessage::ProposerSlashing(Box::new(proposer_slashing)) } GossipKind::AttesterSlashing => { let attester_slashing = AttesterSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing))) + PubsubMessage::AttesterSlashing(Box::new(attester_slashing)) } GossipKind::SignedContributionAndProof => { let sync_aggregate = SignedContributionAndProof::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::SignedContributionAndProof(Box::new( - sync_aggregate, - ))) + PubsubMessage::SignedContributionAndProof(Box::new(sync_aggregate)) } GossipKind::SyncCommitteeMessage(subnet_id) => { let sync_committee = SyncCommitteeMessage::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::SyncCommitteeMessage(Box::new(( - *subnet_id, - sync_committee, - )))) + PubsubMessage::SyncCommitteeMessage(Box::new((*subnet_id, sync_committee))) } - } + }; + Ok((msg, *fork_name)) } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 26c1e272fd0..965b4af2054 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -190,6 +190,8 @@ impl NetworkService { &beacon_chain.spec, )); + debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork()); + // launch libp2p service let (network_globals, mut libp2p) = LibP2PService::new( executor.clone(), @@ -600,10 +602,36 @@ fn spawn_service( id, source, message, - .. + fork_name, + topic, } => { // Update prometheus metrics. metrics::expose_receive_metrics(&message); + + if fork_name != service.fork_context.current_fork() { + if let Some((_, next_fork_duration)) = service.beacon_chain.duration_to_next_fork() { + // This implies that the peer is sending messages on post fork topics + // before the fork. We ignore the message and score down the peer. + if next_fork_duration > beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY { + debug!( + service.log, + "Peer sent gossip on incorrect fork version topic"; + "peer" => %source, + "fork_topic" => %topic, + ); + service.libp2p.report_peer(&source, PeerAction::LowToleranceError, ReportSource::Gossipsub); + service + .libp2p + .swarm + .behaviour_mut() + .report_message_validation_result( + &source, id, MessageAcceptance::Ignore, + ); + continue; + } + } + + } match message { // attestation information gets processed in the attestation service PubsubMessage::Attestation(ref subnet_and_attestation) => { From 221f703bc1b11268beeacc605e0d21fab16ecaa0 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 24 Aug 2021 19:14:18 +0530 Subject: [PATCH 3/9] Deserialize u64::max from config as None --- consensus/types/src/chain_spec.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index dfa8775b2cc..f82210c93d4 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,5 +1,6 @@ use crate::*; use int_to_bytes::int_to_bytes4; +use serde::Deserializer; use serde_derive::{Deserialize, Serialize}; use serde_utils::quoted_u64::MaybeQuoted; use std::fs::File; @@ -467,7 +468,7 @@ impl ChainSpec { domain_sync_committee_selection_proof: 8, domain_contribution_and_proof: 9, altair_fork_version: [0x01, 0x00, 0x00, 0x00], - altair_fork_epoch: Some(Epoch::new(u64::MAX)), + altair_fork_epoch: None, /* * Network specific @@ -506,7 +507,7 @@ impl ChainSpec { // Altair epochs_per_sync_committee_period: Epoch::new(8), altair_fork_version: [0x01, 0x00, 0x00, 0x01], - altair_fork_epoch: Some(Epoch::new(u64::MAX)), + altair_fork_epoch: None, // Other network_id: 2, // lighthouse testnet network id deposit_chain_id: 5, @@ -544,6 +545,7 @@ pub struct Config { #[serde(with = "serde_utils::bytes_4_hex")] altair_fork_version: [u8; 4], + #[serde(deserialize_with = "deserialize_fork_epoch")] altair_fork_epoch: Option>, #[serde(with = "serde_utils::quoted_u64")] @@ -582,6 +584,20 @@ impl Default for Config { } } +/// Util function to deserialize a u64::max() fork epoch as `None`. +fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let decoded: Option> = serde::de::Deserialize::deserialize(deserializer)?; + if let Some(fork_epoch) = decoded { + if fork_epoch.value != Epoch::max_value() { + return Ok(Some(fork_epoch)); + } + } + Ok(None) +} + impl Config { /// Maps `self` to an identifier for an `EthSpec` instance. /// @@ -606,7 +622,7 @@ impl Config { altair_fork_version: spec.altair_fork_version, altair_fork_epoch: spec .altair_fork_epoch - .map(|slot| MaybeQuoted { value: slot }), + .map(|epoch| MaybeQuoted { value: epoch }), seconds_per_slot: spec.seconds_per_slot, seconds_per_eth1_block: spec.seconds_per_eth1_block, From 0ef0cfd063105b70e3d8f8316a19c4330f44fbce Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 24 Aug 2021 22:54:12 +0530 Subject: [PATCH 4/9] Fix tests --- .../eth2_libp2p/src/rpc/codec/ssz_snappy.rs | 6 ++- beacon_node/eth2_libp2p/tests/common/mod.rs | 8 +++- beacon_node/eth2_libp2p/tests/rpc_tests.rs | 37 +++++++++++++++---- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 915572fd12a..6d59859315c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -615,7 +615,11 @@ mod tests { type Spec = types::MainnetEthSpec; fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &Spec::default_spec()) + let mut chain_spec = Spec::default_spec(); + // Set fork_epoch to `Some` to ensure that the `ForkContext` object + // includes altair in the list of forks + chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); + ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } fn base_block() -> SignedBeaconBlock { diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index 8c28512d045..1023bbacd40 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -11,14 +11,18 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; type E = MinimalEthSpec; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &ChainSpec::minimal()) + let mut chain_spec = E::default_spec(); + // Set fork_epoch to `Some` to ensure that the `ForkContext` object + // includes altair in the list of forks + chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); + ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } pub struct Libp2pInstance(LibP2PService, exit_future::Signal); diff --git a/beacon_node/eth2_libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs index 9d1faf748cf..1e5f1e4e1c3 100644 --- a/beacon_node/eth2_libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2_libp2p/tests/rpc_tests.rs @@ -138,11 +138,16 @@ fn test_blocks_by_range_chunked_rpc() { step: 0, }); - // BlocksByRange Response let spec = E::default_spec(); - let empty_block = BeaconBlock::empty(&spec); - let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty()); - let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed))); + + // BlocksByRange Response + let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_base = Response::BlocksByRange(Some(Box::new(signed_full_block))); + + let full_block = BeaconBlock::Altair(BeaconBlockAltair::::full(&spec)); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); // keep count of the number of messages received let mut messages_received = 0; @@ -167,7 +172,11 @@ fn test_blocks_by_range_chunked_rpc() { warn!(log, "Sender received a response"); match response { Response::BlocksByRange(Some(_)) => { - assert_eq!(response, rpc_response.clone()); + if messages_received < 5 { + assert_eq!(response, rpc_response_base.clone()); + } else { + assert_eq!(response, rpc_response_altair.clone()); + } messages_received += 1; warn!(log, "Chunk received"); } @@ -197,7 +206,14 @@ fn test_blocks_by_range_chunked_rpc() { if request == rpc_request { // send the response warn!(log, "Receiver got request"); - for _ in 1..=messages_to_send { + for i in 0..messages_to_send { + // Send first half of responses as base blocks and + // second half as altair blocks. + let rpc_response = if i < 5 { + rpc_response_base.clone() + } else { + rpc_response_altair.clone() + }; receiver.swarm.behaviour_mut().send_successful_response( peer_id, id, @@ -481,7 +497,7 @@ fn test_blocks_by_root_chunked_rpc() { let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 3; + let messages_to_send = 10; let log = common::build_log(log_level, enable_logging); let spec = E::default_spec(); @@ -497,6 +513,13 @@ fn test_blocks_by_root_chunked_rpc() { Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), ]), }); From 6e3b210aaa148cc604619f526902efa267704a79 Mon Sep 17 00:00:00 2001 From: pawan Date: Fri, 27 Aug 2021 16:27:47 +0530 Subject: [PATCH 5/9] Serialize altair fork epoch as u64::max --- beacon_node/network/src/service.rs | 3 ++- consensus/types/src/chain_spec.rs | 22 +++++++++++++++++++--- consensus/types/src/fork_context.rs | 17 ++++++----------- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 965b4af2054..4b6617b89e3 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -619,7 +619,8 @@ fn spawn_service( "peer" => %source, "fork_topic" => %topic, ); - service.libp2p.report_peer(&source, PeerAction::LowToleranceError, ReportSource::Gossipsub); + // Penalize with MidTolerance to account for slight clock skews beyond `MAXIMUM_GOSSIP_CLOCK_DISPARITY` + service.libp2p.report_peer(&source, PeerAction::MidToleranceError, ReportSource::Gossipsub); service .libp2p .swarm diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index f82210c93d4..6307477e804 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,7 +1,7 @@ use crate::*; use int_to_bytes::int_to_bytes4; -use serde::Deserializer; -use serde_derive::{Deserialize, Serialize}; +use serde::{Deserializer, Serialize, Serializer}; +use serde_derive::Deserialize; use serde_utils::quoted_u64::MaybeQuoted; use std::fs::File; use std::path::Path; @@ -545,8 +545,9 @@ pub struct Config { #[serde(with = "serde_utils::bytes_4_hex")] altair_fork_version: [u8; 4], + #[serde(serialize_with = "serialize_fork_epoch")] #[serde(deserialize_with = "deserialize_fork_epoch")] - altair_fork_epoch: Option>, + pub altair_fork_epoch: Option>, #[serde(with = "serde_utils::quoted_u64")] seconds_per_slot: u64, @@ -584,6 +585,21 @@ impl Default for Config { } } +/// Util function to serialize a `None` fork epoch value +/// as `Epoch::max_value()`. +fn serialize_fork_epoch(val: &Option>, s: S) -> Result +where + S: Serializer, +{ + match val { + None => MaybeQuoted { + value: Epoch::max_value(), + } + .serialize(s), + Some(epoch) => epoch.serialize(s), + } +} + /// Util function to deserialize a u64::max() fork epoch as `None`. fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result>, D::Error> where diff --git a/consensus/types/src/fork_context.rs b/consensus/types/src/fork_context.rs index 1290d85b8a0..50068787e7a 100644 --- a/consensus/types/src/fork_context.rs +++ b/consensus/types/src/fork_context.rs @@ -27,17 +27,12 @@ impl ForkContext { )]; // Only add Altair to list of forks if it's enabled - // Note: `altair_fork_epoch = None | Some(Epoch::max_value())` implies altair hasn't been activated yet on the config. - if let Some(altair_epoch) = spec.altair_fork_epoch { - if altair_epoch != Epoch::max_value() { - fork_to_digest.push(( - ForkName::Altair, - ChainSpec::compute_fork_digest( - spec.altair_fork_version, - genesis_validators_root, - ), - )); - } + // Note: `altair_fork_epoch == None` implies altair hasn't been activated yet on the config. + if spec.altair_fork_epoch.is_some() { + fork_to_digest.push(( + ForkName::Altair, + ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root), + )); } let fork_to_digest: HashMap = fork_to_digest.into_iter().collect(); From 9a46ee75938427bd26359102a02a24325a82d5a0 Mon Sep 17 00:00:00 2001 From: pawan Date: Mon, 30 Aug 2021 16:59:23 +0530 Subject: [PATCH 6/9] Revert "Reject post fork gossip before fork" This reverts commit c8b33b9b280f8f3b0d74c29ae14440d90fd98213. --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 6 +- beacon_node/eth2_libp2p/src/types/pubsub.rs | 69 +++++++++++--------- beacon_node/network/src/service.rs | 28 +------- 3 files changed, 40 insertions(+), 63 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 508c9f979dc..316917ac5ac 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -43,7 +43,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext, SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, @@ -104,8 +103,6 @@ pub enum BehaviourEvent { topic: TopicHash, /// The message itself. message: PubsubMessage, - /// The fork corresponding to the topic the message was received on. - fork_name: ForkName, }, /// Inform the network to send a Status to this peer. StatusPeer(PeerId), @@ -813,14 +810,13 @@ impl NetworkBehaviourEventProcess for Behaviour< warn!(self.log, "Failed to report message validation"; "message_id" => %id, "peer_id" => %propagation_source, "error" => ?e); } } - Ok((msg, fork_name)) => { + Ok(msg) => { // Notify the network self.add_event(BehaviourEvent::PubsubMessage { id, source: propagation_source, topic: gs_msg.topic, message: msg, - fork_name, }); } } diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index 0c7326f23c6..75ef6e8ab26 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -117,8 +117,6 @@ impl PubsubMessage { } /// This decodes `data` into a `PubsubMessage` given a topic. - /// Returns the decoded data along with the fork name corresponding to the topic - /// the message was received on. /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will * need to be modified. */ @@ -126,74 +124,83 @@ impl PubsubMessage { topic: &TopicHash, data: &[u8], fork_context: &ForkContext, - ) -> Result<(Self, ForkName), String> { + ) -> Result { match GossipTopic::decode(topic.as_str()) { Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)), Ok(gossip_topic) => { - let fork_name = fork_context - .from_context_bytes(gossip_topic.fork_digest) - .ok_or(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - ))?; - // All topics are currently expected to be compressed and decompressed with snappy. // This is done in the `SnappyTransform` struct. // Therefore compression has already been handled for us by the time we are // decoding the objects here. // the ssz decoders - let msg = match gossip_topic.kind() { + match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::AggregateAndProofAttestation(Box::new(agg_and_proof)) + Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( + agg_and_proof, + ))) } GossipKind::Attestation(subnet_id) => { let attestation = Attestation::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?; - PubsubMessage::Attestation(Box::new((*subnet_id, attestation))) + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } GossipKind::BeaconBlock => { - let beacon_block = match fork_name { - ForkName::Base => SignedBeaconBlock::::Base( - SignedBeaconBlockBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ), - ForkName::Altair => SignedBeaconBlock::::Altair( - SignedBeaconBlockAltair::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ), - }; - PubsubMessage::BeaconBlock(Box::new(beacon_block)) + let beacon_block = + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(ForkName::Base) => SignedBeaconBlock::::Base( + SignedBeaconBlockBase::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ), + Some(ForkName::Altair) => SignedBeaconBlock::::Altair( + SignedBeaconBlockAltair::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ), + None => { + return Err(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + )) + } + }; + Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))) } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)) + Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))) } GossipKind::ProposerSlashing => { let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::ProposerSlashing(Box::new(proposer_slashing)) + Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))) } GossipKind::AttesterSlashing => { let attester_slashing = AttesterSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::AttesterSlashing(Box::new(attester_slashing)) + Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing))) } GossipKind::SignedContributionAndProof => { let sync_aggregate = SignedContributionAndProof::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::SignedContributionAndProof(Box::new(sync_aggregate)) + Ok(PubsubMessage::SignedContributionAndProof(Box::new( + sync_aggregate, + ))) } GossipKind::SyncCommitteeMessage(subnet_id) => { let sync_committee = SyncCommitteeMessage::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - PubsubMessage::SyncCommitteeMessage(Box::new((*subnet_id, sync_committee))) + Ok(PubsubMessage::SyncCommitteeMessage(Box::new(( + *subnet_id, + sync_committee, + )))) } - }; - Ok((msg, *fork_name)) + } } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4b6617b89e3..441da89676b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -602,37 +602,11 @@ fn spawn_service( id, source, message, - fork_name, - topic, + ... } => { // Update prometheus metrics. metrics::expose_receive_metrics(&message); - if fork_name != service.fork_context.current_fork() { - if let Some((_, next_fork_duration)) = service.beacon_chain.duration_to_next_fork() { - // This implies that the peer is sending messages on post fork topics - // before the fork. We ignore the message and score down the peer. - if next_fork_duration > beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY { - debug!( - service.log, - "Peer sent gossip on incorrect fork version topic"; - "peer" => %source, - "fork_topic" => %topic, - ); - // Penalize with MidTolerance to account for slight clock skews beyond `MAXIMUM_GOSSIP_CLOCK_DISPARITY` - service.libp2p.report_peer(&source, PeerAction::MidToleranceError, ReportSource::Gossipsub); - service - .libp2p - .swarm - .behaviour_mut() - .report_message_validation_result( - &source, id, MessageAcceptance::Ignore, - ); - continue; - } - } - - } match message { // attestation information gets processed in the attestation service PubsubMessage::Attestation(ref subnet_and_attestation) => { From ca96c1301dc2aeb74dc1d6276a6030e8ad8878df Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 31 Aug 2021 02:09:36 +0530 Subject: [PATCH 7/9] Subscribe to new fork topics 2 slots before the fork --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 9 ++++ beacon_node/network/src/service.rs | 51 +++++++++++++++++--- consensus/types/src/fork_context.rs | 2 +- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 316917ac5ac..7c5b6b76a6f 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -326,6 +326,15 @@ impl Behaviour { self.unsubscribe(gossip_topic) } + /// Subscribe to all currently subscribed topics with the new fork digest. + pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) { + let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); + for mut topic in subscriptions.into_iter() { + topic.fork_digest = new_fork_digest; + self.subscribe(topic); + } + } + /// Unsubscribe from all topics that doesn't have the given fork_digest pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) { let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 441da89676b..6a0dbd3af26 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -26,13 +26,15 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + SyncSubnetId, Unsigned, ValidatorSubscription, Slot, ChainSpec }; mod tests; /// The interval (in seconds) that various network metrics will update. const METRIC_UPDATE_INTERVAL: u64 = 1; +/// Number of slots before the fork when we should subscribe to the new fork topics. +const SUBSCRIBE_DELAY_SLOTS: u64 = 2; /// Delay after a fork where we unsubscribe from pre-fork topics. const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2; @@ -129,6 +131,8 @@ pub struct NetworkService { discovery_auto_update: bool, /// A delay that expires when a new fork takes place. next_fork_update: Pin>>, + /// A delay that expires when we need to subscribe to a new fork's topics. + next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, /// Subscribe to all the subnets once synced. @@ -177,6 +181,7 @@ impl NetworkService { // keep track of when our fork_id needs to be updated let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into()); + let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into()); let next_unsubscribe = Box::pin(None.into()); let current_slot = beacon_chain @@ -254,6 +259,7 @@ impl NetworkService { upnp_mappings: (None, None), discovery_auto_update: config.discv5_config.enr_update, next_fork_update, + next_fork_subscriptions, next_unsubscribe, subscribe_all_subnets: config.subscribe_all_subnets, metrics_update, @@ -273,12 +279,23 @@ impl NetworkService { /// digests since we should be subscribed to post fork topics before the fork. pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> { let fork_context = &self.fork_context; + let spec = &self.beacon_chain.spec; match fork_context.current_fork() { ForkName::Base => { - if fork_context.fork_exists(ForkName::Altair) { - fork_context.all_fork_digests() - } else { - vec![fork_context.genesis_context_bytes()] + // If we are SUBSCRIBE_DELAY_SLOTS before the fork slot, subscribe only to Base, + // else subscribe to Base and Altair. + let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot); + match spec.next_fork_epoch::(current_slot) { + Some((_, fork_epoch)) => { + if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS)) + >= fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) + { + fork_context.all_fork_digests() + } else { + vec![fork_context.genesis_context_bytes()] + } + } + None => vec![fork_context.genesis_context_bytes()], } } ForkName::Altair => vec![fork_context @@ -602,7 +619,7 @@ fn spawn_service( id, source, message, - ... + .. } => { // Update prometheus metrics. metrics::expose_receive_metrics(&message); @@ -689,6 +706,16 @@ fn spawn_service( info!(service.log, "Unsubscribed from old fork topics"); service.next_unsubscribe = Box::pin(None.into()); } + Some(_) = &mut service.next_fork_subscriptions => { + if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() { + let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name); + let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root); + info!(service.log, "Subscribing to new fork topics"); + service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest); + } + service.next_fork_subscriptions = Box::pin(None.into()); + } + } metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); } @@ -705,6 +732,18 @@ fn next_fork_delay( .map(|(_, until_fork)| tokio::time::sleep(until_fork)) } +/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork. +/// If there is no scheduled fork, `None` is returned. +fn next_fork_subscriptions_delay( + beacon_chain: &BeaconChain, +) -> Option { + beacon_chain.duration_to_next_fork().map(|(_, until_fork)| { + tokio::time::sleep(until_fork.saturating_sub(Duration::from_secs( + beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS, + ))) + }) +} + impl Drop for NetworkService { fn drop(&mut self) { // network thread is terminating diff --git a/consensus/types/src/fork_context.rs b/consensus/types/src/fork_context.rs index 50068787e7a..1d488f76963 100644 --- a/consensus/types/src/fork_context.rs +++ b/consensus/types/src/fork_context.rs @@ -1,6 +1,6 @@ use parking_lot::RwLock; -use crate::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, Slot}; +use crate::{ChainSpec, EthSpec, ForkName, Hash256, Slot}; use std::collections::HashMap; /// Provides fork specific info like the current fork name and the fork digests corresponding to every valid fork. From 406ef4f6dc8279290a628ab2c14e650336d1aaac Mon Sep 17 00:00:00 2001 From: pawan Date: Thu, 2 Sep 2021 00:38:41 +0530 Subject: [PATCH 8/9] Fix delay --- beacon_node/network/src/service.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e299656f831..93a19b50f6c 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -691,7 +691,7 @@ fn spawn_service( if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { info!( service.log, - "Updating enr fork version"; + "Transitioned to new fork"; "old_fork" => ?fork_context.current_fork(), "new_fork" => ?new_fork_name, ); @@ -728,7 +728,10 @@ fn spawn_service( info!(service.log, "Subscribing to new fork topics"); service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest); } - service.next_fork_subscriptions = Box::pin(None.into()); + else { + error!(service.log, "Fork subscription scheduled but no fork scheduled"); + } + service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into()); } } @@ -748,15 +751,19 @@ fn next_fork_delay( } /// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork. -/// If there is no scheduled fork, `None` is returned. +/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`. fn next_fork_subscriptions_delay( beacon_chain: &BeaconChain, ) -> Option { - beacon_chain.duration_to_next_fork().map(|(_, until_fork)| { - tokio::time::sleep(until_fork.saturating_sub(Duration::from_secs( + if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() { + let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs( beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS, - ))) - }) + )); + if !duration_to_subscription.is_zero() { + return Some(tokio::time::sleep(duration_to_subscription)); + } + } + None } impl Drop for NetworkService { From 08f57dce7f18134ac9e53e8a922e33cf9ee1c5a8 Mon Sep 17 00:00:00 2001 From: pawan Date: Thu, 2 Sep 2021 00:57:13 +0530 Subject: [PATCH 9/9] fmt --- beacon_node/network/src/service.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 93a19b50f6c..0f905edfc15 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -25,8 +25,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, Slot, ChainSpec + ChainSpec, EthSpec, ForkContext, ForkName, RelativeEpoch, Slot, SubnetId, + SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; @@ -733,7 +733,6 @@ fn spawn_service( } service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into()); } - } metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); }