Skip to content

Commit

Permalink
Upgrade libp2p from 0.52.4 to 0.54.1 (paritytech#6248) + net/libp2p: …
Browse files Browse the repository at this point in the history
…Enforce outbound request-response timeout limits (paritytech#7222)
  • Loading branch information
nazar-pc committed Feb 17, 2025
1 parent ab5882b commit c9e028d
Show file tree
Hide file tree
Showing 24 changed files with 865 additions and 1,087 deletions.
700 changes: 298 additions & 402 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ kvdb-shared-tests = { version = "0.11.0" }
landlock = { version = "0.3.0" }
libc = { version = "0.2.155" }
libfuzzer-sys = { version = "0.4" }
libp2p = { version = "0.52.4" }
libp2p = { version = "0.54.1" }
libp2p-identity = { version = "0.2.9" }
libsecp256k1 = { version = "0.7.0", default-features = false }
linked-hash-map = { version = "0.5.4" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_6248.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Upgrade libp2p to 0.54.1

doc:
- audience: [Node Dev, Node Operator]
description: |
Upgrade libp2p from 0.52.4 to 0.54.1

crates:
- name: sc-network
bump: major
- name: sc-network-types
bump: minor
- name: sc-network-sync
bump: patch
- name: sc-telemetry
bump: minor
2 changes: 1 addition & 1 deletion substrate/client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use futures::{
sink::SinkExt,
task::LocalSpawn,
};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use libp2p::{identity::SigningError, kad::RecordKey as KademliaKey};
use prometheus_endpoint::prometheus::default_registry;

use sc_client_api::HeaderBackend;
Expand Down
1 change: 1 addition & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct Behaviour<B: BlockT> {
}

/// Event generated by `Behaviour`.
#[derive(Debug)]
pub enum BehaviourOut {
/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted,
Expand Down
172 changes: 82 additions & 90 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
core::{Endpoint, Multiaddr},
core::{transport::PortUse, Endpoint, Multiaddr},
kad::{
self,
record::store::{MemoryStore, RecordStore},
store::{MemoryStore, RecordStore},
Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
Event, GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
RecordKey,
},
mdns::{self, tokio::Behaviour as TokioMdns},
Expand All @@ -68,8 +68,8 @@ use libp2p::{
toggle::{Toggle, ToggleConnectionHandler},
DialFailure, ExternalAddrConfirmed, FromSwarm,
},
ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters,
StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
PeerId,
};
Expand Down Expand Up @@ -214,23 +214,14 @@ impl DiscoveryConfig {
enable_mdns,
kademlia_disjoint_query_paths,
kademlia_protocol,
kademlia_legacy_protocol,
kademlia_legacy_protocol: _,
kademlia_replication_factor,
} = self;

let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
let mut config = KademliaConfig::default();
let mut config = KademliaConfig::new(kademlia_protocol.clone());

config.set_replication_factor(kademlia_replication_factor);
// Populate kad with both the legacy and the new protocol names.
// Remove the legacy protocol:
// https://github.com/paritytech/polkadot-sdk/issues/504
let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol {
vec![kademlia_protocol.clone(), legacy_protocol]
} else {
vec![kademlia_protocol.clone()]
};
config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());

config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);

Expand Down Expand Up @@ -613,12 +604,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
self.kademlia.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)
}

Expand Down Expand Up @@ -690,7 +683,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
Ok(list.into_iter().collect())
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(e) => {
self.num_connections += 1;
Expand Down Expand Up @@ -777,6 +770,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {

self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
},
event => {
debug!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}");
self.kademlia.on_swarm_event(event);
},
}
}

Expand All @@ -789,11 +786,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
}

fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Immediately process the content of `discovered`.
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(ev))
Expand Down Expand Up @@ -836,7 +829,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
while let Poll::Ready(ev) = self.kademlia.poll(cx) {
match ev {
ToSwarm::GenerateEvent(ev) => match ev {
KademliaEvent::RoutingUpdated { peer, .. } => {
Expand Down Expand Up @@ -1019,30 +1012,38 @@ impl NetworkBehaviour for DiscoveryBehaviour {
e.key(), e,
),
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::Bootstrap(res),
..
} => match res {
Ok(ok) => debug!(
target: "sub-libp2p",
"Libp2p => DHT bootstrap progressed: {ok:?}",
),
Err(e) => warn!(
target: "sub-libp2p",
"Libp2p => DHT bootstrap error: {e:?}",
),
},
// We never start any other type of query.
KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
Event::ModeChanged { new_mode } => {
debug!(target: "sub-libp2p", "Libp2p => Kademlia mode changed: {new_mode}")
},
},
ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
ToSwarm::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
ToSwarm::CloseConnection { peer_id, connection } =>
return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
ToSwarm::NewExternalAddrCandidate(observed) =>
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
ToSwarm::ExternalAddrConfirmed(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
ToSwarm::ExternalAddrExpired(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } =>
return Poll::Ready(ToSwarm::RemoveListener { id }),
event => {
return Poll::Ready(event.map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}));
},
}
}

// Poll mDNS.
while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
while let Poll::Ready(ev) = self.mdns.poll(cx) {
match ev {
ToSwarm::GenerateEvent(event) => match event {
mdns::Event::Discovered(list) => {
Expand All @@ -1064,17 +1065,17 @@ impl NetworkBehaviour for DiscoveryBehaviour {
},
// `event` is an enum with no variant
ToSwarm::NotifyHandler { event, .. } => match event {},
ToSwarm::CloseConnection { peer_id, connection } =>
return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
ToSwarm::NewExternalAddrCandidate(observed) =>
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
ToSwarm::ExternalAddrConfirmed(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
ToSwarm::ExternalAddrExpired(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } =>
return Poll::Ready(ToSwarm::RemoveListener { id }),
event => {
return Poll::Ready(
event
.map_in(|_| {
unreachable!("`NotifyHandler` is handled in a branch above; qed")
})
.map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}),
);
},
}
}

Expand Down Expand Up @@ -1117,21 +1118,14 @@ mod tests {
},
identity::Keypair,
noise,
swarm::{Executor, Swarm, SwarmEvent},
swarm::{Swarm, SwarmEvent},
yamux, Multiaddr,
};
use sp_core::hash::H256;
use std::{collections::HashSet, pin::Pin, task::Poll};
use std::{collections::HashSet, task::Poll, time::Duration};

struct TokioExecutor(tokio::runtime::Runtime);
impl Executor for TokioExecutor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.0.spawn(f);
}
}

#[test]
fn discovery_working() {
#[tokio::test]
async fn discovery_working() {
let mut first_swarm_peer_id_and_addr = None;

let genesis_hash = H256::from_low_u64_be(1);
Expand All @@ -1142,42 +1136,40 @@ mod tests {
// the first swarm via `with_permanent_addresses`.
let mut swarms = (0..25)
.map(|i| {
let keypair = Keypair::generate_ed25519();

let transport = MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair).unwrap())
.multiplex(yamux::Config::default())
.boxed();

let behaviour = {
let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
config
.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
.allow_private_ip(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.with_kademlia(genesis_hash, fork_id, &protocol_id);

config.finish()
};

let runtime = tokio::runtime::Runtime::new().unwrap();
#[allow(deprecated)]
let mut swarm = libp2p::swarm::SwarmBuilder::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
)
.build();
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_other_transport(|keypair| {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair).unwrap())
.multiplex(yamux::Config::default())
.boxed()
})
.unwrap()
.with_behaviour(|keypair| {
let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
config
.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
.allow_private_ip(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.with_kademlia(genesis_hash, fork_id, &protocol_id);

config.finish()
})
.unwrap()
.with_swarm_config(|config| {
// This is taken care of by notification protocols in non-test environment
config.with_idle_connection_timeout(Duration::from_secs(10))
})
.build();

let listen_addr: Multiaddr =
format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

if i == 0 {
first_swarm_peer_id_and_addr =
Some((keypair.public().to_peer_id(), listen_addr.clone()))
Some((*swarm.local_peer_id(), listen_addr.clone()))
}

swarm.listen_on(listen_addr.clone()).unwrap();
Expand Down Expand Up @@ -1264,7 +1256,7 @@ mod tests {
}
});

futures::executor::block_on(fut);
fut.await
}

#[test]
Expand Down
15 changes: 10 additions & 5 deletions substrate/client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::types::ProtocolName;

use bytes::Bytes;
use libp2p::{
kad::{record::Key, PeerRecord},
kad::{PeerRecord, RecordKey},
PeerId,
};

Expand All @@ -37,16 +37,21 @@ pub enum DhtEvent {
ValueFound(PeerRecord),

/// The requested record has not been found in the DHT.
ValueNotFound(Key),
ValueNotFound(RecordKey),

/// The record has been successfully inserted into the DHT.
ValuePut(Key),
ValuePut(RecordKey),

/// An error has occurred while putting a record into the DHT.
ValuePutFailed(Key),
ValuePutFailed(RecordKey),

/// The DHT received a put record request.
PutRecordRequest(Key, Vec<u8>, Option<sc_network_types::PeerId>, Option<std::time::Instant>),
PutRecordRequest(
RecordKey,
Vec<u8>,
Option<sc_network_types::PeerId>,
Option<std::time::Instant>,
),
}

/// Type for events generated by networking layer.
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use array_bytes::bytes2hex;
use futures::{FutureExt, Stream};
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::kad::record::Key as KademliaKey;
use libp2p::kad::RecordKey as KademliaKey;
use litep2p::{
protocol::{
libp2p::{
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
use crate::litep2p::Record;
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use libp2p::{identity::SigningError, kad::RecordKey as KademliaKey};
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
Expand Down
Loading

0 comments on commit c9e028d

Please sign in to comment.