diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 47e23337633b..51f742285af9 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -49,7 +49,7 @@ use crate::{ NotificationSenderReady as NotificationSenderReadyT, }, }, - transport, + transport::{self, build_transport, NetworkConfig}, types::ProtocolName, ReputationChange, }; @@ -69,7 +69,11 @@ use libp2p::{ AddressScore, ConnectionError, ConnectionId, ConnectionLimits, DialError, Executor, ListenError, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, THandlerErr, }, - Multiaddr, PeerId, + Multiaddr, PeerId, TransportExt, +}; +use libp2p::{ + core::{muxing::StreamMuxerBox, StreamMuxer}, + Transport, }; use log::{debug, error, info, trace, warn}; use metrics::{Histogram, MetricSources, Metrics}; @@ -144,12 +148,28 @@ where B: BlockT + 'static, H: ExHashT, { - /// Creates the network service. + /// Creates the network service. It allows to provide a custom implementation + /// of the [`libp2p::Transport`] for its underlying network transport, + /// i.e. a transport that should at minimum provide authentication and multiplexing. + /// Default implementation can be aquired by calling [`crate::transport::build_transport`]. /// /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(params: Params) -> Result { + pub fn new( + params: Params, + transport_builder: impl FnOnce(NetworkConfig) -> T, + ) -> Result + where + T: Transport + Send + Unpin + 'static, + T::Dial: Send, + T::ListenerUpgrade: Send, + T::Error: Send + Sync, + + SM: StreamMuxer + Send + 'static, + SM::Substream: Send, + SM::Error: Send + Sync, + { let FullNetworkConfiguration { notification_protocols, request_response_protocols, @@ -260,12 +280,15 @@ where .saturating_add(10) }; - transport::build_transport( - local_identity.clone(), - config_mem, - network_config.yamux_window_size, - yamux_maximum_buffer_size, - ) + transport_builder(NetworkConfig { + keypair: local_identity.clone(), + memory_only: config_mem, + muxer_window_size: network_config.yamux_window_size, + muxer_maximum_buffer_size: yamux_maximum_buffer_size, + }) + .map(|(peer_id, stream_muxer), _| (peer_id, StreamMuxerBox::new(stream_muxer))) + .boxed() + .with_bandwidth_logging() }; let (to_notifications, from_protocol_controllers) = diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs index 4136b34fc0e8..32483a9a9fc9 100644 --- a/substrate/client/network/src/transport.rs +++ b/substrate/client/network/src/transport.rs @@ -19,41 +19,43 @@ //! Transport that serves as a common ground for all connections. use either::Either; +use futures::{AsyncRead, AsyncWrite}; use libp2p::{ - core::{ - muxing::StreamMuxerBox, - transport::{Boxed, OptionalTransport}, - upgrade, - }, - dns, identity, noise, tcp, websocket, PeerId, Transport, TransportExt, + core::{transport::OptionalTransport, upgrade, StreamMuxer}, + dns, identity, + identity::Keypair, + noise, tcp, websocket, PeerId, Transport, }; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; pub use libp2p::bandwidth::BandwidthSinks; -/// Builds the transport that serves as a common ground for all connections. -/// -/// If `memory_only` is true, then only communication within the same process are allowed. Only -/// addresses with the format `/memory/...` are allowed. -/// -/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the -/// default (256kiB). -/// -/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be -/// set either to the maximum of all the maximum allowed sizes of messages frames of all -/// high-level protocols combined, or to some generously high value if you are sure that a maximum -/// size is enforced on all high-level protocols. -/// -/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all -/// the connections spawned with this transport. -pub fn build_transport( - keypair: identity::Keypair, +/// Describes network configuration used for building instances of [`libp2p::Transport`]. +pub struct NetworkConfig { + /// Our network identity. + pub keypair: Keypair, + /// Indicates whether created [`Transport`] should be only memory-based. + pub memory_only: bool, + /// Window size of the muxer. + pub muxer_window_size: Option, + /// Buffer size of the muxer. + pub muxer_maximum_buffer_size: usize, +} + +/// Creates default base layer of network transport, i.e. a transport that allows connectivity for +/// `WS + WSS` (with `DNS`) or `TCP + WS` (when `DNS` is not available). It can be used as basis for +/// building a custom implementation of authenticated and mutliplexed [`libp2p::Transport`] that is +/// required by the [`NetworkWorker`]. +pub fn build_basic_transport( memory_only: bool, - yamux_window_size: Option, - yamux_maximum_buffer_size: usize, -) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) { +) -> impl Transport< + Output = impl AsyncRead + AsyncWrite, + Dial = impl Send, + ListenerUpgrade = impl Send, + Error = impl Send, +> + Send { // Build the base layer of the transport. - let transport = if !memory_only { + if !memory_only { // Main transport: DNS(TCP) let tcp_config = tcp::Config::new().nodelay(true); let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone()); @@ -78,8 +80,27 @@ pub fn build_transport( }) } else { Either::Right(OptionalTransport::some(libp2p::core::transport::MemoryTransport::default())) - }; + } +} +/// Adds authentication and multiplexing to a given implementation of [`libp2p::Transport`]. +/// It uses the `noise` protocol for authentication and the `yamux` library for connection multiplexing. +pub fn add_authentication_and_muxing( + keypair: identity::Keypair, + yamux_window_size: Option, + yamux_maximum_buffer_size: usize, + transport: impl Transport< + Output = impl AsyncRead + AsyncWrite + Send + Unpin + 'static, + Dial = impl Send, + ListenerUpgrade = impl Send, + Error = impl Send + 'static, + > + Send, +) -> impl Transport< + Output = (PeerId, impl StreamMuxer + Send), + Dial = impl Send, + ListenerUpgrade = impl Send, + Error = impl Send, +> + Send { let authentication_config = noise::Config::new(&keypair).expect("Can create noise config. qed"); let multiplexing_config = { let mut yamux_config = libp2p::yamux::Config::default(); @@ -95,12 +116,43 @@ pub fn build_transport( yamux_config }; - let transport = transport + transport .upgrade(upgrade::Version::V1Lazy) .authenticate(authentication_config) .multiplex(multiplexing_config) .timeout(Duration::from_secs(20)) - .boxed(); +} - transport.with_bandwidth_logging() +/// Builds the transport that serves as a common ground for all connections. +/// +/// If `memory_only` is true, then only communication within the same process are allowed. Only +/// addresses with the format `/memory/...` are allowed. +/// +/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the +/// default (256kiB). +/// +/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be +/// set either to the maximum of all the maximum allowed sizes of messages frames of all +/// high-level protocols combined, or to some generously high value if you are sure that a maximum +/// size is enforced on all high-level protocols. +/// +/// Returns a multiplexed and authenticated implementation of [`libp2p::Transport``]. +pub fn build_transport( + keypair: identity::Keypair, + memory_only: bool, + yamux_window_size: Option, + yamux_maximum_buffer_size: usize, +) -> impl Transport< + Output = (PeerId, impl StreamMuxer + Send), + Dial = impl Send, + ListenerUpgrade = impl Send, + Error = impl Send, +> + Send { + let basic_transport = build_basic_transport(memory_only); + add_authentication_and_muxing( + keypair, + yamux_window_size, + yamux_maximum_buffer_size, + basic_transport, + ) } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 1a3a679c519a..599ad61a1add 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -926,7 +926,14 @@ where }; let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty(); - let network_mut = sc_network::NetworkWorker::new(network_params)?; + let network_mut = sc_network::NetworkWorker::new(network_params, |config| { + sc_network::transport::build_transport( + config.keypair, + config.memory_only, + config.muxer_window_size, + config.muxer_maximum_buffer_size, + ) + })?; let network = network_mut.service().clone(); let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(