From 7096f91d909e37afc15b0e531e24604e806c03c1 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 12 Sep 2023 11:48:08 -0700 Subject: [PATCH] Modular block request handler Patch the outstanding PRs from the old repos: https://github.com/paritytech/substrate/pull/14014 https://github.com/paritytech/polkadot/pull/7134 https://github.com/paritytech/cumulus/pull/2489 These were already reviewed and approved, but not yet submitted. --- cumulus/client/service/src/lib.rs | 1 + polkadot/node/service/src/lib.rs | 1 + .../bin/node-template/node/src/service.rs | 1 + substrate/bin/node/cli/src/service.rs | 1 + substrate/client/network/common/src/sync.rs | 33 +-- .../network/sync/src/block_relay_protocol.rs | 72 ++++++ .../network/sync/src/block_request_handler.rs | 166 +++++++++++++- substrate/client/network/sync/src/engine.rs | 5 +- substrate/client/network/sync/src/lib.rs | 214 +++++------------- substrate/client/network/sync/src/mock.rs | 35 ++- substrate/client/network/test/src/lib.rs | 24 +- substrate/client/network/test/src/service.rs | 24 +- substrate/client/service/src/builder.rs | 43 ++-- .../client/transaction-pool/api/src/lib.rs | 3 +- 14 files changed, 369 insertions(+), 254 deletions(-) create mode 100644 substrate/client/network/sync/src/block_relay_protocol.rs diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 211a5cc3b79bc..9f1b6762450bf 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -484,6 +484,7 @@ where import_queue, block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)), warp_sync_params, + block_relay: None, }) } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 7f4eadaba7f8a..58535944a22e3 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -894,6 +894,7 @@ pub fn new_full( import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; if config.offchain_worker.enabled { diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 7303f5cd6dd6d..403202829241e 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -183,6 +183,7 @@ pub fn new_full(config: Configuration) -> Result { import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; if config.offchain_worker.enabled { diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index e49c60fe2fb7b..977c90e73e9ff 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -388,6 +388,7 @@ pub fn new_full_base( import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; let role = config.role.clone(); diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index 461c4ae411d6a..5a6f90b290d2f 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -22,12 +22,12 @@ pub mod message; pub mod metrics; pub mod warp; -use crate::{role::Roles, sync::message::BlockAnnounce, types::ReputationChange}; +use crate::{role::Roles, types::ReputationChange}; use futures::Stream; use libp2p_identity::PeerId; -use message::{BlockData, BlockRequest, BlockResponse}; +use message::{BlockAnnounce, BlockRequest, BlockResponse}; use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock}; use sp_consensus::BlockOrigin; use sp_runtime::{ @@ -226,28 +226,6 @@ impl fmt::Debug for OpaqueStateResponse { } } -/// Wrapper for implementation-specific block request. -/// -/// NOTE: Implementation must be able to encode and decode it for network purposes. -pub struct OpaqueBlockRequest(pub Box); - -impl fmt::Debug for OpaqueBlockRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpaqueBlockRequest").finish() - } -} - -/// Wrapper for implementation-specific block response. -/// -/// NOTE: Implementation must be able to encode and decode it for network purposes. -pub struct OpaqueBlockResponse(pub Box); - -impl fmt::Debug for OpaqueBlockResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpaqueBlockResponse").finish() - } -} - /// Provides high-level status of syncing. #[async_trait::async_trait] pub trait SyncStatusProvider: Send + Sync { @@ -392,13 +370,6 @@ pub trait ChainSync: Send { /// Return some key metrics. fn metrics(&self) -> Metrics; - /// Access blocks from implementation-specific block response. - fn block_response_into_blocks( - &self, - request: &BlockRequest, - response: OpaqueBlockResponse, - ) -> Result>, String>; - /// Advance the state of `ChainSync` fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>; diff --git a/substrate/client/network/sync/src/block_relay_protocol.rs b/substrate/client/network/sync/src/block_relay_protocol.rs new file mode 100644 index 0000000000000..593730b5809e1 --- /dev/null +++ b/substrate/client/network/sync/src/block_relay_protocol.rs @@ -0,0 +1,72 @@ +// Copyright Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Block relay protocol related definitions. + +use futures::channel::oneshot; +use libp2p::PeerId; +use sc_network::request_responses::{ProtocolConfig, RequestFailure}; +use sc_network_common::sync::message::{BlockData, BlockRequest}; +use sp_runtime::traits::Block as BlockT; +use std::sync::Arc; + +/// The serving side of the block relay protocol. It runs a single instance +/// of the server task that processes the incoming protocol messages. +#[async_trait::async_trait] +pub trait BlockServer: Send { + /// Starts the protocol processing. + async fn run(&mut self); +} + +/// The client side stub to download blocks from peers. This is a handle +/// that can be used to initiate concurrent downloads. +#[async_trait::async_trait] +pub trait BlockDownloader: Send + Sync { + /// Performs the protocol specific sequence to fetch the block from the peer. + /// Output: if the download succeeds, the response is a `Vec` which is + /// in a format specific to the protocol implementation. The block data + /// can be extracted from this response using [`BlockDownloader::block_response_into_blocks`]. + async fn download_block( + &self, + who: PeerId, + request: BlockRequest, + ) -> Result, RequestFailure>, oneshot::Canceled>; + + /// Parses the protocol specific response to retrieve the block data. + fn block_response_into_blocks( + &self, + request: &BlockRequest, + response: Vec, + ) -> Result>, BlockResponseError>; +} + +/// Errors returned by [`BlockDownloader::block_response_into_blocks`]. +#[derive(Debug)] +pub enum BlockResponseError { + /// Failed to decode the response bytes. + DecodeFailed(String), + + /// Failed to extract the blocks from the decoded bytes. + ExtractionFailed(String), +} + +/// Block relay specific params for network creation, specified in +/// ['sc_service::BuildNetworkParams']. +pub struct BlockRelayParams { + pub server: Box>, + pub downloader: Arc>, + pub request_response_config: ProtocolConfig, +} diff --git a/substrate/client/network/sync/src/block_request_handler.rs b/substrate/client/network/sync/src/block_request_handler.rs index d90a00b37673b..b8ea5eaf46b2c 100644 --- a/substrate/client/network/sync/src/block_request_handler.rs +++ b/substrate/client/network/sync/src/block_request_handler.rs @@ -18,29 +18,35 @@ //! `crate::request_responses::RequestResponsesBehaviour`. use crate::{ - schema::v1::{block_request::FromBlock, BlockResponse, Direction}, + block_relay_protocol::{BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer}, + schema::v1::{ + block_request::FromBlock as FromBlockSchema, BlockRequest as BlockRequestSchema, + BlockResponse as BlockResponseSchema, BlockResponse, Direction, + }, + service::network::NetworkServiceHandle, MAX_BLOCKS_IN_RESPONSE, }; -use codec::{Decode, Encode}; +use codec::{Decode, DecodeAll, Encode}; use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::debug; use prost::Message; use schnellru::{ByLength, LruMap}; - use sc_client_api::BlockBackend; use sc_network::{ config::ProtocolId, - request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, + request_responses::{ + IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure, + }, + types::ProtocolName, }; -use sc_network_common::sync::message::BlockAttributes; +use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock}; use sp_blockchain::HeaderBackend; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header, One, Zero}, }; - use std::{ cmp::min, hash::{Hash, Hasher}, @@ -129,7 +135,8 @@ enum SeenRequestsValue { Fulfilled(usize), } -/// Handler for incoming block requests from a remote peer. +/// The full block server implementation of [`BlockServer`]. It handles +/// the incoming block requests from a remote peer. pub struct BlockRequestHandler { client: Arc, request_receiver: async_channel::Receiver, @@ -146,11 +153,12 @@ where { /// Create a new [`BlockRequestHandler`]. pub fn new( + network: NetworkServiceHandle, protocol_id: &ProtocolId, fork_id: Option<&str>, client: Arc, num_peer_hint: usize, - ) -> (Self, ProtocolConfig) { + ) -> BlockRelayParams { // Reserve enough request slots for one request per peer when we are at the maximum // number of peers. let capacity = std::cmp::max(num_peer_hint, 1); @@ -170,11 +178,15 @@ where let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2); let seen_requests = LruMap::new(capacity); - (Self { client, request_receiver, seen_requests }, protocol_config) + BlockRelayParams { + server: Box::new(Self { client, request_receiver, seen_requests }), + downloader: Arc::new(FullBlockDownloader::new(protocol_config.name.clone(), network)), + request_response_config: protocol_config, + } } /// Run [`BlockRequestHandler`]. - pub async fn run(mut self) { + async fn process_requests(&mut self) { while let Some(request) = self.request_receiver.next().await { let IncomingRequest { peer, payload, pending_response } = request; @@ -197,11 +209,11 @@ where let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? { - FromBlock::Hash(ref h) => { + FromBlockSchema::Hash(ref h) => { let h = Decode::decode(&mut h.as_ref())?; BlockId::::Hash(h) }, - FromBlock::Number(ref n) => { + FromBlockSchema::Number(ref n) => { let n = Decode::decode(&mut n.as_ref())?; BlockId::::Number(n) }, @@ -448,6 +460,17 @@ where } } +#[async_trait::async_trait] +impl BlockServer for BlockRequestHandler +where + B: BlockT, + Client: HeaderBackend + BlockBackend + Send + Sync + 'static, +{ + async fn run(&mut self) { + self.process_requests().await; + } +} + #[derive(Debug, thiserror::Error)] enum HandleRequestError { #[error("Failed to decode request: {0}.")] @@ -465,3 +488,122 @@ enum HandleRequestError { #[error("Failed to send response.")] SendResponse, } + +/// The full block downloader implementation of [`BlockDownloader]. +pub struct FullBlockDownloader { + protocol_name: ProtocolName, + network: NetworkServiceHandle, +} + +impl FullBlockDownloader { + fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self { + Self { protocol_name, network } + } + + /// Extracts the blocks from the response schema. + fn blocks_from_schema( + &self, + request: &BlockRequest, + response: BlockResponseSchema, + ) -> Result>, String> { + response + .blocks + .into_iter() + .map(|block_data| { + Ok(BlockData:: { + hash: Decode::decode(&mut block_data.hash.as_ref())?, + header: if !block_data.header.is_empty() { + Some(Decode::decode(&mut block_data.header.as_ref())?) + } else { + None + }, + body: if request.fields.contains(BlockAttributes::BODY) { + Some( + block_data + .body + .iter() + .map(|body| Decode::decode(&mut body.as_ref())) + .collect::, _>>()?, + ) + } else { + None + }, + indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { + Some(block_data.indexed_body) + } else { + None + }, + receipt: if !block_data.receipt.is_empty() { + Some(block_data.receipt) + } else { + None + }, + message_queue: if !block_data.message_queue.is_empty() { + Some(block_data.message_queue) + } else { + None + }, + justification: if !block_data.justification.is_empty() { + Some(block_data.justification) + } else if block_data.is_empty_justification { + Some(Vec::new()) + } else { + None + }, + justifications: if !block_data.justifications.is_empty() { + Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) + } else { + None + }, + }) + }) + .collect::>() + .map_err(|error: codec::Error| error.to_string()) + } +} + +#[async_trait::async_trait] +impl BlockDownloader for FullBlockDownloader { + async fn download_block( + &self, + who: PeerId, + request: BlockRequest, + ) -> Result, RequestFailure>, oneshot::Canceled> { + // Build the request protobuf. + let bytes = BlockRequestSchema { + fields: request.fields.to_be_u32(), + from_block: match request.from { + FromBlock::Hash(h) => Some(FromBlockSchema::Hash(h.encode())), + FromBlock::Number(n) => Some(FromBlockSchema::Number(n.encode())), + }, + direction: request.direction as i32, + max_blocks: request.max.unwrap_or(0), + support_multiple_justifications: true, + } + .encode_to_vec(); + + let (tx, rx) = oneshot::channel(); + self.network.start_request( + who, + self.protocol_name.clone(), + bytes, + tx, + IfDisconnected::ImmediateError, + ); + rx.await + } + + fn block_response_into_blocks( + &self, + request: &BlockRequest, + response: Vec, + ) -> Result>, BlockResponseError> { + // Decode the response protobuf + let response_schema = BlockResponseSchema::decode(response.as_slice()) + .map_err(|error| BlockResponseError::DecodeFailed(error.to_string()))?; + + // Extract the block data from the protobuf + self.blocks_from_schema::(request, response_schema) + .map_err(|error| BlockResponseError::ExtractionFailed(error.to_string())) + } +} diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 23847d16c972d..c93ba89c6220e 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -23,6 +23,7 @@ use crate::{ block_announce_validator::{ BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, }, + block_relay_protocol::BlockDownloader, service::{self, chain_sync::ToServiceCommand}, warp::WarpSyncParams, ChainSync, ClientError, SyncingService, @@ -300,7 +301,7 @@ where warp_sync_params: Option>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, - block_request_protocol_name: ProtocolName, + block_downloader: Arc>, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, rx: sc_utils::mpsc::TracingUnboundedReceiver>, @@ -392,7 +393,7 @@ where metrics_registry, network_service.clone(), import_queue, - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_sync_protocol_name, )?; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index df0ed2c454109..1db3d090dfed7 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -29,13 +29,14 @@ //! order to update it. use crate::{ + block_relay_protocol::{BlockDownloader, BlockResponseError}, blocks::BlockCollection, schema::v1::{StateRequest, StateResponse}, state::StateSync, warp::{WarpProofImportResult, WarpSync, WarpSyncConfig}, }; -use codec::{Decode, DecodeAll, Encode}; +use codec::Encode; use extra_requests::ExtraRequests; use futures::{channel::oneshot, task::Poll, Future, FutureExt}; use libp2p::{request_response::OutboundFailure, PeerId}; @@ -63,8 +64,8 @@ use sc_network_common::{ }, warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, - OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, - OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, SyncState, SyncStatus, + OnStateData, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, + SyncMode, SyncState, SyncStatus, }, }; use sp_arithmetic::traits::Saturating; @@ -93,6 +94,7 @@ mod extra_requests; mod futures_stream; mod schema; +pub mod block_relay_protocol; pub mod block_request_handler; pub mod blocks; pub mod engine; @@ -320,8 +322,8 @@ pub struct ChainSync { network_service: service::network::NetworkServiceHandle, /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, - /// Protocol name used to send out block requests - block_request_protocol_name: ProtocolName, + /// Block downloader stub + block_downloader: Arc>, /// Protocol name used to send out state requests state_request_protocol_name: ProtocolName, /// Protocol name used to send out warp sync requests @@ -1167,72 +1169,6 @@ where } } - fn block_response_into_blocks( - &self, - request: &BlockRequest, - response: OpaqueBlockResponse, - ) -> Result>, String> { - let response: Box = response.0.downcast().map_err(|_error| { - "Failed to downcast opaque block response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - response - .blocks - .into_iter() - .map(|block_data| { - Ok(BlockData:: { - hash: Decode::decode(&mut block_data.hash.as_ref())?, - header: if !block_data.header.is_empty() { - Some(Decode::decode(&mut block_data.header.as_ref())?) - } else { - None - }, - body: if request.fields.contains(BlockAttributes::BODY) { - Some( - block_data - .body - .iter() - .map(|body| Decode::decode(&mut body.as_ref())) - .collect::, _>>()?, - ) - } else { - None - }, - indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { - Some(block_data.indexed_body) - } else { - None - }, - receipt: if !block_data.receipt.is_empty() { - Some(block_data.receipt) - } else { - None - }, - message_queue: if !block_data.message_queue.is_empty() { - Some(block_data.message_queue) - } else { - None - }, - justification: if !block_data.justification.is_empty() { - Some(block_data.justification) - } else if block_data.is_empty_justification { - Some(Vec::new()) - } else { - None - }, - justifications: if !block_data.justifications.is_empty() { - Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) - } else { - None - }, - }) - }) - .collect::>() - .map_err(|error: codec::Error| error.to_string()) - } - fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.process_outbound_requests(); @@ -1248,30 +1184,18 @@ where } fn send_block_request(&mut self, who: PeerId, request: BlockRequest) { - let (tx, rx) = oneshot::channel(); - let opaque_req = self.create_opaque_block_request(&request); - if self.peers.contains_key(&who) { - self.pending_responses - .insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) })); - } - - match self.encode_block_request(&opaque_req) { - Ok(data) => { - self.network_service.start_request( - who, - self.block_request_protocol_name.clone(), - data, - tx, - IfDisconnected::ImmediateError, - ); - }, - Err(err) => { - log::warn!( - target: LOG_TARGET, - "Failed to encode block request {opaque_req:?}: {err:?}", - ); - }, + let downloader = self.block_downloader.clone(); + self.pending_responses.insert( + who, + Box::pin(async move { + ( + who, + PeerRequest::Block(request.clone()), + downloader.download_block(who, request).await, + ) + }), + ); } } } @@ -1301,7 +1225,7 @@ where metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, - block_request_protocol_name: ProtocolName, + block_downloader: Arc>, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, ) -> Result<(Self, NonDefaultSetConfig), ClientError> { @@ -1337,7 +1261,7 @@ where import_existing: false, gap_sync: None, network_service, - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_sync_config, warp_sync_target_block_header: None, @@ -1710,13 +1634,6 @@ where } } - fn decode_block_response(response: &[u8]) -> Result { - let response = schema::v1::BlockResponse::decode(response) - .map_err(|error| format!("Failed to decode block response: {error}"))?; - - Ok(OpaqueBlockResponse(Box::new(response))) - } - fn decode_state_response(response: &[u8]) -> Result { let response = StateResponse::decode(response) .map_err(|error| format!("Failed to decode state response: {error}"))?; @@ -1780,22 +1697,8 @@ where &mut self, peer_id: PeerId, request: BlockRequest, - response: OpaqueBlockResponse, + blocks: Vec>, ) -> Option> { - let blocks = match self.block_response_into_blocks(&request, response) { - Ok(blocks) => blocks, - Err(err) => { - debug!( - target: LOG_TARGET, - "Failed to decode block response from {}: {}", - peer_id, - err, - ); - self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); - return None - }, - }; - let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -1915,22 +1818,34 @@ where match response { Ok(Ok(resp)) => match request { PeerRequest::Block(req) => { - let response = match Self::decode_block_response(&resp[..]) { - Ok(proto) => proto, - Err(e) => { + match self.block_downloader.block_response_into_blocks(&req, resp) { + Ok(blocks) => { + if let Some(import) = self.on_block_response(id, req, blocks) { + return Poll::Ready(import) + } + }, + Err(BlockResponseError::DecodeFailed(e)) => { debug!( - target: LOG_TARGET, - "Failed to decode block response from peer {id:?}: {e:?}.", + target: "sync", + "Failed to decode block response from peer {:?}: {:?}.", + id, + e ); self.network_service.report_peer(id, rep::BAD_MESSAGE); self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); continue }, - }; - - if let Some(import) = self.on_block_response(id, req, response) { - return Poll::Ready(import) + Err(BlockResponseError::ExtractionFailed(e)) => { + debug!( + target: "sync", + "Failed to extract blocks from peer response {:?}: {:?}.", + id, + e + ); + self.network_service.report_peer(id, rep::BAD_MESSAGE); + continue + }, } }, PeerRequest::State => { @@ -2010,31 +1925,6 @@ where Poll::Pending } - /// Create implementation-specific block request. - fn create_opaque_block_request(&self, request: &BlockRequest) -> OpaqueBlockRequest { - OpaqueBlockRequest(Box::new(schema::v1::BlockRequest { - fields: request.fields.to_be_u32(), - from_block: match request.from { - FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), - FromBlock::Number(n) => - Some(schema::v1::block_request::FromBlock::Number(n.encode())), - }, - direction: request.direction as i32, - max_blocks: request.max.unwrap_or(0), - support_multiple_justifications: true, - })) - } - - fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String> { - let request: &schema::v1::BlockRequest = request.0.downcast_ref().ok_or_else(|| { - "Failed to downcast opaque block response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - Ok(request.encode_to_vec()) - } - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String> { let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { "Failed to downcast opaque state response during encoding, this is an \ @@ -2909,7 +2799,7 @@ fn validate_blocks( #[cfg(test)] mod test { use super::*; - use crate::service::network::NetworkServiceProvider; + use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider}; use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_network_common::{ @@ -2947,7 +2837,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3013,7 +2903,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3187,7 +3077,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3313,7 +3203,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3470,7 +3360,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3612,7 +3502,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3756,7 +3646,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3801,7 +3691,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3853,7 +3743,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index d37095c17d2c5..fa2fe178ef64d 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -16,15 +16,17 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! Contains a mock implementation of `ChainSync` that can be used -//! for testing calls made to `ChainSync`. +//! Contains mock implementations of `ChainSync` and 'BlockDownloader'. -use futures::task::Poll; +use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResponseError}; + +use futures::{channel::oneshot, task::Poll}; use libp2p::PeerId; +use sc_network::RequestFailure; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, - OpaqueBlockResponse, PeerInfo, SyncStatus, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, PeerInfo, + SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -79,11 +81,6 @@ mockall::mock! { ); fn peer_disconnected(&mut self, who: &PeerId); fn metrics(&self) -> Metrics; - fn block_response_into_blocks( - &self, - request: &BlockRequest, - response: OpaqueBlockResponse, - ) -> Result>, String>; fn poll<'a>( &mut self, cx: &mut std::task::Context<'a>, @@ -95,3 +92,21 @@ mockall::mock! { ); } } + +mockall::mock! { + pub BlockDownloader {} + + #[async_trait::async_trait] + impl BlockDownloaderT for BlockDownloader { + async fn download_block( + &self, + who: PeerId, + request: BlockRequest, + ) -> Result, RequestFailure>, oneshot::Canceled>; + fn block_response_into_blocks( + &self, + request: &BlockRequest, + response: Vec, + ) -> Result>, BlockResponseError>; + } +} diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index d350b0e54ae1c..11505903e35d7 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -798,12 +798,18 @@ pub trait TestNetFactory: Default + Sized + Send { let fork_id = Some(String::from("test-fork-id")); - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - self.spawn_task(handler.run().boxed()); - protocol_config - }; + let (chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut block_relay_params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + None, + client.clone(), + 50, + ); + self.spawn_task(Box::pin(async move { + block_relay_params.server.run().await; + })); let state_request_protocol_config = { let (handler, protocol_config) = @@ -848,8 +854,6 @@ pub trait TestNetFactory: Default + Sized + Send { let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); - let (chain_sync_network_provider, chain_sync_network_handle) = - NetworkServiceProvider::new(); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = @@ -864,7 +868,7 @@ pub trait TestNetFactory: Default + Sized + Send { Some(warp_sync_params), chain_sync_network_handle, import_queue.service(), - block_request_protocol_config.name.clone(), + block_relay_params.downloader, state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), rx, @@ -877,7 +881,7 @@ pub trait TestNetFactory: Default + Sized + Send { full_net_config.add_request_response_protocol(config); } for config in [ - block_request_protocol_config, + block_relay_params.request_response_config, state_request_protocol_config, light_client_request_protocol_config, warp_protocol_config, diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index 68e780545bb17..62d7f9f9d1bb1 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -156,12 +156,18 @@ impl TestNetworkBuilder { let fork_id = Some(String::from("test-fork-id")); let mut full_net_config = FullNetworkConfiguration::new(&network_config); - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - tokio::spawn(handler.run().boxed()); - protocol_config - }; + let (chain_sync_network_provider, chain_sync_network_handle) = + self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let mut block_relay_params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + None, + client.clone(), + 50, + ); + tokio::spawn(Box::pin(async move { + block_relay_params.server.run().await; + })); let state_request_protocol_config = { let (handler, protocol_config) = @@ -177,8 +183,6 @@ impl TestNetworkBuilder { protocol_config }; - let (chain_sync_network_provider, chain_sync_network_handle) = - self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -191,7 +195,7 @@ impl TestNetworkBuilder { None, chain_sync_network_handle, import_queue.service(), - block_request_protocol_config.name.clone(), + block_relay_params.downloader, state_request_protocol_config.name.clone(), None, rx, @@ -214,7 +218,7 @@ impl TestNetworkBuilder { } for config in [ - block_request_protocol_config, + block_relay_params.request_response_config, state_request_protocol_config, light_client_request_protocol_config, ] { diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 917b3be8dc7ce..e3a6ba5f356ed 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -50,8 +50,9 @@ use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, engine::SyncingEngine, - service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, + block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler, + engine::SyncingEngine, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp::WarpSyncParams, warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, }; @@ -696,6 +697,9 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { Option) -> Box + Send> + Send>>, /// Optional warp sync params. pub warp_sync_params: Option>, + /// User specified block relay params. If not specified, the default + /// block request handler will be used. + pub block_relay: Option>, } /// Build the network service, the network status sinks and an RPC sender. @@ -734,6 +738,7 @@ where import_queue, block_announce_validator_builder, warp_sync_params, + block_relay, } = params; if warp_sync_params.is_none() && config.network.sync_mode.is_warp() { @@ -757,19 +762,26 @@ where Box::new(DefaultBlockAnnounceValidator) }; - let (block_request_protocol_config, block_request_protocol_name) = { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = BlockRequestHandler::new( - &protocol_id, - config.chain_spec.fork_id(), - client.clone(), - net_config.network_config.default_peers_set.in_peers as usize + - net_config.network_config.default_peers_set.out_peers as usize, - ); - let config_name = protocol_config.name.clone(); - spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); - (protocol_config, config_name) + let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay { + Some(params) => (params.server, params.downloader, params.request_response_config), + None => { + // Custom protocol was not specified, use the default block handler. + // Allow both outgoing and incoming requests. + let params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + config.chain_spec.fork_id(), + client.clone(), + config.network.default_peers_set.in_peers as usize + + config.network.default_peers_set.out_peers as usize, + ); + (params.server, params.downloader, params.request_response_config) + }, }; + spawn_handle.spawn("block-request-handler", Some("networking"), async move { + block_server.run().await; + }); let (state_request_protocol_config, state_request_protocol_name) = { let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + @@ -860,7 +872,6 @@ where spawn_handle.spawn("peer-store", Some("networking"), peer_store.run()); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); - let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), client.clone(), @@ -872,7 +883,7 @@ where warp_sync_params, chain_sync_network_handle, import_queue.service(), - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_request_protocol_name, rx, diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index a132cbc46e9b0..73cc513708d2d 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -22,6 +22,7 @@ pub mod error; use async_trait::async_trait; +use codec::Codec; use futures::{Future, Stream}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sp_core::offchain::TransactionPoolExt; @@ -187,7 +188,7 @@ pub trait TransactionPool: Send + Sync { /// Block type. type Block: BlockT; /// Transaction hash type. - type Hash: Hash + Eq + Member + Serialize + DeserializeOwned; + type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec; /// In-pool transaction type. type InPoolTransaction: InPoolTransaction< Transaction = TransactionFor,