From e088fd05efa6e21b58e4a9ddfe45908bc61feb29 Mon Sep 17 00:00:00 2001 From: jjy Date: Thu, 22 Nov 2018 10:54:49 +0800 Subject: [PATCH] fix: remove external lock reference of network::peer_registry --- network/src/ckb_protocol_handler.rs | 37 ++++------ network/src/ckb_service.rs | 20 +++--- network/src/discovery_service.rs | 24 ++----- network/src/identify_service.rs | 45 ++++++------- network/src/network.rs | 100 +++++++++++++++++++++++++--- network/src/network_service.rs | 11 +-- network/src/outgoing_service.rs | 5 +- network/src/peers_registry.rs | 7 +- network/src/ping_service.rs | 94 +++++++++++++------------- 9 files changed, 195 insertions(+), 148 deletions(-) diff --git a/network/src/ckb_protocol_handler.rs b/network/src/ckb_protocol_handler.rs index e97b2a96e5..b194780b17 100644 --- a/network/src/ckb_protocol_handler.rs +++ b/network/src/ckb_protocol_handler.rs @@ -73,9 +73,8 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { protocol_id: ProtocolId, data: Vec, ) -> Result<(), Error> { - let peers_registry = self.network.peers_registry().read(); - if let Some(peer_id) = peers_registry.get_peer_id(peer_index) { - self.network.send(peer_id, protocol_id, data.into()) + if let Some(peer_id) = self.network.get_peer_id(peer_index) { + self.network.send(&peer_id, protocol_id, data.into()) } else { Err(ErrorKind::PeerNotFound.into()) } @@ -88,23 +87,15 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { } // ban peer fn ban_peer(&self, peer_index: PeerIndex, timeout: Duration) { - let mut peers_registry = self.network.peers_registry().write(); - if let Some(peer_id) = peers_registry - .get_peer_id(peer_index) - .map(|peer_id| peer_id.to_owned()) - { - peers_registry.ban_peer(peer_id, timeout) + if let Some(peer_id) = self.network.get_peer_id(peer_index) { + self.network.ban_peer(peer_id, timeout) } } // disconnect from peer fn disconnect(&self, peer_index: PeerIndex) { debug!(target: "network", "disconnect peer {}", peer_index); - let mut peers_registry = self.network.peers_registry().write(); - if let Some(peer_id) = peers_registry - .get_peer_id(peer_index) - .map(|peer_id| peer_id.to_owned()) - { - peers_registry.drop_peer(&peer_id) + if let Some(peer_id) = self.network.get_peer_id(peer_index) { + self.network.drop_peer(&peer_id) } } fn register_timer(&self, token: TimerToken, duration: Duration) -> Result<(), Error> { @@ -124,10 +115,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { Ok(()) } fn session_info(&self, peer_index: PeerIndex) -> Option { - let peers_registry = self.network.peers_registry().read(); - if let Some(session) = peers_registry + if let Some(session) = self + .network .get_peer_id(peer_index) - .map(|peer_id| self.network.session_info(peer_id, self.protocol_id)) + .map(|peer_id| self.network.session_info(&peer_id, self.protocol_id)) { session } else { @@ -135,10 +126,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { } } fn protocol_version(&self, peer_index: PeerIndex, protocol_id: ProtocolId) -> Option { - let peers_registry = self.network.peers_registry().read(); - if let Some(protocol_version) = peers_registry + if let Some(protocol_version) = self + .network .get_peer_id(peer_index) - .map(|peer_id| self.network.peer_protocol_version(peer_id, protocol_id)) + .map(|peer_id| self.network.peer_protocol_version(&peer_id, protocol_id)) { protocol_version } else { @@ -151,9 +142,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { } fn connected_peers(&self) -> Vec { - let peers_registry = self.network.peers_registry().read(); - let iter = peers_registry.connected_peers_indexes(); - iter.collect::>() + self.network.peers_indexes() } } diff --git a/network/src/ckb_service.rs b/network/src/ckb_service.rs index fde965ded9..03620229ea 100644 --- a/network/src/ckb_service.rs +++ b/network/src/ckb_service.rs @@ -55,16 +55,13 @@ impl CKBService { return Box::new(future::ok(())) as Box<_>; } - let peer_index = { - let peers_registry = network.peers_registry().read(); - match peers_registry.get(&peer_id) { - Some(peer) => peer.peer_index.unwrap(), - None => { - return Box::new(future::err(IoError::new( - IoErrorKind::Other, - format!("can't find peer {:?}", peer_id), - ))) - } + let peer_index = match network.get_peer_index(&peer_id) { + Some(peer_index) => peer_index, + None => { + return Box::new(future::err(IoError::new( + IoErrorKind::Other, + format!("can't find peer {:?}", peer_id), + ))) } }; @@ -118,8 +115,7 @@ impl CKBService { )), peer_index, ); - let mut peers_registry = network.peers_registry().write(); - peers_registry.drop_peer(&peer_id); + network.drop_peer(&peer_id); val } }) diff --git a/network/src/discovery_service.rs b/network/src/discovery_service.rs index 43f6f4a4ff..e51d3167c3 100644 --- a/network/src/discovery_service.rs +++ b/network/src/discovery_service.rs @@ -30,10 +30,8 @@ use transport::TransportOutput; pub(crate) struct DiscoveryService { timeout: Duration, - discovery_interval: Duration, pub(crate) kad_system: Arc, default_response_neighbour_count: usize, - pub(crate) kad_upgrade: kad::KadConnecConfig, kad_manage: Arc>, } @@ -83,16 +81,13 @@ impl DiscoveryService { pub fn new( timeout: Duration, default_response_neighbour_count: usize, - discovery_interval: Duration, kad_manage: Arc>, kad_system: Arc, ) -> Self { DiscoveryService { timeout, kad_system, - kad_upgrade: kad::KadConnecConfig::new(), default_response_neighbour_count, - discovery_interval, kad_manage, } } @@ -101,7 +96,7 @@ impl DiscoveryService { &self, network: Arc, peer_id: PeerId, - client_addr: Multiaddr, + _client_addr: Multiaddr, kad_connection_controller: kad::KadConnecController, _endpoint: Endpoint, kademlia_stream: Box + Send>, @@ -299,7 +294,6 @@ where Vec>, Error = IoError> + Send>>, kad_system: Arc, kad_manage: Arc>, - kad_upgrade: kad::KadConnecConfig, } impl DiscoveryQueryService @@ -331,7 +325,6 @@ where discovery_interval: Duration, kad_system: Arc, kad_manage: Arc>, - kad_upgrade: kad::KadConnecConfig, ) -> Self { let (kad_controller_request_sender, kad_controller_request_receiver) = mpsc::unbounded(); DiscoveryQueryService { @@ -345,7 +338,6 @@ where kad_query_events: Vec::with_capacity(10), kad_system, kad_manage, - kad_upgrade, kad_controller_request_sender, kad_controller_request_receiver, } @@ -362,17 +354,16 @@ where let query = self.kad_system.find_node(random_peer_id, { let kad_manage = Arc::clone(&self.kad_manage); let kad_controller_request_sender = self.kad_controller_request_sender.clone(); - let kad_upgrade = self.kad_upgrade.clone(); move |peer_id| { let (tx, rx) = oneshot::channel(); let mut kad_manage = kad_manage.lock(); kad_manage .kad_pending_dials .entry(peer_id.clone()) - .or_insert(Vec::new()) + .or_insert_with(Vec::new) .push(tx); - debug!(target: "discovery", "find node from {:?} pending: {}", peer_id, kad_manage.kad_pending_dials.get(&peer_id).unwrap().len()); - kad_controller_request_sender.unbounded_send(peer_id.clone()); + debug!(target: "discovery", "find node from {:?} pending: {}", peer_id, kad_manage.kad_pending_dials[&peer_id].len()); + kad_controller_request_sender.unbounded_send(peer_id.clone()).expect("send kad controller request"); rx.map_err(|err| { IoError::new( IoErrorKind::Other, @@ -521,16 +512,14 @@ pub(crate) struct KadManage { connected_kad_peers: FnvHashMap>, kad_pending_dials: FnvHashMap>>, kad_upgrade: kad::KadConnecConfig, - network: Arc, pub(crate) to_notify: Option, } impl KadManage { - pub fn new(network: Arc, kad_upgrade: kad::KadConnecConfig) -> Self { + pub fn new(_network: Arc, kad_upgrade: kad::KadConnecConfig) -> Self { KadManage { connected_kad_peers: FnvHashMap::with_capacity_and_hasher(10, Default::default()), kad_pending_dials: FnvHashMap::with_capacity_and_hasher(10, Default::default()), - network, kad_upgrade, to_notify: None, } @@ -608,10 +597,11 @@ impl KadManage { } }); - let dial_future = kad_connection.dial(swarm_controller, addr, transport); + let _ = kad_connection.dial(swarm_controller, addr, transport); Ok(kad_connection) } + #[allow(dead_code)] fn drop_connection(&mut self, peer_id: &PeerId) { debug!(target: "discovery","disconnect kad connection from {:?}", peer_id); self.connected_kad_peers.remove(peer_id); diff --git a/network/src/identify_service.rs b/network/src/identify_service.rs index ee2b5df76a..58ddbfd128 100644 --- a/network/src/identify_service.rs +++ b/network/src/identify_service.rs @@ -40,21 +40,21 @@ impl IdentifyService { trace!("process identify for peer_id {:?} with {:?}", peer_id, info); // set identify info to peer { - let mut peers_registry = network.peers_registry().write(); - match peers_registry.get_mut(&peer_id) { - Some(peer) => { - peer.identify_info = Some(PeerIdentifyInfo { - client_version: info.agent_version.clone(), - protocol_version: info.protocol_version.clone(), - supported_protocols: info.protocols.clone(), - count_of_known_listen_addrs: info.listen_addrs.len(), - }) - } - None => error!( + let identify_info = PeerIdentifyInfo { + client_version: info.agent_version.clone(), + protocol_version: info.protocol_version.clone(), + supported_protocols: info.protocols.clone(), + count_of_known_listen_addrs: info.listen_addrs.len(), + }; + if network + .set_peer_identify_info(&peer_id, identify_info) + .is_err() + { + error!( target: "network", "can't find peer_id {:?} during process identify info", peer_id - ), + ) } } @@ -194,21 +194,20 @@ where let _identify_timeout = self.identify_timeout; let network = Arc::clone(&network); move |_| { - let peers_registry = network.peers_registry().read(); - for (peer_id, peer) in peers_registry.peers_iter() { - if let Some(ref identify_info) = peer.identify_info { + for peer_id in network.peers() { + if let Some(ref identify_info) = network.get_peer_identify_info(&peer_id) { if identify_info.count_of_known_listen_addrs > 0 { continue; } } - trace!( - target: "network", - "request identify to peer {:?} {:?}", - peer_id, - peer.remote_addresses - ); // TODO should we try all addresses? - if let Some(addr) = peer.remote_addresses.get(0) { + if let Some(addr) = network.get_peer_remote_addresses(&peer_id).get(0) { + trace!( + target: "network", + "request identify to peer {:?} {:?}", + peer_id, + addr + ); // dial identify let _ = swarm_controller.dial(addr.clone(), transport.clone()); } else { @@ -216,7 +215,7 @@ where target: "network", "error when prepare identify : can't find addresses for peer {:?}", peer_id - ); + ); } } Ok(()) diff --git a/network/src/network.rs b/network/src/network.rs index 438d9bb307..8781ecb53b 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -1,7 +1,7 @@ #![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] use super::NetworkConfig; -use super::{Error, ErrorKind, ProtocolId}; +use super::{Error, ErrorKind, PeerIndex, ProtocolId}; use bytes::Bytes; use ckb_protocol::{CKBProtocol, CKBProtocols}; use ckb_protocol_handler::CKBProtocolHandler; @@ -17,12 +17,11 @@ use identify_service::IdentifyService; use libp2p::core::{upgrade, MuxedTransport, PeerId}; use libp2p::core::{Endpoint, Multiaddr, UniqueConnec}; use libp2p::core::{PublicKey, SwarmController}; -use libp2p::{self, identify, kad, secio, Transport, TransportTimeout}; +use libp2p::{self, identify, kad, ping, secio, Transport, TransportTimeout}; use memory_peer_store::MemoryPeerStore; use outgoing_service::OutgoingService; use peer_store::{Behaviour, PeerStore}; -use peers_registry::PeerIdentifyInfo; -use peers_registry::PeersRegistry; +use peers_registry::{ConnectionStatus, PeerConnection, PeerIdentifyInfo, PeersRegistry}; use ping_service::PingService; use protocol::Protocol; use protocol_service::ProtocolService; @@ -77,10 +76,91 @@ pub struct Network { } impl Network { - // keep peers_registry function crate available, to avoiding lock race condition from outside. + pub fn drop_peer(&self, peer_id: &PeerId) { + self.peers_registry.write().drop_peer(&peer_id); + } + + pub(crate) fn add_peer(&self, peer_id: PeerId, peer: PeerConnection) { + let mut peers_registry = self.peers_registry.write(); + peers_registry.add_peer(peer_id, peer); + } + + pub(crate) fn get_peer_index(&self, peer_id: &PeerId) -> Option { + let peers_registry = self.peers_registry.read(); + peers_registry + .get(&peer_id) + .and_then(|peer| peer.peer_index) + } + + pub(crate) fn get_peer_id(&self, peer_index: PeerIndex) -> Option { + let peers_registry = self.peers_registry.read(); + peers_registry + .get_peer_id(peer_index) + .map(|peer_id| peer_id.to_owned()) + } + + pub(crate) fn connection_status(&self) -> ConnectionStatus { + let peers_registry = self.peers_registry.read(); + peers_registry.connection_status() + } + + pub(crate) fn get_peer_identify_info(&self, peer_id: &PeerId) -> Option { + let peers_registry = self.peers_registry.read(); + peers_registry + .get(peer_id) + .and_then(|peer| peer.identify_info.clone()) + } + + pub(crate) fn set_peer_identify_info( + &self, + peer_id: &PeerId, + identify_info: PeerIdentifyInfo, + ) -> Result<(), ()> { + let mut peers_registry = self.peers_registry.write(); + match peers_registry.get_mut(peer_id) { + Some(peer) => { + peer.identify_info = Some(identify_info); + Ok(()) + } + None => Err(()), + } + } + + pub(crate) fn get_peer_pinger(&self, peer_id: &PeerId) -> Option> { + let peers_registry = self.peers_registry.read(); + peers_registry + .get(peer_id) + .map(|peer| peer.pinger_loader.clone()) + } + + pub(crate) fn get_peer_remote_addresses(&self, peer_id: &PeerId) -> Vec { + let peers_registry = self.peers_registry.read(); + if let Some(peer) = peers_registry.get(peer_id) { + peer.remote_addresses.clone() + } else { + Vec::new() + } + } + + pub(crate) fn peers(&self) -> impl Iterator { + let peers_registry = self.peers_registry.read(); + let peers = peers_registry + .peers_iter() + .map(|(peer_id, _peer)| peer_id.to_owned()) + .collect::>(); + peers.into_iter() + } + + pub(crate) fn peers_indexes(&self) -> Vec { + let peers_registry = self.peers_registry.read(); + let iter = peers_registry.connected_peers_indexes(); + iter.collect::>() + } + #[inline] - pub(crate) fn peers_registry<'a>(&'a self) -> &'a RwLock { - &self.peers_registry + pub(crate) fn ban_peer(&self, peer_id: PeerId, timeout: Duration) { + let mut peers_registry = self.peers_registry.write(); + peers_registry.ban_peer(peer_id, timeout); } #[inline] @@ -146,7 +226,7 @@ impl Network { endpoint: Endpoint, addresses: Option>, ) -> Result, u8)>, Error> { - let mut peers_registry = self.peers_registry().write(); + let mut peers_registry = self.peers_registry.write(); // get peer protocol_connection match peers_registry.new_peer(peer_id.clone(), endpoint) { Ok(_) => { @@ -456,7 +536,6 @@ impl Network { let discovery_service = Arc::new(DiscoveryService::new( config.discovery_timeout, config.discovery_response_count, - config.discovery_interval, Arc::clone(&kad_manage), Arc::clone(&kad_system), )); @@ -600,7 +679,6 @@ impl Network { config.discovery_interval, Arc::clone(&kad_system), Arc::clone(&kad_manage), - kad_upgrade.clone(), ); // prepare services futures @@ -638,7 +716,7 @@ impl Network { .and_then({ let network = Arc::clone(&network); move |_| { - let mut peers_registry = network.peers_registry().write(); + let mut peers_registry = network.peers_registry.write(); debug!(target: "network", "drop all connections..."); peers_registry.drop_all(); Ok(()) diff --git a/network/src/network_service.rs b/network/src/network_service.rs index 8be857bfd5..1605651425 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -9,7 +9,7 @@ use futures::sync::oneshot; use libp2p::core::PeerId; use network::Network; use peer_store::PeerStore; -use peers_registry::{PeerConnection, PeersRegistry}; +use peers_registry::PeerConnection; use std::boxed::Box; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::sync::Arc; @@ -34,20 +34,15 @@ impl NetworkService { self.network.external_url() } - #[inline] - pub(crate) fn peers_registry<'a>(&'a self) -> &'a RwLock { - &self.network.peers_registry() - } - #[allow(dead_code)] #[inline] pub(crate) fn peer_store<'a>(&'a self) -> &'a RwLock> { &self.network.peer_store() } + #[inline] pub fn add_peer(&self, peer_id: PeerId, peer: PeerConnection) { - let mut peers_registry = self.peers_registry().write(); - peers_registry.add_peer(peer_id, peer); + self.network.add_peer(peer_id, peer); } pub fn with_protocol_context(&self, protocol_id: ProtocolId, f: F) -> Option diff --git a/network/src/outgoing_service.rs b/network/src/outgoing_service.rs index 06f7946449..d988e08d0d 100644 --- a/network/src/outgoing_service.rs +++ b/network/src/outgoing_service.rs @@ -79,10 +79,7 @@ impl ProtocolService for OutgoingService { let transport = transport.clone(); let timeout = self.timeout; move |_| { - let connection_status = { - let peers_registry = network.peers_registry().read(); - peers_registry.connection_status() - }; + let connection_status = network.connection_status(); let new_outgoing = (connection_status.max_outgoing - connection_status.unreserved_outgoing) as usize; diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index e4e51c6d7e..04b6601804 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -150,7 +150,7 @@ pub struct ConnectionStatus { pub max_outgoing: u32, } -pub struct PeersRegistry { +pub(crate) struct PeersRegistry { // store all known peers peer_store: Arc>>, peer_connections: PeerConnections, @@ -188,8 +188,9 @@ impl PeersRegistry { } // registry a new peer + #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] pub fn new_peer(&mut self, peer_id: PeerId, endpoint: Endpoint) -> Result<(), Error> { - if let Some(_) = self.peer_connections.get(&peer_id) { + if self.peer_connections.get(&peer_id).is_some() { return Ok(()); } let is_reserved = self.peer_store.read().is_reserved(&peer_id); @@ -228,7 +229,7 @@ impl PeersRegistry { } let peer = PeerConnection::new(endpoint); let peer_index = self.add_peer(peer_id.clone(), peer); - debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index,peer_id); + debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index, peer_id); Ok(()) } diff --git a/network/src/ping_service.rs b/network/src/ping_service.rs index 1a1d2b140c..8e1262ca53 100644 --- a/network/src/ping_service.rs +++ b/network/src/ping_service.rs @@ -60,10 +60,10 @@ impl ProtocolService for PingService { Box::new(processing) as Box + Send> } Protocol::Ping(pinger, processing, peer_id) => { - match network.peers_registry().read().get(&peer_id) { - Some(peer) => { + match network.get_peer_pinger(&peer_id) { + Some(pinger_loader) => { // ping and store pinger - Box::new(peer.pinger_loader.tie_or_passthrough(pinger, processing)) + Box::new(pinger_loader.tie_or_passthrough(pinger, processing)) as Box + Send> } None => Box::new(future::err(IoError::new( @@ -118,63 +118,65 @@ impl ProtocolService for PingService { let ping_timeout = self.ping_timeout; move |_| { let mut ping_futures = FuturesUnordered::new(); - let peers_registry = network.peers_registry().read(); // build ping future for each peer - for (peer_id, peer) in peers_registry.peers_iter() { + for peer_id in network.peers() { let peer_id = peer_id.clone(); // only ping first address? - if let Some(addr) = peer.remote_addresses.get(0) { - let ping_future = peer - .pinger_loader - .dial(&swarm_controller, &addr, transport.clone()) - .and_then({ - let peer_id = peer_id.clone(); - move |mut pinger| { - pinger.ping().map(|_| peer_id).map_err(|err| { - IoError::new( - IoErrorKind::Other, - format!("pinger error {}", err), - ) - }) - } - }); - let ping_start_time = Instant::now(); - let ping_future = Timeout::new(ping_future, ping_timeout).then({ - let network = Arc::clone(&network); - move |result| -> Result<(), IoError> { - let mut peer_store = network.peer_store().write(); - match result { - Ok(peer_id) => { - let received_during = ping_start_time.elapsed(); - peer_store.report(&peer_id, Behaviour::Ping); - trace!( + if let Some(addr) = network.get_peer_remote_addresses(&peer_id).get(0) { + if let Some(pinger_loader) = network.get_peer_pinger(&peer_id) { + let ping_future = pinger_loader + .dial(&swarm_controller, &addr, transport.clone()) + .and_then({ + let peer_id = peer_id.clone(); + move |mut pinger| { + pinger.ping().map(|_| peer_id).map_err(|err| { + IoError::new( + IoErrorKind::Other, + format!("pinger error {}", err), + ) + }) + } + }); + let ping_start_time = Instant::now(); + let ping_future = + Future::then(Timeout::new(ping_future, ping_timeout), { + let network = Arc::clone(&network); + move |result| -> Result<(), IoError> { + let mut peer_store = network.peer_store().write(); + match result { + Ok(peer_id) => { + let received_during = + ping_start_time.elapsed(); + peer_store + .report(&peer_id, Behaviour::Ping); + trace!( target: "network", "received pong from {:?} in {:?}", peer_id, received_during ); - Ok(()) - } - Err(err) => { - peer_store - .report(&peer_id, Behaviour::FailedToPing); - network - .peers_registry() - .write() - .drop_peer(&peer_id); - trace!( + Ok(()) + } + Err(err) => { + peer_store.report( + &peer_id, + Behaviour::FailedToPing, + ); + network.drop_peer(&peer_id); + trace!( target: "network", "error when send ping to {:?}, error: {:?}", peer_id, err ); - Ok(()) + Ok(()) + } + } } - } - } - }); - ping_futures.push(Box::new(ping_future) - as Box + Send>); + }); + ping_futures.push(Box::new(ping_future) + as Box + Send>); + } } } Box::new(