From 845c9e22dce294462af120aef94fca8a72355706 Mon Sep 17 00:00:00 2001 From: sh3ll3x3c Date: Mon, 28 Oct 2024 15:40:57 +0100 Subject: [PATCH] integrate new automatic bootstrap mechanism on bootnode --- bootstrap/CHANGELOG.md | 1 + bootstrap/src/main.rs | 5 +- bootstrap/src/p2p.rs | 6 +- bootstrap/src/p2p/client.rs | 45 +---------- bootstrap/src/p2p/event_loop.rs | 137 +++++--------------------------- bootstrap/src/types.rs | 4 +- 6 files changed, 31 insertions(+), 167 deletions(-) diff --git a/bootstrap/CHANGELOG.md b/bootstrap/CHANGELOG.md index 4f89791b9..93c285900 100644 --- a/bootstrap/CHANGELOG.md +++ b/bootstrap/CHANGELOG.md @@ -2,6 +2,7 @@ ## [0.4.0] +- Integrate upstream `rust-libp2p` `0.54` changes to the bootstrap process - Add `/p2p/local/info` endpoint - Add webrtc support to bootstrap diff --git a/bootstrap/src/main.rs b/bootstrap/src/main.rs index c559f6b73..eea1b03c6 100644 --- a/bootstrap/src/main.rs +++ b/bootstrap/src/main.rs @@ -148,9 +148,10 @@ async fn run() -> Result<()> { network_client .add_bootstrap_nodes(cfg.bootstraps.iter().map(Into::into).collect()) .await?; + } else { + info!("No bootstrap list provided, starting client as the first bootstrap on the network.") } - network_client.bootstrap().await?; - info!("Bootstrap done."); + loop_handle.await?; Ok(()) diff --git a/bootstrap/src/p2p.rs b/bootstrap/src/p2p.rs index 8bc93e6a3..9a4bfd34a 100644 --- a/bootstrap/src/p2p.rs +++ b/bootstrap/src/p2p.rs @@ -64,7 +64,9 @@ pub async fn init( let kad_store = MemoryStore::new(id_keys.public().to_peer_id()); // create Kademlia Config let mut kad_cfg = kad::Config::new(cfg.kademlia.protocol_name); - kad_cfg.set_query_timeout(cfg.kademlia.query_timeout); + kad_cfg + .set_query_timeout(cfg.kademlia.query_timeout) + .set_periodic_bootstrap_interval(Some(cfg.kademlia.bootstrap_interval)); // build the Swarm, connecting the lower transport logic with the // higher layer network behaviour logic let tokio_swarm = SwarmBuilder::with_existing_identity(id_keys.clone()).with_tokio(); @@ -112,7 +114,7 @@ pub async fn init( Ok(( Client::new(command_sender), - EventLoop::new(swarm, command_receiver, cfg.bootstrap_interval), + EventLoop::new(swarm, command_receiver), )) } diff --git a/bootstrap/src/p2p/client.rs b/bootstrap/src/p2p/client.rs index 7828989ea..a0c2b6b7a 100644 --- a/bootstrap/src/p2p/client.rs +++ b/bootstrap/src/p2p/client.rs @@ -63,6 +63,7 @@ impl Client { .context("Sender not to be dropped.")? } + // Checks if bootstraps are available and adds them to the routing table automatically triggering bootstrap process pub async fn add_bootstrap_nodes(&self, nodes: Vec<(PeerId, Multiaddr)>) -> Result<()> { for (peer, addr) in nodes { self.dial_peer(peer, addr.clone()) @@ -74,43 +75,6 @@ impl Client { Ok(()) } - pub async fn bootstrap(&self) -> Result<()> { - // bootstrapping is impossible on an empty DHT table - // at least one node is required to be known, so check - let counted_peers = self.count_dht_entries().await?; - // for a bootstrap to succeed, we need at least 1 peer in our DHT - if counted_peers < 1 { - // we'll have to wait, until some one successfully connects us - let (peer_id, multiaddr) = self.wait_connection(None).await?; - // add that peer to have someone to bootstrap with - self.add_address(peer_id, multiaddr).await?; - } - - // proceed to bootstrap only if connected with someone - let (boot_res_sender, boot_res_receiver) = oneshot::channel(); - self.command_sender - .send(Command::Bootstrap { - response_sender: boot_res_sender, - }) - .await - .context("Command receiver should not be dropped while bootstrapping.")?; - boot_res_receiver - .await - .context("Sender not to be dropped while bootstrapping.")? - } - - async fn wait_connection(&self, peer_id: Option) -> Result<(PeerId, Multiaddr)> { - let (connection_res_sender, connection_res_receiver) = oneshot::channel(); - self.command_sender - .send(Command::WaitConnection { - peer_id, - response_sender: connection_res_sender, - }) - .await - .context("Command receiver should not be dropped while waiting on connection.")?; - Ok(connection_res_receiver.await?) - } - pub async fn count_dht_entries(&self) -> Result { let (response_sender, response_receiver) = oneshot::channel(); self.command_sender @@ -155,13 +119,6 @@ pub enum Command { multiaddr: Multiaddr, response_sender: oneshot::Sender>, }, - Bootstrap { - response_sender: oneshot::Sender>, - }, - WaitConnection { - peer_id: Option, - response_sender: oneshot::Sender<(PeerId, Multiaddr)>, - }, CountDHTPeers { response_sender: oneshot::Sender, }, diff --git a/bootstrap/src/p2p/event_loop.rs b/bootstrap/src/p2p/event_loop.rs index e0bf8eca1..90f44c7cf 100644 --- a/bootstrap/src/p2p/event_loop.rs +++ b/bootstrap/src/p2p/event_loop.rs @@ -3,20 +3,16 @@ use libp2p::{ autonat::{self, InboundProbeEvent, OutboundProbeEvent}, futures::StreamExt, identify::{Event as IdentifyEvent, Info}, - kad::{self, BootstrapOk, QueryId, QueryResult}, + kad::{self, BootstrapOk, QueryResult}, multiaddr::Protocol, swarm::SwarmEvent, - Multiaddr, PeerId, Swarm, + PeerId, Swarm, }; use std::{ collections::{hash_map, HashMap}, str::FromStr, - time::Duration, -}; -use tokio::{ - sync::{mpsc, oneshot}, - time::{interval_at, Instant, Interval}, }; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info, trace}; use crate::types::AgentVersion; @@ -26,49 +22,24 @@ use super::{ Behaviour, BehaviourEvent, }; -enum QueryChannel { - Bootstrap(oneshot::Sender>), -} - enum SwarmChannel { Dial(oneshot::Sender>), - ConnectionEstablished(oneshot::Sender<(PeerId, Multiaddr)>), -} - -// BootstrapState keeps track of all things bootstrap related -struct BootstrapState { - // referring to this initial bootstrap process, - // one that runs when this node starts up - is_startup_done: bool, - // timer that is responsible for firing periodic bootstraps - timer: Interval, } pub struct EventLoop { swarm: Swarm, command_receiver: mpsc::Receiver, - pending_kad_queries: HashMap, pending_kad_routing: HashMap>>, pending_swarm_events: HashMap, - bootstrap: BootstrapState, } impl EventLoop { - pub fn new( - swarm: Swarm, - command_receiver: mpsc::Receiver, - bootstrap_interval: Duration, - ) -> Self { + pub fn new(swarm: Swarm, command_receiver: mpsc::Receiver) -> Self { Self { swarm, command_receiver, - pending_kad_queries: Default::default(), pending_kad_routing: Default::default(), pending_swarm_events: Default::default(), - bootstrap: BootstrapState { - is_startup_done: false, - timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval), - }, } } @@ -82,7 +53,6 @@ impl EventLoop { // shutting down whole network event loop None => return, }, - _ = self.bootstrap.timer.tick() => self.handle_periodic_bootstraps(), } } } @@ -103,35 +73,21 @@ impl EventLoop { } }, kad::Event::OutboundQueryProgressed { - id, result: QueryResult::Bootstrap(bootstrap_result), .. - } => { - match bootstrap_result { - Ok(BootstrapOk { - peer, - num_remaining, - }) => { - trace!("BootstrapOK event. PeerID: {peer:?}. Num remaining: {num_remaining:?}."); - if num_remaining == 0 { - if let Some(QueryChannel::Bootstrap(ch)) = - self.pending_kad_queries.remove(&id) - { - _ = ch.send(Ok(())); - // we can say that the initial bootstrap at initialization is done - self.bootstrap.is_startup_done = true; - } - } - }, - Err(err) => { - trace!("Bootstrap error event. Error: {err:?}."); - if let Some(QueryChannel::Bootstrap(ch)) = - self.pending_kad_queries.remove(&id) - { - _ = ch.send(Err(err.into())); - } - }, - } + } => match bootstrap_result { + Ok(BootstrapOk { + peer, + num_remaining, + }) => { + debug!("BootstrapOK event. PeerID: {peer:?}. Num remaining: {num_remaining:?}."); + if num_remaining == 0 { + debug!("Bootstrap complete!"); + } + }, + Err(err) => { + debug!("Bootstrap error event. Error: {err:?}."); + }, }, _ => {}, }, @@ -238,23 +194,10 @@ impl EventLoop { SwarmEvent::ConnectionEstablished { endpoint, peer_id, .. } => { - // while waiting for a first successful connection, - // we're interested in a case where we are dialing back if endpoint.is_dialer() { - if let Some(event) = self.pending_swarm_events.remove(&peer_id) { - match event { - // check if there is a command waiting for a response for established 1st connection - SwarmChannel::ConnectionEstablished(ch) => { - // signal back that we have successfully established a connection, - // give us back PeerId and Multiaddress - let addr = endpoint.get_remote_address().to_owned(); - _ = ch.send((peer_id, addr)); - }, - SwarmChannel::Dial(ch) => { - // signal back that dial was a success - _ = ch.send(Ok(())); - }, - } + if let Some(SwarmChannel::Dial(ch)) = self.pending_swarm_events.remove(&peer_id) + { + _ = ch.send(Ok(())); } } }, @@ -309,38 +252,6 @@ impl EventLoop { .add_address(&peer_id, multiaddr); self.pending_kad_routing.insert(peer_id, response_sender); }, - Command::Bootstrap { response_sender } => { - match self.swarm.behaviour_mut().kademlia.bootstrap() { - Ok(query_id) => { - self.pending_kad_queries - .insert(query_id, QueryChannel::Bootstrap(response_sender)); - }, - // no available peers for bootstrap - // send error immediately through response channel - Err(err) => { - _ = response_sender.send(Err(err.into())); - }, - } - }, - Command::WaitConnection { - peer_id, - response_sender, - } => match peer_id { - // this means that we're waiting on a connection from - // a peer with provided PeerId - Some(id) => { - self.pending_swarm_events - .insert(id, SwarmChannel::ConnectionEstablished(response_sender)); - }, - // sending no particular PeerId means that we're - // waiting someone to establish connection with us - None => { - self.pending_swarm_events.insert( - self.swarm.local_peer_id().to_owned(), - SwarmChannel::ConnectionEstablished(response_sender), - ); - }, - }, Command::CountDHTPeers { response_sender } => { let mut total_peers: usize = 0; for bucket in self.swarm.behaviour_mut().kademlia.kbuckets() { @@ -370,12 +281,4 @@ impl EventLoop { }, } } - - fn handle_periodic_bootstraps(&mut self) { - // periodic bootstraps should only start after the initial one is done - if self.bootstrap.is_startup_done { - debug!("Starting periodic Bootstrap."); - _ = self.swarm.behaviour_mut().kademlia.bootstrap(); - } - } } diff --git a/bootstrap/src/types.rs b/bootstrap/src/types.rs index 6b87c0bff..6cc53d723 100644 --- a/bootstrap/src/types.rs +++ b/bootstrap/src/types.rs @@ -114,7 +114,6 @@ pub struct LibP2PConfig { pub identify: IdentifyConfig, pub kademlia: KademliaConfig, pub secret_key: Option, - pub bootstrap_interval: Duration, } impl From<&RuntimeConfig> for LibP2PConfig { @@ -124,7 +123,6 @@ impl From<&RuntimeConfig> for LibP2PConfig { identify: IdentifyConfig::new(), kademlia: rtcfg.into(), secret_key: rtcfg.secret_key.clone(), - bootstrap_interval: Duration::from_secs(rtcfg.bootstrap_period), } } } @@ -133,6 +131,7 @@ impl From<&RuntimeConfig> for LibP2PConfig { pub struct KademliaConfig { pub query_timeout: Duration, pub protocol_name: StreamProtocol, + pub bootstrap_interval: Duration, } impl From<&RuntimeConfig> for KademliaConfig { @@ -149,6 +148,7 @@ impl From<&RuntimeConfig> for KademliaConfig { KademliaConfig { query_timeout: Duration::from_secs(val.kad_query_timeout.into()), protocol_name, + bootstrap_interval: Duration::from_secs(val.bootstrap_period), } } }