From cf1a2766f64b6b41dab842c65d9dbbfb8474f7f5 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 17:50:30 +0300 Subject: [PATCH 1/7] Rename `SyncingStrategy` to `PolkadotSyncingStrategy` --- substrate/client/network/sync/src/engine.rs | 9 +++++---- substrate/client/network/sync/src/strategy.rs | 12 ++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 4a57d61df4572..7aa848820ee9f 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ }, strategy::{ warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, + PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -189,7 +189,7 @@ pub struct Peer { pub struct SyncingEngine { /// Syncing strategy. - strategy: SyncingStrategy, + strategy: PolkadotSyncingStrategy, /// Blockchain client. client: Arc, @@ -397,7 +397,8 @@ where ); // Initialize syncing strategy. - let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; + let strategy = + PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -710,7 +711,7 @@ where number, ) }, - // Nothing to do, this is handled internally by `SyncingStrategy`. + // Nothing to do, this is handled internally by `PolkadotSyncingStrategy`. SyncingAction::Finished => {}, } } diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index ad3a9461c93b8..2a9f6d860a011 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! [`SyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] +//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] //! and specific syncing algorithms. pub mod chain_sync; @@ -104,7 +104,7 @@ pub enum SyncingAction { number: NumberFor, justifications: Justifications, }, - /// Strategy finished. Nothing to do, this is handled by `SyncingStrategy`. + /// Strategy finished. Nothing to do, this is handled by `PolkadotSyncingStrategy`. Finished, } @@ -158,8 +158,8 @@ impl From> for SyncingAction { } } -/// Proxy to specific syncing strategies. -pub struct SyncingStrategy { +/// Proxy to specific syncing strategies used in Polkadot. +pub struct PolkadotSyncingStrategy { /// Initial syncing configuration. config: SyncingConfig, /// Client used by syncing strategies. @@ -171,11 +171,11 @@ pub struct SyncingStrategy { /// `ChainSync` strategy.` chain_sync: Option>, /// Connected peers and their best blocks used to seed a new strategy when switching to it in - /// [`SyncingStrategy::proceed_to_next`]. + /// [`PolkadotSyncingStrategy::proceed_to_next`]. peer_best_blocks: HashMap)>, } -impl SyncingStrategy +impl PolkadotSyncingStrategy where B: BlockT, Client: HeaderBackend From 203cbe23cfbcb6ddf2a978f28a75091dca39f66a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 22:59:26 +0300 Subject: [PATCH 2/7] Reorder `PolkadotSyncingStrategy` methods with zero other changes --- substrate/client/network/sync/src/strategy.rs | 76 +++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index 2a9f6d860a011..3bc6dc8fccbc2 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -186,44 +186,6 @@ where + Sync + 'static, { - /// Initialize a new syncing strategy. - pub fn new( - config: SyncingConfig, - client: Arc, - warp_sync_config: Option>, - ) -> Result { - if let SyncMode::Warp = config.mode { - let warp_sync_config = warp_sync_config - .expect("Warp sync configuration must be supplied in warp sync mode."); - let warp_sync = WarpSync::new(client.clone(), warp_sync_config); - Ok(Self { - config, - client, - warp: Some(warp_sync), - state: None, - chain_sync: None, - peer_best_blocks: Default::default(), - }) - } else { - let chain_sync = ChainSync::new( - chain_sync_mode(config.mode), - client.clone(), - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry.as_ref(), - std::iter::empty(), - )?; - Ok(Self { - config, - client, - warp: None, - state: None, - chain_sync: Some(chain_sync), - peer_best_blocks: Default::default(), - }) - } - } - /// Notify that a new peer has connected. pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); @@ -477,6 +439,44 @@ where Ok(actions) } + /// Initialize a new syncing strategy. + pub fn new( + config: SyncingConfig, + client: Arc, + warp_sync_config: Option>, + ) -> Result { + if let SyncMode::Warp = config.mode { + let warp_sync_config = warp_sync_config + .expect("Warp sync configuration must be supplied in warp sync mode."); + let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + Ok(Self { + config, + client, + warp: Some(warp_sync), + state: None, + chain_sync: None, + peer_best_blocks: Default::default(), + }) + } else { + let chain_sync = ChainSync::new( + chain_sync_mode(config.mode), + client.clone(), + config.max_parallel_downloads, + config.max_blocks_per_request, + config.metrics_registry.as_ref(), + std::iter::empty(), + )?; + Ok(Self { + config, + client, + warp: None, + state: None, + chain_sync: Some(chain_sync), + peer_best_blocks: Default::default(), + }) + } + } + /// Proceed with the next strategy if the active one finished. pub fn proceed_to_next(&mut self) -> Result<(), ClientError> { // The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`. From a49eb13ac663b6c1d487ce596c0c265e70c6fc68 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 21:45:21 +0300 Subject: [PATCH 3/7] Extract `SyncingStrategy` trait out of Polkadot-specific implementation --- substrate/client/network/sync/src/engine.rs | 2 +- substrate/client/network/sync/src/strategy.rs | 176 +++++++++++++----- 2 files changed, 128 insertions(+), 50 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 7aa848820ee9f..25b06a1fd38e4 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ }, strategy::{ warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, + PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index 3bc6dc8fccbc2..bb0f97b58db2b 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -59,6 +59,100 @@ fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode { } } +/// Syncing strategy for syncing engine to use +pub trait SyncingStrategy: Send +where + B: BlockT, +{ + /// Notify that a new peer has connected. + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor); + + /// Notify that a peer has disconnected. + fn remove_peer(&mut self, peer_id: &PeerId); + + /// Submit a validated block announcement. + /// + /// Returns new best hash & best number of the peer if they are updated. + #[must_use] + fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)>; + + /// Configure an explicit fork sync request in case external code has detected that there is a + /// stale fork missing. + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor); + + /// Request extra justification. + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor); + + /// Clear extra justification requests. + fn clear_justification_requests(&mut self); + + /// Report a justification import (successful or not). + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool); + + /// Process block response. + fn on_block_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: BlockRequest, + blocks: Vec>, + ); + + /// Process state response. + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ); + + /// Process warp proof response. + fn on_warp_proof_response( + &mut self, + peer_id: &PeerId, + key: StrategyKey, + response: EncodedProof, + ); + + /// A batch of blocks have been processed, with or without errors. + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ); + + /// Notify a syncing strategy that a block has been finalized. + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor); + + /// Inform sync about a new best imported block. + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor); + + // Are we in major sync mode? + fn is_major_syncing(&self) -> bool; + + /// Get the number of peers known to the syncing strategy. + fn num_peers(&self) -> usize; + + /// Returns the current sync status. + fn status(&self) -> SyncStatus; + + /// Get the total number of downloaded blocks. + fn num_downloaded_blocks(&self) -> usize; + + /// Get an estimate of the number of parallel sync requests. + fn num_sync_requests(&self) -> usize; + + /// Get actions that should be performed by the owner on the strategy's behalf + #[must_use] + fn actions(&mut self) -> Result>, ClientError>; +} + /// Syncing configuration containing data for all strategies. #[derive(Clone, Debug)] pub struct SyncingConfig { @@ -171,11 +265,11 @@ pub struct PolkadotSyncingStrategy { /// `ChainSync` strategy.` chain_sync: Option>, /// Connected peers and their best blocks used to seed a new strategy when switching to it in - /// [`PolkadotSyncingStrategy::proceed_to_next`]. + /// `PolkadotSyncingStrategy::proceed_to_next`. peer_best_blocks: HashMap)>, } -impl PolkadotSyncingStrategy +impl SyncingStrategy for PolkadotSyncingStrategy where B: BlockT, Client: HeaderBackend @@ -186,8 +280,7 @@ where + Sync + 'static, { - /// Notify that a new peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); @@ -195,8 +288,7 @@ where self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); } - /// Notify that a peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { + fn remove_peer(&mut self, peer_id: &PeerId) { self.warp.as_mut().map(|s| s.remove_peer(peer_id)); self.state.as_mut().map(|s| s.remove_peer(peer_id)); self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id)); @@ -204,10 +296,7 @@ where self.peer_best_blocks.remove(peer_id); } - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. - pub fn on_validated_block_announce( + fn on_validated_block_announce( &mut self, is_best: bool, peer_id: PeerId, @@ -240,46 +329,35 @@ where new_best } - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - pub fn set_sync_fork_request( - &mut self, - peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor) { // Fork requests are only handled by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.set_sync_fork_request(peers.clone(), hash, number); } } - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { // Justifications can only be requested via `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.request_justification(hash, number); } } - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { + fn clear_justification_requests(&mut self) { // Justification requests can only be cleared by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.clear_justification_requests(); } } - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { // Only `ChainSync` is interested in justification import. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_justification_import(hash, number, success); } } - /// Process block response. - pub fn on_block_response( + fn on_block_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -302,8 +380,7 @@ where } } - /// Process state response. - pub fn on_state_response( + fn on_state_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -325,8 +402,7 @@ where } } - /// Process warp proof response. - pub fn on_warp_proof_response( + fn on_warp_proof_response( &mut self, peer_id: &PeerId, key: StrategyKey, @@ -344,8 +420,7 @@ where } } - /// A batch of blocks have been processed, with or without errors. - pub fn on_blocks_processed( + fn on_blocks_processed( &mut self, imported: usize, count: usize, @@ -359,24 +434,21 @@ where } } - /// Notify a syncing strategy that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { // Only `ChainSync` is interested in block finalization notifications. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_block_finalized(hash, number); } } - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { // This is relevant to `ChainSync` only. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.update_chain_info(best_hash, best_number); } } - // Are we in major sync mode? - pub fn is_major_syncing(&self) -> bool { + fn is_major_syncing(&self) -> bool { self.warp.is_some() || self.state.is_some() || match self.chain_sync { @@ -385,13 +457,11 @@ where } } - /// Get the number of peers known to the syncing strategy. - pub fn num_peers(&self) -> usize { + fn num_peers(&self) -> usize { self.peer_best_blocks.len() } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { + fn status(&self) -> SyncStatus { // This function presumes that strategies are executed serially and must be refactored // once we have parallel strategies. if let Some(ref warp) = self.warp { @@ -405,21 +475,17 @@ where } } - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { + fn num_downloaded_blocks(&self) -> usize { self.chain_sync .as_ref() .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks()) } - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { + fn num_sync_requests(&self) -> usize { self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests()) } - /// Get actions that should be performed by the owner on the strategy's behalf - #[must_use] - pub fn actions(&mut self) -> Result>, ClientError> { + fn actions(&mut self) -> Result>, ClientError> { // This function presumes that strategies are executed serially and must be refactored once // we have parallel strategies. let actions: Vec<_> = if let Some(ref mut warp) = self.warp { @@ -438,7 +504,19 @@ where Ok(actions) } +} +impl PolkadotSyncingStrategy +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Initialize a new syncing strategy. pub fn new( config: SyncingConfig, From b7a0add875817b38a5f9a5e1732ad064e34e8cf6 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 22:31:11 +0300 Subject: [PATCH 4/7] Move `ChainSync` definition next to implementation --- .../network/sync/src/strategy/chain_sync.rs | 88 +++++++++---------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 21e4740486257..7b4a1509b2308 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -233,50 +233,6 @@ pub enum ChainSyncMode { }, } -/// The main data structure which contains all the state for a chains -/// active syncing strategy. -pub struct ChainSync { - /// Chain client. - client: Arc, - /// The active peers that we are using to sync and their PeerSync status - peers: HashMap>, - disconnected_peers: DisconnectedPeers, - /// A `BlockCollection` of blocks that are being downloaded from peers - blocks: BlockCollection, - /// The best block number in our queue of blocks to import - best_queued_number: NumberFor, - /// The best block hash in our queue of blocks to import - best_queued_hash: B::Hash, - /// Current mode (full/light) - mode: ChainSyncMode, - /// Any extra justification requests. - extra_justifications: ExtraRequests, - /// A set of hashes of blocks that are being downloaded or have been - /// downloaded and are queued for import. - queue_blocks: HashSet, - /// Fork sync targets. - fork_targets: HashMap>, - /// A set of peers for which there might be potential block requests - allowed_requests: AllowedRequests, - /// Maximum number of peers to ask the same blocks in parallel. - max_parallel_downloads: u32, - /// Maximum blocks per request. - max_blocks_per_request: u32, - /// Total number of downloaded blocks. - downloaded_blocks: usize, - /// State sync in progress, if any. - state_sync: Option>, - /// Enable importing existing blocks. This is used used after the state download to - /// catch up to the latest state while re-importing blocks. - import_existing: bool, - /// Gap download process. - gap_sync: Option>, - /// Pending actions. - actions: Vec>, - /// Prometheus metrics. - metrics: Option, -} - /// All the data we have about a Peer that we are trying to sync with #[derive(Debug, Clone)] pub(crate) struct PeerSync { @@ -346,6 +302,50 @@ impl PeerSyncState { } } +/// The main data structure which contains all the state for a chains +/// active syncing strategy. +pub struct ChainSync { + /// Chain client. + client: Arc, + /// The active peers that we are using to sync and their PeerSync status + peers: HashMap>, + disconnected_peers: DisconnectedPeers, + /// A `BlockCollection` of blocks that are being downloaded from peers + blocks: BlockCollection, + /// The best block number in our queue of blocks to import + best_queued_number: NumberFor, + /// The best block hash in our queue of blocks to import + best_queued_hash: B::Hash, + /// Current mode (full/light) + mode: ChainSyncMode, + /// Any extra justification requests. + extra_justifications: ExtraRequests, + /// A set of hashes of blocks that are being downloaded or have been + /// downloaded and are queued for import. + queue_blocks: HashSet, + /// Fork sync targets. + fork_targets: HashMap>, + /// A set of peers for which there might be potential block requests + allowed_requests: AllowedRequests, + /// Maximum number of peers to ask the same blocks in parallel. + max_parallel_downloads: u32, + /// Maximum blocks per request. + max_blocks_per_request: u32, + /// Total number of downloaded blocks. + downloaded_blocks: usize, + /// State sync in progress, if any. + state_sync: Option>, + /// Enable importing existing blocks. This is used used after the state download to + /// catch up to the latest state while re-importing blocks. + import_existing: bool, + /// Gap download process. + gap_sync: Option>, + /// Pending actions. + actions: Vec>, + /// Prometheus metrics. + metrics: Option, +} + impl ChainSync where B: BlockT, From 7e7c5991cebfd3f05b7f007d00b35eb678737dce Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 22:40:15 +0300 Subject: [PATCH 5/7] Reorder `ChainSync` methods with zero other changes --- .../network/sync/src/strategy/chain_sync.rs | 1194 ++++++++--------- 1 file changed, 597 insertions(+), 597 deletions(-) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 7b4a1509b2308..8ef12e74910e7 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -357,52 +357,472 @@ where + Sync + 'static, { - /// Create a new instance. - pub fn new( - mode: ChainSyncMode, - client: Arc, - max_parallel_downloads: u32, - max_blocks_per_request: u32, - metrics_registry: Option<&Registry>, - initial_peers: impl Iterator)>, - ) -> Result { - let mut sync = Self { - client, - peers: HashMap::new(), - disconnected_peers: DisconnectedPeers::new(), - blocks: BlockCollection::new(), - best_queued_hash: Default::default(), - best_queued_number: Zero::zero(), - extra_justifications: ExtraRequests::new("justification", metrics_registry), - mode, - queue_blocks: Default::default(), - fork_targets: Default::default(), - allowed_requests: Default::default(), - max_parallel_downloads, - max_blocks_per_request, - downloaded_blocks: 0, - state_sync: None, - import_existing: false, - gap_sync: None, - actions: Vec::new(), - metrics: metrics_registry.and_then(|r| match Metrics::register(r) { - Ok(metrics) => Some(metrics), - Err(err) => { - log::error!( + /// Notify syncing state machine that a new sync peer has connected. + pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + match self.add_peer_inner(peer_id, best_hash, best_number) { + Ok(Some(request)) => + self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(None) => {}, + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), + } + } + + /// Notify that a sync peer has disconnected. + pub fn remove_peer(&mut self, peer_id: &PeerId) { + self.blocks.clear_peer_download(peer_id); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(peer_id) + } + + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + } + } + } + + self.extra_justifications.peer_disconnected(peer_id); + self.allowed_requests.set_all(); + self.fork_targets.retain(|_, target| { + target.peers.remove(peer_id); + !target.peers.is_empty() + }); + if let Some(metrics) = &self.metrics { + metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); + } + + let blocks = self.ready_blocks(); + + if !blocks.is_empty() { + self.validate_and_queue_blocks(blocks, false); + } + } + + /// Submit a validated block announcement. + /// + /// Returns new best hash & best number of the peer if they are updated. + #[must_use] + pub fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)> { + let number = *announce.header.number(); + let hash = announce.header.hash(); + let parent_status = + self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); + let known_parent = parent_status != BlockStatus::Unknown; + let ancient_parent = parent_status == BlockStatus::InChainPruned; + + let known = self.is_known(&hash); + let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { + peer + } else { + error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); + return Some((hash, number)) + }; + + if let PeerSyncState::AncestorSearch { .. } = peer.state { + trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); + return None + } + + let peer_info = is_best.then(|| { + // update their best block + peer.best_number = number; + peer.best_hash = hash; + + (hash, number) + }); + + // If the announced block is the best they have and is not ahead of us, our common number + // is either one further ahead or it's the one they just announced, if we know about it. + if is_best { + if known && self.best_queued_number >= number { + self.update_peer_common_number(&peer_id, number); + } else if announce.header.parent_hash() == &self.best_queued_hash || + known_parent && self.best_queued_number >= number + { + self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); + } + } + self.allowed_requests.add(&peer_id); + + // known block case + if known || self.is_already_downloading(&hash) { + trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); + if let Some(target) = self.fork_targets.get_mut(&hash) { + target.peers.insert(peer_id); + } + return peer_info + } + + if ancient_parent { + trace!( + target: LOG_TARGET, + "Ignored ancient block announced from {}: {} {:?}", + peer_id, + hash, + announce.header, + ); + return peer_info + } + + if self.status().state == SyncState::Idle { + trace!( + target: LOG_TARGET, + "Added sync target for block announced from {}: {} {:?}", + peer_id, + hash, + announce.summary(), + ); + self.fork_targets + .entry(hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { + number, + parent_hash: Some(*announce.header.parent_hash()), + peers: Default::default(), + } + }) + .peers + .insert(peer_id); + } + + peer_info + } + + /// Configure an explicit fork sync request in case external code has detected that there is a + /// stale fork missing. + /// + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// + /// Passing empty `peers` set effectively removes the sync request. + // The implementation is similar to `on_validated_block_announce` with unknown parent hash. + pub fn set_sync_fork_request( + &mut self, + mut peers: Vec, + hash: &B::Hash, + number: NumberFor, + ) { + if peers.is_empty() { + peers = self + .peers + .iter() + // Only request blocks from peers who are ahead or on a par. + .filter(|(_, peer)| peer.best_number >= number) + .map(|(id, _)| *id) + .collect(); + + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with no peers specified. \ + Syncing from these peers {peers:?} instead.", + ); + } else { + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with {peers:?}", + ); + } + + if self.is_known(hash) { + debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); + return + } + + trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); + for peer_id in &peers { + if let Some(peer) = self.peers.get_mut(peer_id) { + if let PeerSyncState::AncestorSearch { .. } = peer.state { + continue + } + + if number > peer.best_number { + peer.best_number = number; + peer.best_hash = *hash; + } + self.allowed_requests.add(peer_id); + } + } + + self.fork_targets + .entry(*hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { number, peers: Default::default(), parent_hash: None } + }) + .peers + .extend(peers); + } + + /// Request extra justification. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + self.extra_justifications + .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) + } + + /// Clear extra justification requests. + pub fn clear_justification_requests(&mut self) { + self.extra_justifications.reset(); + } + + /// Report a justification import (successful or not). + pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; + self.extra_justifications + .try_finalize_root((hash, number), finalization_result, true); + self.allowed_requests.set_all(); + } + + /// Submit blocks received in a response. + pub fn on_block_response( + &mut self, + peer_id: PeerId, + request: BlockRequest, + blocks: Vec>, + ) { + let block_response = BlockResponse:: { id: request.id, blocks }; + + let blocks_range = || match ( + block_response + .blocks + .first() + .and_then(|b| b.header.as_ref().map(|h| h.number())), + block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + trace!( + target: LOG_TARGET, + "BlockResponse {} from {} with {} blocks {}", + block_response.id, + peer_id, + block_response.blocks.len(), + blocks_range(), + ); + + let res = if request.fields == BlockAttributes::JUSTIFICATION { + self.on_block_justification(peer_id, block_response) + } else { + self.on_block_data(&peer_id, Some(request), block_response) + }; + + if let Err(bad_peer) = res { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + } + } + + /// Submit a state received in a response. + pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { + if let Err(bad_peer) = self.on_state_data(&peer_id, response) { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + } + } + + /// A batch of blocks have been processed, with or without errors. + /// + /// Call this when a batch of blocks have been processed by the import + /// queue, with or without errors. + pub fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ) { + trace!(target: LOG_TARGET, "Imported {imported} of {count}"); + + let mut has_error = false; + for (_, hash) in &results { + if self.queue_blocks.remove(hash) { + if let Some(metrics) = &self.metrics { + metrics.queued_blocks.dec(); + } + } + self.blocks.clear_queued(hash); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_queued(hash); + } + } + for (result, hash) in results { + if has_error { + break + } + + has_error |= result.is_err(); + + match result { + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + }, + Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { + if aux.clear_justification_requests { + trace!( + target: LOG_TARGET, + "Block imported clears all pending justification requests {number}: {hash:?}", + ); + self.clear_justification_requests(); + } + + if aux.needs_justification { + trace!( + target: LOG_TARGET, + "Block imported but requires justification {number}: {hash:?}", + ); + self.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(ref peer) = peer_id { + warn!("💔 Sent block with bad justification to import"); + self.actions.push(ChainSyncAction::DropPeer(BadPeer( + *peer, + rep::BAD_JUSTIFICATION, + ))); + } + } + + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + } + let state_sync_complete = + self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); + if state_sync_complete { + info!( + target: LOG_TARGET, + "State sync is complete ({} MiB), restarting block sync.", + self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), + ); + self.state_sync = None; + self.mode = ChainSyncMode::Full; + self.restart(); + } + let gap_sync_complete = + self.gap_sync.as_ref().map_or(false, |s| s.target == number); + if gap_sync_complete { + info!( + target: LOG_TARGET, + "Block history download is complete." + ); + self.gap_sync = None; + } + }, + Err(BlockImportError::IncompleteHeader(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Peer sent block with incomplete header to import", + ); + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); + self.restart(); + }, + Err(BlockImportError::VerificationFailed(peer_id, e)) => { + let extra_message = peer_id + .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); + + warn!( target: LOG_TARGET, - "Failed to register `ChainSync` metrics {err:?}", + "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", ); - None + + if let Some(peer) = peer_id { + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); + } + + self.restart(); + }, + Err(BlockImportError::BadBlock(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Block {hash:?} received from peer {peer} has been blacklisted", + ); + self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); + }, + Err(BlockImportError::MissingState) => { + // This may happen if the chain we were requesting upon has been discarded + // in the meantime because other chain has been finalized. + // Don't mark it as bad as it still may be synced if explicitly requested. + trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); }, - }), - }; + e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { + warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); + self.state_sync = None; + self.restart(); + }, + Err(BlockImportError::Cancelled) => {}, + }; + } - sync.reset_sync_start_point()?; - initial_peers.for_each(|(peer_id, best_hash, best_number)| { - sync.add_peer(peer_id, best_hash, best_number); + self.allowed_requests.set_all(); + } + + /// Notify sync that a block has been finalized. + pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { + is_descendent_of(&**client, base, block) }); - Ok(sync) + if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { + if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { + // Finalized a recent block. + let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(*hash) { + log::debug!( + target: LOG_TARGET, + "Starting state sync for #{number} ({hash})", + ); + self.state_sync = Some(StateSync::new( + self.client.clone(), + header, + None, + None, + *skip_proofs, + )); + self.allowed_requests.set_all(); + } + } + } + } + + if let Err(err) = r { + warn!( + target: LOG_TARGET, + "💔 Error cleaning up pending extra justification data requests: {err}", + ); + } + } + + /// Inform sync about a new best imported block. + pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + self.on_block_queued(best_hash, best_number); + } + + /// Get the number of peers known to the syncing state machine. + pub fn num_peers(&self) -> usize { + self.peers.len() } /// Returns the current sync status. @@ -444,6 +864,11 @@ where } } + /// Get the total number of downloaded blocks. + pub fn num_downloaded_blocks(&self) -> usize { + self.downloaded_blocks + } + /// Get an estimate of the number of parallel sync requests. pub fn num_sync_requests(&self) -> usize { self.fork_targets @@ -452,24 +877,76 @@ where .count() } - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { - self.downloaded_blocks - } + /// Get pending actions to perform. + #[must_use] + pub fn actions(&mut self) -> impl Iterator> { + let block_requests = self + .block_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(block_requests); - /// Get the number of peers known to the syncing state machine. - pub fn num_peers(&self) -> usize { - self.peers.len() + let justification_requests = self + .justification_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(justification_requests); + + let state_request = self + .state_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); + self.actions.extend(state_request); + + std::mem::take(&mut self.actions).into_iter() } - /// Notify syncing state machine that a new sync peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), - Ok(None) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } + /// Create a new instance. + pub fn new( + mode: ChainSyncMode, + client: Arc, + max_parallel_downloads: u32, + max_blocks_per_request: u32, + metrics_registry: Option<&Registry>, + initial_peers: impl Iterator)>, + ) -> Result { + let mut sync = Self { + client, + peers: HashMap::new(), + disconnected_peers: DisconnectedPeers::new(), + blocks: BlockCollection::new(), + best_queued_hash: Default::default(), + best_queued_number: Zero::zero(), + extra_justifications: ExtraRequests::new("justification", metrics_registry), + mode, + queue_blocks: Default::default(), + fork_targets: Default::default(), + allowed_requests: Default::default(), + max_parallel_downloads, + max_blocks_per_request, + downloaded_blocks: 0, + state_sync: None, + import_existing: false, + gap_sync: None, + actions: Vec::new(), + metrics: metrics_registry.and_then(|r| match Metrics::register(r) { + Ok(metrics) => Some(metrics), + Err(err) => { + log::error!( + target: LOG_TARGET, + "Failed to register `ChainSync` metrics {err:?}", + ); + None + }, + }), + }; + + sync.reset_sync_start_point()?; + initial_peers.for_each(|(peer_id, best_hash, best_number)| { + sync.add_peer(peer_id, best_hash, best_number); + }); + + Ok(sync) } #[must_use] @@ -551,128 +1028,43 @@ where }, Some(ancestry_request::(common_best)), ) - }; - - self.allowed_requests.add(&peer_id); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: Zero::zero(), - best_hash, - best_number, - state, - }, - ); - - Ok(req) - }, - Ok(BlockStatus::Queued) | - Ok(BlockStatus::InChainWithState) | - Ok(BlockStatus::InChainPruned) => { - debug!( - target: LOG_TARGET, - "New peer {peer_id} with known best hash {best_hash} ({best_number}).", - ); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: std::cmp::min(self.best_queued_number, best_number), - best_hash, - best_number, - state: PeerSyncState::Available, - }, - ); - self.allowed_requests.add(&peer_id); - Ok(None) - }, - } - } - - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { - self.on_block_queued(best_hash, best_number); - } - - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - self.extra_justifications - .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) - } - - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { - self.extra_justifications.reset(); - } - - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - /// - /// Note that this function should not be used for recent blocks. - /// Sync should be able to download all the recent forks normally. - /// - /// Passing empty `peers` set effectively removes the sync request. - // The implementation is similar to `on_validated_block_announce` with unknown parent hash. - pub fn set_sync_fork_request( - &mut self, - mut peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { - if peers.is_empty() { - peers = self - .peers - .iter() - // Only request blocks from peers who are ahead or on a par. - .filter(|(_, peer)| peer.best_number >= number) - .map(|(id, _)| *id) - .collect(); - - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with no peers specified. \ - Syncing from these peers {peers:?} instead.", - ); - } else { - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with {peers:?}", - ); - } - - if self.is_known(hash) { - debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return - } + }; - trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); - for peer_id in &peers { - if let Some(peer) = self.peers.get_mut(peer_id) { - if let PeerSyncState::AncestorSearch { .. } = peer.state { - continue - } + self.allowed_requests.add(&peer_id); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state, + }, + ); - if number > peer.best_number { - peer.best_number = number; - peer.best_hash = *hash; - } - self.allowed_requests.add(peer_id); - } + Ok(req) + }, + Ok(BlockStatus::Queued) | + Ok(BlockStatus::InChainWithState) | + Ok(BlockStatus::InChainPruned) => { + debug!( + target: LOG_TARGET, + "New peer {peer_id} with known best hash {best_hash} ({best_number}).", + ); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: std::cmp::min(self.best_queued_number, best_number), + best_hash, + best_number, + state: PeerSyncState::Available, + }, + ); + self.allowed_requests.add(&peer_id); + Ok(None) + }, } - - self.fork_targets - .entry(*hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } - - ForkTarget { number, peers: Default::default(), parent_hash: None } - }) - .peers - .extend(peers); } /// Submit a block response for processing. @@ -959,223 +1351,42 @@ where if hash != block.hash { warn!( target: LOG_TARGET, - "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", - peer_id, - hash, - block.hash, - ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) - } - - block - .justifications - .or_else(|| legacy_justification_mapping(block.justification)) - } else { - // we might have asked the peer for a justification on a block that we assumed it - // had but didn't (regardless of whether it had a justification for it or not). - trace!( - target: LOG_TARGET, - "Peer {peer_id:?} provided empty response for justification request {hash:?}", - ); - - None - }; - - if let Some((peer_id, hash, number, justifications)) = - self.extra_justifications.on_response(peer_id, justification) - { - self.actions.push(ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }); - return Ok(()) - } - } - - Ok(()) - } - - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; - self.extra_justifications - .try_finalize_root((hash, number), finalization_result, true); - self.allowed_requests.set_all(); - } - - /// Notify sync that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - is_descendent_of(&**client, base, block) - }); - - if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { - // Finalized a recent block. - let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(*hash) { - log::debug!( - target: LOG_TARGET, - "Starting state sync for #{number} ({hash})", - ); - self.state_sync = Some(StateSync::new( - self.client.clone(), - header, - None, - None, - *skip_proofs, - )); - self.allowed_requests.set_all(); - } - } - } - } - - if let Err(err) = r { - warn!( - target: LOG_TARGET, - "💔 Error cleaning up pending extra justification data requests: {err}", - ); - } - } - - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. - #[must_use] - pub fn on_validated_block_announce( - &mut self, - is_best: bool, - peer_id: PeerId, - announce: &BlockAnnounce, - ) -> Option<(B::Hash, NumberFor)> { - let number = *announce.header.number(); - let hash = announce.header.hash(); - let parent_status = - self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); - let known_parent = parent_status != BlockStatus::Unknown; - let ancient_parent = parent_status == BlockStatus::InChainPruned; - - let known = self.is_known(&hash); - let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { - peer - } else { - error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); - return Some((hash, number)) - }; - - if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None - } - - let peer_info = is_best.then(|| { - // update their best block - peer.best_number = number; - peer.best_hash = hash; - - (hash, number) - }); - - // If the announced block is the best they have and is not ahead of us, our common number - // is either one further ahead or it's the one they just announced, if we know about it. - if is_best { - if known && self.best_queued_number >= number { - self.update_peer_common_number(&peer_id, number); - } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number - { - self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); - } - } - self.allowed_requests.add(&peer_id); - - // known block case - if known || self.is_already_downloading(&hash) { - trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); - if let Some(target) = self.fork_targets.get_mut(&hash) { - target.peers.insert(peer_id); - } - return peer_info - } - - if ancient_parent { - trace!( - target: LOG_TARGET, - "Ignored ancient block announced from {}: {} {:?}", - peer_id, - hash, - announce.header, - ); - return peer_info - } - - if self.status().state == SyncState::Idle { - trace!( - target: LOG_TARGET, - "Added sync target for block announced from {}: {} {:?}", - peer_id, - hash, - announce.summary(), - ); - self.fork_targets - .entry(hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } - - ForkTarget { - number, - parent_hash: Some(*announce.header.parent_hash()), - peers: Default::default(), - } - }) - .peers - .insert(peer_id); - } - - peer_info - } - - /// Notify that a sync peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.blocks.clear_peer_download(peer_id); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(peer_id) - } - - if let Some(state) = self.peers.remove(peer_id) { - if !state.state.is_available() { - if let Some(bad_peer) = - self.disconnected_peers.on_disconnect_during_request(*peer_id) - { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", + peer_id, + hash, + block.hash, + ); + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) } - } - } - self.extra_justifications.peer_disconnected(peer_id); - self.allowed_requests.set_all(); - self.fork_targets.retain(|_, target| { - target.peers.remove(peer_id); - !target.peers.is_empty() - }); - if let Some(metrics) = &self.metrics { - metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); - } + block + .justifications + .or_else(|| legacy_justification_mapping(block.justification)) + } else { + // we might have asked the peer for a justification on a block that we assumed it + // had but didn't (regardless of whether it had a justification for it or not). + trace!( + target: LOG_TARGET, + "Peer {peer_id:?} provided empty response for justification request {hash:?}", + ); - let blocks = self.ready_blocks(); + None + }; - if !blocks.is_empty() { - self.validate_and_queue_blocks(blocks, false); + if let Some((peer_id, hash, number, justifications)) = + self.extra_justifications.on_response(peer_id, justification) + { + self.actions.push(ChainSyncAction::ImportJustifications { + peer_id, + hash, + number, + justifications, + }); + return Ok(()) + } } + + Ok(()) } /// Returns the median seen block number. @@ -1444,53 +1655,6 @@ where .collect() } - /// Submit blocks received in a response. - pub fn on_block_response( - &mut self, - peer_id: PeerId, - request: BlockRequest, - blocks: Vec>, - ) { - let block_response = BlockResponse:: { id: request.id, blocks }; - - let blocks_range = || match ( - block_response - .blocks - .first() - .and_then(|b| b.header.as_ref().map(|h| h.number())), - block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - trace!( - target: LOG_TARGET, - "BlockResponse {} from {} with {} blocks {}", - block_response.id, - peer_id, - block_response.blocks.len(), - blocks_range(), - ); - - let res = if request.fields == BlockAttributes::JUSTIFICATION { - self.on_block_justification(peer_id, block_response) - } else { - self.on_block_data(&peer_id, Some(request), block_response) - }; - - if let Err(bad_peer) = res { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - - /// Submit a state received in a response. - pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - if let Err(bad_peer) = self.on_state_data(&peer_id, response) { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - /// Get justification requests scheduled by sync to be sent out. fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; @@ -1739,170 +1903,6 @@ where } } - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - pub fn on_blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) { - trace!(target: LOG_TARGET, "Imported {imported} of {count}"); - - let mut has_error = false; - for (_, hash) in &results { - if self.queue_blocks.remove(hash) { - if let Some(metrics) = &self.metrics { - metrics.queued_blocks.dec(); - } - } - self.blocks.clear_queued(hash); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_queued(hash); - } - } - for (result, hash) in results { - if has_error { - break - } - - has_error |= result.is_err(); - - match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - }, - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { - if aux.clear_justification_requests { - trace!( - target: LOG_TARGET, - "Block imported clears all pending justification requests {number}: {hash:?}", - ); - self.clear_justification_requests(); - } - - if aux.needs_justification { - trace!( - target: LOG_TARGET, - "Block imported but requires justification {number}: {hash:?}", - ); - self.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(ref peer) = peer_id { - warn!("💔 Sent block with bad justification to import"); - self.actions.push(ChainSyncAction::DropPeer(BadPeer( - *peer, - rep::BAD_JUSTIFICATION, - ))); - } - } - - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - } - let state_sync_complete = - self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); - if state_sync_complete { - info!( - target: LOG_TARGET, - "State sync is complete ({} MiB), restarting block sync.", - self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), - ); - self.state_sync = None; - self.mode = ChainSyncMode::Full; - self.restart(); - } - let gap_sync_complete = - self.gap_sync.as_ref().map_or(false, |s| s.target == number); - if gap_sync_complete { - info!( - target: LOG_TARGET, - "Block history download is complete." - ); - self.gap_sync = None; - } - }, - Err(BlockImportError::IncompleteHeader(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Peer sent block with incomplete header to import", - ); - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); - self.restart(); - }, - Err(BlockImportError::VerificationFailed(peer_id, e)) => { - let extra_message = peer_id - .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); - - warn!( - target: LOG_TARGET, - "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", - ); - - if let Some(peer) = peer_id { - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); - } - - self.restart(); - }, - Err(BlockImportError::BadBlock(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Block {hash:?} received from peer {peer} has been blacklisted", - ); - self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); - }, - Err(BlockImportError::MissingState) => { - // This may happen if the chain we were requesting upon has been discarded - // in the meantime because other chain has been finalized. - // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); - }, - e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { - warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); - self.state_sync = None; - self.restart(); - }, - Err(BlockImportError::Cancelled) => {}, - }; - } - - self.allowed_requests.set_all(); - } - - /// Get pending actions to perform. - #[must_use] - pub fn actions(&mut self) -> impl Iterator> { - let block_requests = self - .block_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(block_requests); - - let justification_requests = self - .justification_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(justification_requests); - - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); - self.actions.extend(state_request); - - std::mem::take(&mut self.actions).into_iter() - } - /// A version of `actions()` that doesn't schedule extra requests. For testing only. #[cfg(test)] #[must_use] From 9b1d08268c7aa985901d648e835eaed357318214 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 20 Aug 2024 23:36:49 +0300 Subject: [PATCH 6/7] Implement `SyncingStrategy` for `ChainSync` --- substrate/client/network/sync/src/strategy.rs | 40 ++-- .../network/sync/src/strategy/chain_sync.rs | 205 +++++++++--------- .../sync/src/strategy/chain_sync/test.rs | 30 +-- 3 files changed, 134 insertions(+), 141 deletions(-) diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index bb0f97b58db2b..f8d6976bbaa0d 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -29,7 +29,7 @@ use crate::{ types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; -use chain_sync::{ChainSync, ChainSyncAction, ChainSyncMode}; +use chain_sync::{ChainSync, ChainSyncMode}; use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; @@ -64,10 +64,10 @@ pub trait SyncingStrategy: Send where B: BlockT, { - /// Notify that a new peer has connected. + /// Notify syncing state machine that a new sync peer has connected. fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor); - /// Notify that a peer has disconnected. + /// Notify that a sync peer has disconnected. fn remove_peer(&mut self, peer_id: &PeerId); /// Submit a validated block announcement. @@ -83,6 +83,11 @@ where /// Configure an explicit fork sync request in case external code has detected that there is a /// stale fork missing. + /// + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// + /// Passing empty `peers` set effectively removes the sync request. fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor); /// Request extra justification. @@ -119,7 +124,10 @@ where response: EncodedProof, ); - /// A batch of blocks have been processed, with or without errors. + /// A batch of blocks that have been processed, with or without errors. + /// + /// Call this when a batch of blocks that have been processed by the import queue, with or + /// without errors. fn on_blocks_processed( &mut self, imported: usize, @@ -234,24 +242,6 @@ impl From> for SyncingAction { } } -impl From> for SyncingAction { - fn from(action: ChainSyncAction) -> Self { - match action { - ChainSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::CancelRequest { peer_id } => - SyncingAction::CancelRequest { peer_id, key: StrategyKey::ChainSync }, - ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - ChainSyncAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => - SyncingAction::ImportJustifications { peer_id, hash, number, justifications }, - } - } -} - /// Proxy to specific syncing strategies used in Polkadot. pub struct PolkadotSyncingStrategy { /// Initial syncing configuration. @@ -369,7 +359,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_block_response(peer_id, request, blocks); + chain_sync.on_block_response(peer_id, key, request, blocks); } else { error!( target: LOG_TARGET, @@ -391,7 +381,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_state_response(peer_id, response); + chain_sync.on_state_response(peer_id, key, response); } else { error!( target: LOG_TARGET, @@ -493,7 +483,7 @@ where } else if let Some(ref mut state) = self.state { state.actions().map(Into::into).collect() } else if let Some(ref mut chain_sync) = self.chain_sync { - chain_sync.actions().map(Into::into).collect() + chain_sync.actions()? } else { unreachable!("At least one syncing strategy is always active; qed") }; diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 8ef12e74910e7..c1894b9aebd1f 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -35,7 +35,8 @@ use crate::{ strategy::{ disconnected_peers::DisconnectedPeers, state_sync::{ImportResult, StateSync, StateSyncProvider}, - warp::{WarpSyncPhase, WarpSyncProgress}, + warp::{EncodedProof, WarpSyncPhase, WarpSyncProgress}, + StrategyKey, SyncingAction, SyncingStrategy, }, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus}, LOG_TARGET, @@ -197,28 +198,6 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. -#[derive(Debug)] -pub enum ChainSyncAction { - /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, - /// Drop stale request. - CancelRequest { peer_id: PeerId }, - /// Peer misbehaved. Disconnect, report it and cancel the block request to it. - DropPeer(BadPeer), - /// Import blocks. - ImportBlocks { origin: BlockOrigin, blocks: Vec> }, - /// Import justifications. - ImportJustifications { - peer_id: PeerId, - hash: B::Hash, - number: NumberFor, - justifications: Justifications, - }, -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ChainSyncMode { @@ -341,12 +320,12 @@ pub struct ChainSync { /// Gap download process. gap_sync: Option>, /// Pending actions. - actions: Vec>, + actions: Vec>, /// Prometheus metrics. metrics: Option, } -impl ChainSync +impl SyncingStrategy for ChainSync where B: BlockT, Client: HeaderBackend @@ -357,18 +336,19 @@ where + Sync + 'static, { - /// Notify syncing state machine that a new sync peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(Some(request)) => self.actions.push(SyncingAction::SendBlockRequest { + peer_id, + key: StrategyKey::ChainSync, + request, + }), Ok(None) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), + Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)), } } - /// Notify that a sync peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { + fn remove_peer(&mut self, peer_id: &PeerId) { self.blocks.clear_peer_download(peer_id); if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(peer_id) @@ -379,7 +359,7 @@ where if let Some(bad_peer) = self.disconnected_peers.on_disconnect_during_request(*peer_id) { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + self.actions.push(SyncingAction::DropPeer(bad_peer)); } } } @@ -401,11 +381,7 @@ where } } - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. - #[must_use] - pub fn on_validated_block_announce( + fn on_validated_block_announce( &mut self, is_best: bool, peer_id: PeerId, @@ -500,15 +476,8 @@ where peer_info } - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - /// - /// Note that this function should not be used for recent blocks. - /// Sync should be able to download all the recent forks normally. - /// - /// Passing empty `peers` set effectively removes the sync request. // The implementation is similar to `on_validated_block_announce` with unknown parent hash. - pub fn set_sync_fork_request( + fn set_sync_fork_request( &mut self, mut peers: Vec, hash: &B::Hash, @@ -568,33 +537,37 @@ where .extend(peers); } - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { let client = &self.client; self.extra_justifications .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) } - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { + fn clear_justification_requests(&mut self) { self.extra_justifications.reset(); } - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; self.extra_justifications .try_finalize_root((hash, number), finalization_result, true); self.allowed_requests.set_all(); } - /// Submit blocks received in a response. - pub fn on_block_response( + fn on_block_response( &mut self, peer_id: PeerId, + key: StrategyKey, request: BlockRequest, blocks: Vec>, ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_block_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -624,22 +597,42 @@ where }; if let Err(bad_peer) = res { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + self.actions.push(SyncingAction::DropPeer(bad_peer)); } } - /// Submit a state received in a response. - pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_state_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } if let Err(bad_peer) = self.on_state_data(&peer_id, response) { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + self.actions.push(SyncingAction::DropPeer(bad_peer)); } } - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - pub fn on_blocks_processed( + fn on_warp_proof_response( + &mut self, + _peer_id: &PeerId, + _key: StrategyKey, + _response: EncodedProof, + ) { + error!( + target: LOG_TARGET, + "`on_warp_proof_response()` called for chain sync strategy", + ); + debug_assert!(false); + } + + fn on_blocks_processed( &mut self, imported: usize, count: usize, @@ -691,7 +684,7 @@ where if aux.bad_justification { if let Some(ref peer) = peer_id { warn!("💔 Sent block with bad justification to import"); - self.actions.push(ChainSyncAction::DropPeer(BadPeer( + self.actions.push(SyncingAction::DropPeer(BadPeer( *peer, rep::BAD_JUSTIFICATION, ))); @@ -730,7 +723,7 @@ where "💔 Peer sent block with incomplete header to import", ); self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); + .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); self.restart(); }, Err(BlockImportError::VerificationFailed(peer_id, e)) => { @@ -744,7 +737,7 @@ where if let Some(peer) = peer_id { self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); + .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); } self.restart(); @@ -755,7 +748,7 @@ where target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); - self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); + self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded @@ -775,8 +768,7 @@ where self.allowed_requests.set_all(); } - /// Notify sync that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { let client = &self.client; let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { is_descendent_of(&**client, base, block) @@ -815,18 +807,19 @@ where } } - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { self.on_block_queued(best_hash, best_number); } - /// Get the number of peers known to the syncing state machine. - pub fn num_peers(&self) -> usize { + fn is_major_syncing(&self) -> bool { + self.status().state.is_major_syncing() + } + + fn num_peers(&self) -> usize { self.peers.len() } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { + fn status(&self) -> SyncStatus { let median_seen = self.median_seen(); let best_seen_block = median_seen.and_then(|median| (median > self.best_queued_number).then_some(median)); @@ -864,43 +857,49 @@ where } } - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { + fn num_downloaded_blocks(&self) -> usize { self.downloaded_blocks } - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { + fn num_sync_requests(&self) -> usize { self.fork_targets .values() .filter(|f| f.number <= self.best_queued_number) .count() } - /// Get pending actions to perform. - #[must_use] - pub fn actions(&mut self) -> impl Iterator> { - let block_requests = self - .block_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + fn actions(&mut self) -> Result>, ClientError> { + let block_requests = self.block_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); self.actions.extend(block_requests); - let justification_requests = self - .justification_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + let justification_requests = + self.justification_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); self.actions.extend(justification_requests); - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); + let state_request = self.state_request().into_iter().map(|(peer_id, request)| { + SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request } + }); self.actions.extend(state_request); - std::mem::take(&mut self.actions).into_iter() + Ok(std::mem::take(&mut self.actions)) } +} +impl ChainSync +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ /// Create a new instance. pub fn new( mode: ChainSyncMode, @@ -1240,8 +1239,9 @@ where state: next_state, }; let request = ancestry_request::(next_num); - self.actions.push(ChainSyncAction::SendBlockRequest { + self.actions.push(SyncingAction::SendBlockRequest { peer_id: *peer_id, + key: StrategyKey::ChainSync, request, }); return Ok(()) @@ -1376,7 +1376,7 @@ where if let Some((peer_id, hash, number, justifications)) = self.extra_justifications.on_response(peer_id, justification) { - self.actions.push(ChainSyncAction::ImportJustifications { + self.actions.push(SyncingAction::ImportJustifications { peer_id, hash, number, @@ -1460,7 +1460,7 @@ where .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX)); } - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks }) + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks }) } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1539,7 +1539,10 @@ where PeerSyncState::DownloadingGap(_) | PeerSyncState::DownloadingState => { // Cancel a request first, as `add_peer` may generate a new request. - self.actions.push(ChainSyncAction::CancelRequest { peer_id }); + self.actions.push(SyncingAction::CancelRequest { + peer_id, + key: StrategyKey::ChainSync, + }); self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); }, PeerSyncState::DownloadingJustification(_) => { @@ -1892,7 +1895,7 @@ where state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }); + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] }); Ok(()) }, ImportResult::Continue => Ok(()), @@ -1906,7 +1909,7 @@ where /// A version of `actions()` that doesn't schedule extra requests. For testing only. #[cfg(test)] #[must_use] - fn take_actions(&mut self) -> impl Iterator> { + fn take_actions(&mut self) -> impl Iterator> { std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 39d0c8f8d4d63..59436f387db6a 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -128,10 +128,10 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we wil send block requests to these peers // for these blocks we don't know about - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 2); assert!(actions.iter().all(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2, _ => false, })); @@ -162,15 +162,15 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { sync.restart(); // which should make us cancel and send out again block requests to the first two peers - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 4); let mut cancelled_first = HashSet::new(); assert!(actions.iter().all(|action| match action { - ChainSyncAction::CancelRequest { peer_id, .. } => { + SyncingAction::CancelRequest { peer_id, .. } => { cancelled_first.insert(peer_id); peer_id == &peer_id1 || peer_id == &peer_id2 }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. } => { assert!(cancelled_first.remove(peer_id)); peer_id == &peer_id1 || peer_id == &peer_id2 }, @@ -329,7 +329,7 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, )); best_block_num += max_blocks_to_request as u32; @@ -476,7 +476,7 @@ fn can_sync_huge_fork() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -508,7 +508,7 @@ fn can_sync_huge_fork() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize )); best_block_num += sync.max_blocks_per_request as u32; @@ -610,7 +610,7 @@ fn syncs_fork_without_duplicate_requests() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -646,7 +646,7 @@ fn syncs_fork_without_duplicate_requests() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize )); best_block_num += max_blocks_to_request as u32; @@ -839,10 +839,10 @@ fn sync_restart_removes_block_but_not_justification_requests() { let actions = sync.take_actions().collect::>(); for action in actions.iter() { match action { - ChainSyncAction::CancelRequest { peer_id } => { + SyncingAction::CancelRequest { peer_id, key: _ } => { pending_responses.remove(&peer_id); }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); @@ -852,7 +852,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { } assert!(actions.iter().any(|action| { match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], _ => false, } })); @@ -943,7 +943,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize )); assert_eq!(sync.best_queued_number, 107); assert_eq!(sync.best_queued_hash, block.hash()); @@ -988,7 +988,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize )); assert!(sync.is_known(&block.header.parent_hash())); } From b9cb3d904f71a82494207a0e414a903aa9a3d678 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 26 Aug 2024 20:11:30 +0300 Subject: [PATCH 7/7] Add prdoc --- prdoc/pr_5469.prdoc | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 prdoc/pr_5469.prdoc diff --git a/prdoc/pr_5469.prdoc b/prdoc/pr_5469.prdoc new file mode 100644 index 0000000000000..1e6aa3c0c0726 --- /dev/null +++ b/prdoc/pr_5469.prdoc @@ -0,0 +1,11 @@ +title: Syncing strategy refactoring + +doc: + - audience: Node Dev + description: | + Mostly internal changes to syncing strategies that is a step towards making them configurable/extensible in the + future. It is unlikely that external developers will need to change their code. + +crates: + - name: sc-network-sync + bump: major