From d443f148ce0aa6a51798b86c6b3ae54113786f94 Mon Sep 17 00:00:00 2001
From: Aaro Altonen <48052676+altonen@users.noreply.github.com>
Date: Fri, 7 Apr 2023 11:20:17 +0300
Subject: [PATCH] Make blocks per request configurable (#13824)

* Make blocks per request configurable

* Correct type

* Update docs

* Update client/cli/src/params/network_params.rs
---
 client/cli/src/params/network_params.rs       |  8 ++
 client/network/src/config.rs                  |  5 ++
 .../network/sync/src/block_request_handler.rs |  6 +-
 client/network/sync/src/blocks.rs             |  2 +-
 client/network/sync/src/engine.rs             |  9 ++
 client/network/sync/src/lib.rs                | 84 ++++++++++++-------
 6 files changed, 81 insertions(+), 33 deletions(-)

diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs
index acec4b2ac872a..9d61e7204295d 100644
--- a/client/cli/src/params/network_params.rs
+++ b/client/cli/src/params/network_params.rs
@@ -141,6 +141,13 @@ pub struct NetworkParams {
 		verbatim_doc_comment
 	)]
 	pub sync: SyncMode,
+
+	/// Maximum number of blocks per request.
+	///
+	/// Try reducing this number from the default value if you have a slow network connection
+	/// and observe block requests timing out.
+	#[arg(long, value_name = "COUNT", default_value_t = 64)]
+	pub max_blocks_per_request: u32,
 }
 
 impl NetworkParams {
@@ -230,6 +237,7 @@ impl NetworkParams {
 				allow_private_ip,
 			},
 			max_parallel_downloads: self.max_parallel_downloads,
+			max_blocks_per_request: self.max_blocks_per_request,
 			enable_dht_random_walk: !self.reserved_only,
 			allow_non_globals_in_dht,
 			kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths,
diff --git a/client/network/src/config.rs b/client/network/src/config.rs
index 781ae9c786694..e00bfac79f650 100644
--- a/client/network/src/config.rs
+++ b/client/network/src/config.rs
@@ -566,6 +566,7 @@ pub struct NetworkConfiguration {
 
 	/// List of request-response protocols that the node supports.
 	pub request_response_protocols: Vec<RequestResponseConfig>,
+
 	/// Configuration for the default set of nodes used for block syncing and transactions.
 	pub default_peers_set: SetConfig,
 
@@ -590,6 +591,9 @@ pub struct NetworkConfiguration {
 	/// Maximum number of peers to ask the same blocks in parallel.
 	pub max_parallel_downloads: u32,
 
+	/// Maximum number of blocks per request.
+	pub max_blocks_per_request: u32,
+
 	/// Initial syncing mode.
 	pub sync_mode: SyncMode,
 
@@ -653,6 +657,7 @@ impl NetworkConfiguration {
 			node_name: node_name.into(),
 			transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
 			max_parallel_downloads: 5,
+			max_blocks_per_request: 64,
 			sync_mode: SyncMode::Full,
 			enable_dht_random_walk: true,
 			allow_non_globals_in_dht: false,
diff --git a/client/network/sync/src/block_request_handler.rs b/client/network/sync/src/block_request_handler.rs
index 921efd7def622..ece565aad4b09 100644
--- a/client/network/sync/src/block_request_handler.rs
+++ b/client/network/sync/src/block_request_handler.rs
@@ -17,7 +17,10 @@
 //! Helper for handling (i.e. answering) block requests from a remote peer via the
 //! `crate::request_responses::RequestResponsesBehaviour`.
 
-use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction};
+use crate::{
+	schema::v1::{block_request::FromBlock, BlockResponse, Direction},
+	MAX_BLOCKS_IN_RESPONSE,
+};
 
 use codec::{Decode, Encode};
 use futures::{
@@ -50,7 +53,6 @@ use std::{
 };
 
 const LOG_TARGET: &str = "sync";
-const MAX_BLOCKS_IN_RESPONSE: usize = 128;
 const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
 const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
 
diff --git a/client/network/sync/src/blocks.rs b/client/network/sync/src/blocks.rs
index 4cc1ca080d177..3c76238be1b5f 100644
--- a/client/network/sync/src/blocks.rs
+++ b/client/network/sync/src/blocks.rs
@@ -109,7 +109,7 @@ impl<B: BlockT> BlockCollection<B> {
 	pub fn needed_blocks(
 		&mut self,
 		who: PeerId,
-		count: usize,
+		count: u32,
 		peer_best: NumberFor<B>,
 		common: NumberFor<B>,
 		max_parallel: u32,
diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs
index e3d45a980a0b4..6fb618a571c25 100644
--- a/client/network/sync/src/engine.rs
+++ b/client/network/sync/src/engine.rs
@@ -264,6 +264,14 @@ where
 			SyncOperationMode::Warp => SyncMode::Warp,
 		};
 		let max_parallel_downloads = network_config.max_parallel_downloads;
+		let max_blocks_per_request = if network_config.max_blocks_per_request >
+			crate::MAX_BLOCKS_IN_RESPONSE as u32
+		{
+			log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
+			crate::MAX_BLOCKS_IN_RESPONSE as u32
+		} else {
+			network_config.max_blocks_per_request
+		};
 		let cache_capacity = NonZeroUsize::new(
 			(network_config.default_peers_set.in_peers as usize +
 				network_config.default_peers_set.out_peers as usize)
@@ -318,6 +326,7 @@ where
 			roles,
 			block_announce_validator,
 			max_parallel_downloads,
+			max_blocks_per_request,
 			warp_sync_params,
 			metrics_registry,
 			network_service.clone(),
diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs
index 28959e7f9c886..e112a1715ced1 100644
--- a/client/network/sync/src/lib.rs
+++ b/client/network/sync/src/lib.rs
@@ -107,9 +107,6 @@ pub mod state_request_handler;
 pub mod warp;
 pub mod warp_request_handler;
 
-/// Maximum blocks to request in a single packet.
-const MAX_BLOCKS_TO_REQUEST: usize = 64;
-
 /// Maximum blocks to store in the import queue.
 const MAX_IMPORTING_BLOCKS: usize = 2048;
 
@@ -147,6 +144,9 @@ const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
 /// Maximum allowed size for a block announce.
 const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
 
+/// Maximum blocks per response.
+pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;
+
 mod rep {
 	use sc_peerset::ReputationChange as Rep;
 	/// Reputation change when a peer sent us a message that led to a
@@ -311,6 +311,8 @@ pub struct ChainSync<B: BlockT, Client> {
 	block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
 	/// Maximum number of peers to ask the same blocks in parallel.
 	max_parallel_downloads: u32,
+	/// Maximum blocks per request.
+	max_blocks_per_request: u32,
 	/// Total number of downloaded blocks.
 	downloaded_blocks: usize,
 	/// All block announcement that are currently being validated.
@@ -1403,6 +1405,7 @@ where
 		roles: Roles,
 		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
 		max_parallel_downloads: u32,
+		max_blocks_per_request: u32,
 		warp_sync_params: Option<WarpSyncParams<B>>,
 		metrics_registry: Option<&Registry>,
 		network_service: service::network::NetworkServiceHandle,
@@ -1437,6 +1440,7 @@ where
 			allowed_requests: Default::default(),
 			block_announce_validator,
 			max_parallel_downloads,
+			max_blocks_per_request,
 			downloaded_blocks: 0,
 			block_announce_validation: Default::default(),
 			block_announce_validation_per_peer_stats: Default::default(),
@@ -2365,6 +2369,7 @@ where
 		let queue = &self.queue_blocks;
 		let allowed_requests = self.allowed_requests.take();
 		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
+		let max_blocks_per_request = self.max_blocks_per_request;
 		let gap_sync = &mut self.gap_sync;
 		self.peers
 			.iter_mut()
@@ -2404,6 +2409,7 @@ where
 					blocks,
 					attrs,
 					max_parallel,
+					max_blocks_per_request,
 					last_finalized,
 					best_queued,
 				) {
@@ -2430,6 +2436,7 @@ where
 							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
 						}
 					},
+					max_blocks_per_request,
 				) {
 					trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
 					peer.state = PeerSyncState::DownloadingStale(hash);
@@ -2442,6 +2449,7 @@ where
 						attrs,
 						sync.target,
 						sync.best_queued_number,
+						max_blocks_per_request,
 					)
 				}) {
 					peer.state = PeerSyncState::DownloadingGap(range.start);
@@ -2910,6 +2918,7 @@ fn peer_block_request<B: BlockT>(
 	blocks: &mut BlockCollection<B>,
 	attrs: BlockAttributes,
 	max_parallel_downloads: u32,
+	max_blocks_per_request: u32,
 	finalized: NumberFor<B>,
 	best_num: NumberFor<B>,
 ) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
@@ -2925,7 +2934,7 @@ fn peer_block_request<B: BlockT>(
 	}
 	let range = blocks.needed_blocks(
 		*id,
-		MAX_BLOCKS_TO_REQUEST,
+		max_blocks_per_request,
 		peer.best_number,
 		peer.common_number,
 		max_parallel_downloads,
@@ -2960,10 +2969,11 @@ fn peer_gap_block_request<B: BlockT>(
 	attrs: BlockAttributes,
 	target: NumberFor<B>,
 	common_number: NumberFor<B>,
+	max_blocks_per_request: u32,
 ) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
 	let range = blocks.needed_blocks(
 		*id,
-		MAX_BLOCKS_TO_REQUEST,
+		max_blocks_per_request,
 		std::cmp::min(peer.best_number, target),
 		common_number,
 		1,
@@ -2992,6 +3002,7 @@ fn fork_sync_request<B: BlockT>(
 	finalized: NumberFor<B>,
 	attributes: BlockAttributes,
 	check_block: impl Fn(&B::Hash) -> BlockStatus,
+	max_blocks_per_request: u32,
 ) -> Option<(B::Hash, BlockRequest<B>)> {
 	targets.retain(|hash, r| {
 		if r.number <= finalized {
@@ -3011,7 +3022,7 @@ fn fork_sync_request<B: BlockT>(
 		// Download the fork only if it is behind or not too far ahead our tip of the chain
 		// Otherwise it should be downloaded in full sync mode.
 		if r.number <= best_num ||
-			(r.number - best_num).saturated_into::<u32>() < MAX_BLOCKS_TO_REQUEST as u32
+			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
 		{
 			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
 			let count = if parent_status == BlockStatus::Unknown {
@@ -3199,6 +3210,7 @@ mod test {
 			Roles::from(&Role::Full),
 			block_announce_validator,
 			1,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3265,6 +3277,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			1,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3446,6 +3459,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			5,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3572,6 +3586,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			5,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3586,6 +3601,7 @@ mod test {
 		let peer_id2 = PeerId::random();
 
 		let best_block = blocks.last().unwrap().clone();
+		let max_blocks_to_request = sync.max_blocks_per_request;
 		// Connect the node we will sync from
 		sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number())
 			.unwrap();
@@ -3595,8 +3611,8 @@ mod test {
 		while best_block_num < MAX_DOWNLOAD_AHEAD {
 			let request = get_block_request(
 				&mut sync,
-				FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
-				MAX_BLOCKS_TO_REQUEST as u32,
+				FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
+				max_blocks_to_request as u32,
 				&peer_id1,
 			);
 
@@ -3610,14 +3626,14 @@ mod test {
 			let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
 			assert!(matches!(
 				res,
-				OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
+				OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize
 			),);
 
-			best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
+			best_block_num += max_blocks_to_request as u32;
 
 			let _ = sync.on_blocks_processed(
-				MAX_BLOCKS_TO_REQUEST as usize,
-				MAX_BLOCKS_TO_REQUEST as usize,
+				max_blocks_to_request as usize,
+				max_blocks_to_request as usize,
 				resp_blocks
 					.iter()
 					.rev()
@@ -3675,8 +3691,8 @@ mod test {
 		// peer 2 as well.
 		get_block_request(
 			&mut sync,
-			FromBlock::Number(peer1_from + MAX_BLOCKS_TO_REQUEST as u64),
-			MAX_BLOCKS_TO_REQUEST as u32,
+			FromBlock::Number(peer1_from + max_blocks_to_request as u64),
+			max_blocks_to_request as u32,
 			&peer_id2,
 		);
 	}
@@ -3728,6 +3744,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			5,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3773,11 +3790,12 @@ mod test {
 
 		// Now request and import the fork.
 		let mut best_block_num = *finalized_block.header().number() as u32;
+		let max_blocks_to_request = sync.max_blocks_per_request;
 		while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 {
 			let request = get_block_request(
 				&mut sync,
-				FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
-				MAX_BLOCKS_TO_REQUEST as u32,
+				FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
+				max_blocks_to_request as u32,
 				&peer_id1,
 			);
 
@@ -3791,14 +3809,14 @@ mod test {
 			let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
 			assert!(matches!(
 				res,
-				OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
+				OnBlockData::Import(_, blocks) if blocks.len() == sync.max_blocks_per_request as usize
 			),);
 
-			best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
+			best_block_num += sync.max_blocks_per_request as u32;
 
 			let _ = sync.on_blocks_processed(
-				MAX_BLOCKS_TO_REQUEST as usize,
-				MAX_BLOCKS_TO_REQUEST as usize,
+				max_blocks_to_request as usize,
+				max_blocks_to_request as usize,
 				resp_blocks
 					.iter()
 					.rev()
@@ -3869,6 +3887,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			5,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -3914,10 +3933,12 @@ mod test {
 
 		// Now request and import the fork.
 		let mut best_block_num = *finalized_block.header().number() as u32;
+		let max_blocks_to_request = sync.max_blocks_per_request;
+
 		let mut request = get_block_request(
 			&mut sync,
-			FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
-			MAX_BLOCKS_TO_REQUEST as u32,
+			FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
+			max_blocks_to_request as u32,
 			&peer_id1,
 		);
 		let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1;
@@ -3932,18 +3953,18 @@ mod test {
 			let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
 			assert!(matches!(
 				res,
-				OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
+				OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize
 			),);
 
-			best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
+			best_block_num += max_blocks_to_request as u32;
 
 			if best_block_num < last_block_num {
 				// make sure we're not getting a duplicate request in the time before the blocks are
 				// processed
 				request = get_block_request(
 					&mut sync,
-					FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
-					MAX_BLOCKS_TO_REQUEST as u32,
+					FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
+					max_blocks_to_request as u32,
 					&peer_id1,
 				);
 			}
@@ -3965,16 +3986,17 @@ mod test {
 
 			// The import queue may send notifications in batches of varying size. So we simulate
 			// this here by splitting the batch into 2 notifications.
+			let max_blocks_to_request = sync.max_blocks_per_request;
 			let second_batch = notify_imported.split_off(notify_imported.len() / 2);
 			let _ = sync.on_blocks_processed(
-				MAX_BLOCKS_TO_REQUEST as usize,
-				MAX_BLOCKS_TO_REQUEST as usize,
+				max_blocks_to_request as usize,
+				max_blocks_to_request as usize,
 				notify_imported,
 			);
 
 			let _ = sync.on_blocks_processed(
-				MAX_BLOCKS_TO_REQUEST as usize,
-				MAX_BLOCKS_TO_REQUEST as usize,
+				max_blocks_to_request as usize,
+				max_blocks_to_request as usize,
 				second_batch,
 			);
 
@@ -4010,6 +4032,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			1,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,
@@ -4055,6 +4078,7 @@ mod test {
 			Roles::from(&Role::Full),
 			Box::new(DefaultBlockAnnounceValidator),
 			1,
+			64,
 			None,
 			None,
 			chain_sync_network_handle,