diff --git a/network/src/command.rs b/network/src/command.rs index c9d2ef3481..1c84651b7c 100644 --- a/network/src/command.rs +++ b/network/src/command.rs @@ -1,29 +1,34 @@ use crate::CKBProtocolContext; +use crate::peer::BroadcastTarget; use crate::{Behaviour, Peer, PeerIndex, SupportProtocols}; use ckb_logger::debug; use p2p::bytes::Bytes; -use p2p::service::TargetSession; +use crate::Multiaddr; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::oneshot; +#[derive(Debug)] pub enum Command { + Dial { + multiaddr: Multiaddr, + }, Disconnect { - peer_index: PeerIndex, + peer: PeerIndex, message: String, }, Ban { - peer_index: PeerIndex, + peer: PeerIndex, duration: Duration, reason: String, }, Report { - peer_index: PeerIndex, + peer: PeerIndex, behaviour: Behaviour, }, GetPeer { - peer_index: PeerIndex, + peer: PeerIndex, sender: oneshot::Sender>, }, GetConnectedPeers { @@ -36,22 +41,22 @@ pub enum Command { }, FilterBroadCast { protocol: SupportProtocols, - target: TargetSession, + target: BroadcastTarget, message: Bytes, quick: bool, }, } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Default, Debug)] pub struct CommandSenderContext { - protocol: SupportProtocols, - ckb2023: bool, + protocol: Option, + ckb2023: Option, } -#[derive(Clone)] +#[derive(Clone, Default, Debug)] pub struct CommandSender { context: CommandSenderContext, - channel: mpsc::Sender, + channel: Option>, } impl CommandSender { @@ -60,16 +65,30 @@ impl CommandSender { ( Self { context: CommandSenderContext { - protocol: nc.protocol_id().into(), - ckb2023: nc.ckb2023(), + protocol: Some(nc.protocol_id().into()), + ckb2023: Some(nc.ckb2023()), }, - channel: command_sender, + channel: Some(command_sender), }, command_receiver, ) } + + pub fn with_mpsc_sender(mut self, mpsc_sender: mpsc::Sender) -> Self { + self.channel = Some(mpsc_sender); + self + } + pub fn with_ckb2023(mut self, ckb2023: bool) -> Self { + self.context.ckb2023 = Some(ckb2023); + self + } + pub fn with_protocol(mut self, protocol: SupportProtocols) -> Self { + self.context.protocol = Some(protocol); + self + } + pub fn send(&self, command: Command) -> Result<(), mpsc::error::SendError> { - self.channel.blocking_send(command) + self.channel.as_ref().unwrap().blocking_send(command) } pub fn try_send(&self, command: Command) { @@ -81,17 +100,17 @@ impl CommandSender { } pub fn protocol(&self) -> SupportProtocols { - self.context.protocol + self.context.protocol.unwrap() } pub fn ckb2023(&self) -> bool { - self.context.ckb2023 + self.context.ckb2023.unwrap() } pub fn get_peer(&self, peer: PeerIndex) -> Option { let (sender, receiver) = oneshot::channel(); match self.send(Command::GetPeer { - peer_index: peer, + peer, sender, }) { Ok(_) => receiver.blocking_recv().ok().flatten(), diff --git a/network/src/libp2p/mod.rs b/network/src/libp2p/mod.rs index 588db03a37..c55a77808c 100644 --- a/network/src/libp2p/mod.rs +++ b/network/src/libp2p/mod.rs @@ -17,16 +17,7 @@ pub use serde::{self, Deserialize, Serialize}; use ckb_spawn::Spawn; use tokio::{sync::mpsc}; - - - - -#[derive(Debug, Clone)] -pub enum Command { - Dial { multiaddr: Multiaddr }, - Disconnect { peer: PeerId, message: String }, - GetHeader, -} +pub use crate::Command; pub enum Event {} diff --git a/network/src/network.rs b/network/src/network.rs index 75f0adb27b..f7e0e70ee9 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -581,7 +581,7 @@ impl NetworkController { info!("Dialing {}", &multiaddr); handle.spawn_task(async move { let _ = command_sender - .send(libp2p::Command::Dial { multiaddr }) + .send(libp2p::Command::Dial { multiaddr: multiaddr.into() }) .await; }); } @@ -594,7 +594,7 @@ impl NetworkController { info!("Disconnecting {}", &peer); handle.spawn_task(async move { let _ = command_sender - .send(libp2p::Command::Disconnect { peer, message }) + .send(libp2p::Command::Disconnect { peer: peer.into(), message }) .await; }); } @@ -742,7 +742,7 @@ impl NetworkController { self.must_get_libp2p_controller() .command_sender .try_send(libp2p::Command::Disconnect { - peer: peer_id, + peer: peer_id.into(), message: "".to_string(), }) .expect("command receiver not closed"); diff --git a/network/src/peer.rs b/network/src/peer.rs index 90af5dfb58..8fb757dd74 100644 --- a/network/src/peer.rs +++ b/network/src/peer.rs @@ -6,6 +6,7 @@ use crate::{ use ipnetwork::IpNetwork; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr as Libp2pMultiaddr, PeerId as Libp2pPeerId}; +use p2p::service::TargetSession; use p2p::utils::{extract_peer_id, multiaddr_to_socketaddr}; use p2p::{secio::PeerId as TentaclePeerId, SessionId}; use serde::{Deserialize, Serialize}; @@ -417,3 +418,35 @@ impl ConnectionType { } } } + +pub enum BroadcastTarget { + Tentacle(TargetSession), +} + +impl std::fmt::Debug for BroadcastTarget { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + &BroadcastTarget::Tentacle(t) => match t { + TargetSession::All => write!(f, "tentacle target all"), + TargetSession::Filter(_) => write!(f, "tentacle target filter"), + TargetSession::Multi(_) => write!(f, "tentacle target multi"), + TargetSession::Single(s) => write!(f, "tentacle target single ({})", s), + }, + } + } +} + +impl From for BroadcastTarget { + fn from(t: TargetSession) -> Self { + Self::Tentacle(t) + } +} + +impl TryFrom for TargetSession { + type Error = String; + fn try_from(t: BroadcastTarget) -> Result { + match t { + BroadcastTarget::Tentacle(t) => Ok(t), + } + } +} diff --git a/network/src/tentacle/protocols/mod.rs b/network/src/tentacle/protocols/mod.rs index 16101fb2df..e65dec3d17 100644 --- a/network/src/tentacle/protocols/mod.rs +++ b/network/src/tentacle/protocols/mod.rs @@ -119,6 +119,9 @@ pub trait CKBProtocolContext: Send { } async fn process_command(&self, command: Command) { match command { + Command::Dial { multiaddr: _ } => { + todo!("Implement dial for tentacle"); + } Command::SendMessage { protocol, peer_index, @@ -130,25 +133,25 @@ pub trait CKBProtocolContext: Send { }; } Command::Ban { - peer_index, + peer, duration, reason, - } => self.ban_peer(peer_index, duration, reason), + } => self.ban_peer(peer, duration, reason), Command::Disconnect { - peer_index, + peer, message, } => { - let result = self.disconnect(peer_index, &message); + let result = self.disconnect(peer, &message); if let Err(e) = result { - debug!("Failed to disconnect from peer {}: {:?}", peer_index, e) + debug!("Failed to disconnect from peer {}: {:?}", peer, e) }; } - Command::GetPeer { peer_index, sender } => { - let result = sender.send(self.get_peer(peer_index)); + Command::GetPeer { peer, sender } => { + let result = sender.send(self.get_peer(peer)); if let Err(e) = result { debug!( "Failed to send response of get_peer (peer: {}): {:?}", - peer_index, e + peer, e ); }; } @@ -159,9 +162,9 @@ pub trait CKBProtocolContext: Send { }; } Command::Report { - peer_index, + peer, behaviour, - } => self.report_peer(peer_index, behaviour), + } => self.report_peer(peer, behaviour), Command::FilterBroadCast { // TODO: need to send message to the specific protocol. protocol: _, @@ -169,6 +172,7 @@ pub trait CKBProtocolContext: Send { message, quick, } => { + let target = target.try_into().expect("Must be a tentacle broadcast target"); let result = if quick { self.quick_filter_broadcast(target, message) } else { diff --git a/sync/src/filter/mod.rs b/sync/src/filter/mod.rs index 53c430e793..be41776475 100644 --- a/sync/src/filter/mod.rs +++ b/sync/src/filter/mod.rs @@ -91,7 +91,7 @@ impl BlockFilter { status ); command_sender.must_send(Command::Ban { - peer_index: peer, + peer, duration: ban_time, reason: status.to_string(), }); diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index bac6bc3ddb..b34147ad8f 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -218,7 +218,7 @@ impl Relayer { status ); command_sender.must_send(Command::Ban { - peer_index: peer, + peer, duration: ban_time, reason: status.to_string(), }); @@ -330,7 +330,7 @@ impl Relayer { .collect(); if let Err(err) = command_sender.send(Command::FilterBroadCast { protocol: command_sender.protocol(), - target: TargetSession::Multi(Box::new(selected_peers.into_iter())), + target: TargetSession::Multi(Box::new(selected_peers.into_iter())).into(), message: message.as_bytes(), quick: true, }) { @@ -382,7 +382,7 @@ impl Relayer { protocol: SupportProtocols::LightClient, target: TargetSession::Filter(Box::new(move |id| { light_client_peers.contains(&id.into()) - })), + })).into(), message: light_client_message.as_bytes(), quick: false, }) { diff --git a/sync/src/relayer/transactions_process.rs b/sync/src/relayer/transactions_process.rs index c7b616c8b1..8a089784b8 100644 --- a/sync/src/relayer/transactions_process.rs +++ b/sync/src/relayer/transactions_process.rs @@ -71,7 +71,7 @@ impl<'a> TransactionsProcess<'a> { .any(|(_, declared_cycles)| declared_cycles > &max_block_cycles) { self.command_sender.must_send(Command::Ban { - peer_index: self.peer, + peer: self.peer, duration: DEFAULT_BAN_TIME, reason: String::from("relay declared cycles greater than max_block_cycles"), }); diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs index e945098e8b..a9ee37b970 100644 --- a/sync/src/synchronizer/headers_process.rs +++ b/sync/src/synchronizer/headers_process.rs @@ -203,7 +203,7 @@ impl<'a> HeadersProcess<'a> { { debug!("Disconnect peer({}) is unprotected outbound", self.peer); if let Err(err) = self.command_sender.send(Command::Disconnect { - peer_index: self.peer, + peer: self.peer, message: "useless outbound peer in IBD".to_string(), }) { return StatusCode::Network.with_context(format!("Disconnect error: {err:?}")); diff --git a/sync/src/synchronizer/in_ibd_process.rs b/sync/src/synchronizer/in_ibd_process.rs index 3cf1b42dc0..cdf7006e90 100644 --- a/sync/src/synchronizer/in_ibd_process.rs +++ b/sync/src/synchronizer/in_ibd_process.rs @@ -35,7 +35,7 @@ impl<'a> InIBDProcess<'a> { if state.peer_flags.is_whitelist { self.synchronizer.shared().state().suspend_sync(state); } else if let Err(err) = self.command_sender.send(Command::Disconnect { - peer_index: self.peer, + peer: self.peer, message: "outbound in ibd".to_string(), }) { return StatusCode::Network.with_context(format!("Disconnect error: {err:?}")); diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index a5bf3bd4db..f61a3ac28b 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -316,7 +316,7 @@ impl Synchronizer { item_name, peer, ban_time, status ); command_sender.must_send(Command::Ban { - peer_index: peer, + peer, duration: ban_time, reason: status.to_string(), }); @@ -539,7 +539,7 @@ impl Synchronizer { for peer in eviction { info!("timeout eviction peer={}", peer); if let Err(err) = command_sender.send(Command::Disconnect { - peer_index: peer, + peer, message: "sync timeout eviction".to_string(), }) { debug!("synchronizer disconnect error: {:?}", err); @@ -667,7 +667,7 @@ impl Synchronizer { continue; } command_sender.must_send(Command::Disconnect { - peer_index: *peer, + peer: *peer, message: "sync disconnect".to_string(), }); } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 6b53669ebf..46e46c79b6 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -16,6 +16,7 @@ use ckb_channel::Receiver; use ckb_jsonrpc_types::ScriptHashType; use ckb_light_client_protocol_server::LightClientProtocol; use ckb_logger::info; +use ckb_network::CommandSender; use ckb_network::{ libp2p::NetworkController as Libp2pNetworkController, observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkState, SupportProtocols, TentacleNetworkService, @@ -307,6 +308,8 @@ impl Launcher { &support_protocols, &required_protocols, synchronizer.clone(), + CommandSender::default() + .with_mpsc_sender(command_sender.clone()) ); let libp2p_network_controller = Libp2pNetworkController::new::( shared.async_handle(), diff --git a/util/launcher/src/temp/libp2p/mod.rs b/util/launcher/src/temp/libp2p/mod.rs index ad98e30a54..28b71efb27 100644 --- a/util/launcher/src/temp/libp2p/mod.rs +++ b/util/launcher/src/temp/libp2p/mod.rs @@ -2,8 +2,11 @@ pub mod reqresp; pub mod sync; +use ckb_network::CommandSender; +use ckb_network::Multiaddr; use ckb_network::NetworkState; +use ckb_network::PeerIndex; use ckb_network::SupportProtocols; use ckb_async_runtime::Handle; @@ -61,6 +64,7 @@ pub fn new_swarm( supported_protocols: &[SupportProtocols], _required_protocol_ids: &[SupportProtocols], synchronizer: Synchronizer, + command_sender: CommandSender, ) -> Swarm { info!("supported protocols {:?}", supported_protocols); let priv_key_bytes: [u8; 32] = network_state @@ -111,6 +115,7 @@ pub fn new_swarm( )], sync::Config::default(), synchronizer, + command_sender, )) } else { None @@ -314,13 +319,34 @@ impl NetworkServiceTrait for NetworkService { async fn handle_command(&mut self, command: Command) { match command { Command::Dial { multiaddr } => { + let multiaddr = match multiaddr { + Multiaddr::Libp2p(multiaddr) => multiaddr, + Multiaddr::Tentacle(multiaddr) => { + error!( + "Trying to dial tentacle peer {} while libp2p address is expected", + multiaddr + ); + return; + } + }; + if let Err(error) = self.swarm.dial(multiaddr.clone()) { - error!("Dialing libp2p peer {} failed: {}", multiaddr, error); + error!("Dialing libp2p peer {} failed: {}", &multiaddr, error); } else { - info!("Dialing libp2p peer {} succeeded", multiaddr); + info!("Dialing libp2p peer {} succeeded", &multiaddr); } } Command::Disconnect { peer, message } => { + let peer: PeerId = match peer { + PeerIndex::Libp2p(peer_id) => peer_id, + PeerIndex::Tentacle(peer_id) => { + error!( + "Trying to disconnect tentacle peer {} while libp2p peer is expected", + peer_id + ); + return; + } + }; let disconnect_message = &mut self.swarm.behaviour_mut().disconnect_message; if !disconnect_message.is_enabled() { return; @@ -334,22 +360,8 @@ impl NetworkServiceTrait for NetworkService { peer, request_id ); } - Command::GetHeader => { - info!("GetHeader command received, trying to send sync request"); - let sync = &mut self.swarm.behaviour_mut().sync; - if !sync.is_enabled() { - return; - } - let sync = sync.as_mut().unwrap(); - - // TODO: get peers from peer_store - let peers: Vec = vec![]; - info!("Fetched peers from network state {:?}", peers); - for peer in peers { - let request_id = - &sync.send_request(&peer, SyncRequest("hello world".to_string())); - info!("Sync message send to {}, request_id {:?}", peer, request_id); - } + _ => { + todo!("handle command {:?}", command); } } } diff --git a/util/launcher/src/temp/libp2p/sync.rs b/util/launcher/src/temp/libp2p/sync.rs index 951b8325ae..52ab0f4643 100644 --- a/util/launcher/src/temp/libp2p/sync.rs +++ b/util/launcher/src/temp/libp2p/sync.rs @@ -67,7 +67,8 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use ckb_async_runtime::tokio::time; -use ckb_logger::{debug, warn, info}; +use ckb_logger::{debug, info, warn}; +use ckb_network::CommandSender; use ckb_sync::Synchronizer; pub use libp2p::request_response::Codec; @@ -349,6 +350,8 @@ where { /// The synchronizer that keep sync state and drive the sync protocol. synchronizer: Synchronizer, + /// The command sender to send command to the backend network stack. + command_sender: CommandSender, /// The supported inbound protocols. inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, /// The supported outbound protocols. @@ -382,11 +385,22 @@ where TCodec: Codec + Default + Clone + Send + 'static, { /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to construct the codec. - pub fn new(protocols: I, cfg: Config, synchronizer: Synchronizer) -> Self + pub fn new( + protocols: I, + cfg: Config, + synchronizer: Synchronizer, + command_sender: CommandSender, + ) -> Self where I: IntoIterator, { - Self::with_codec(TCodec::default(), protocols, cfg, synchronizer) + Self::with_codec( + TCodec::default(), + protocols, + cfg, + synchronizer, + command_sender, + ) } } @@ -401,6 +415,7 @@ where protocols: I, cfg: Config, synchronizer: Synchronizer, + command_sender: CommandSender, ) -> Self where I: IntoIterator, @@ -418,6 +433,7 @@ where let get_headers_timer = time::interval(cfg.get_headers_interval); Behaviour { synchronizer, + command_sender, inbound_protocols, outbound_protocols, next_outbound_request_id: OutboundRequestId(1), @@ -985,7 +1001,10 @@ where } } - fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {