Skip to content

Commit

Permalink
use one Command for all protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 4, 2024
1 parent 2001cee commit f137b50
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 73 deletions.
55 changes: 37 additions & 18 deletions network/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
use crate::CKBProtocolContext;
use crate::peer::BroadcastTarget;
use crate::{Behaviour, Peer, PeerIndex, SupportProtocols};
use ckb_logger::debug;
use p2p::bytes::Bytes;
use p2p::service::TargetSession;
use crate::Multiaddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

#[derive(Debug)]
pub enum Command {
Dial {
multiaddr: Multiaddr,
},
Disconnect {
peer_index: PeerIndex,
peer: PeerIndex,
message: String,
},
Ban {
peer_index: PeerIndex,
peer: PeerIndex,
duration: Duration,
reason: String,
},
Report {
peer_index: PeerIndex,
peer: PeerIndex,
behaviour: Behaviour,
},
GetPeer {
peer_index: PeerIndex,
peer: PeerIndex,
sender: oneshot::Sender<Option<Peer>>,
},
GetConnectedPeers {
Expand All @@ -36,22 +41,22 @@ pub enum Command {
},
FilterBroadCast {
protocol: SupportProtocols,
target: TargetSession,
target: BroadcastTarget,
message: Bytes,
quick: bool,
},
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Default, Debug)]
pub struct CommandSenderContext {
protocol: SupportProtocols,
ckb2023: bool,
protocol: Option<SupportProtocols>,
ckb2023: Option<bool>,
}

#[derive(Clone)]
#[derive(Clone, Default, Debug)]
pub struct CommandSender {
context: CommandSenderContext,
channel: mpsc::Sender<Command>,
channel: Option<mpsc::Sender<Command>>,
}

impl CommandSender {
Expand All @@ -60,16 +65,30 @@ impl CommandSender {
(
Self {
context: CommandSenderContext {
protocol: nc.protocol_id().into(),
ckb2023: nc.ckb2023(),
protocol: Some(nc.protocol_id().into()),
ckb2023: Some(nc.ckb2023()),
},
channel: command_sender,
channel: Some(command_sender),
},
command_receiver,
)
}

pub fn with_mpsc_sender(mut self, mpsc_sender: mpsc::Sender<Command>) -> Self {
self.channel = Some(mpsc_sender);
self
}
pub fn with_ckb2023(mut self, ckb2023: bool) -> Self {
self.context.ckb2023 = Some(ckb2023);
self
}
pub fn with_protocol(mut self, protocol: SupportProtocols) -> Self {
self.context.protocol = Some(protocol);
self
}

pub fn send(&self, command: Command) -> Result<(), mpsc::error::SendError<Command>> {
self.channel.blocking_send(command)
self.channel.as_ref().unwrap().blocking_send(command)
}

pub fn try_send(&self, command: Command) {
Expand All @@ -81,17 +100,17 @@ impl CommandSender {
}

pub fn protocol(&self) -> SupportProtocols {
self.context.protocol
self.context.protocol.unwrap()
}

pub fn ckb2023(&self) -> bool {
self.context.ckb2023
self.context.ckb2023.unwrap()
}

pub fn get_peer(&self, peer: PeerIndex) -> Option<Peer> {
let (sender, receiver) = oneshot::channel();
match self.send(Command::GetPeer {
peer_index: peer,
peer,
sender,
}) {
Ok(_) => receiver.blocking_recv().ok().flatten(),
Expand Down
11 changes: 1 addition & 10 deletions network/src/libp2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,7 @@ pub use serde::{self, Deserialize, Serialize};

use ckb_spawn::Spawn;
use tokio::{sync::mpsc};




#[derive(Debug, Clone)]
pub enum Command {
Dial { multiaddr: Multiaddr },
Disconnect { peer: PeerId, message: String },
GetHeader,
}
pub use crate::Command;

pub enum Event {}

Expand Down
6 changes: 3 additions & 3 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ impl NetworkController {
info!("Dialing {}", &multiaddr);
handle.spawn_task(async move {
let _ = command_sender
.send(libp2p::Command::Dial { multiaddr })
.send(libp2p::Command::Dial { multiaddr: multiaddr.into() })
.await;
});
}
Expand All @@ -594,7 +594,7 @@ impl NetworkController {
info!("Disconnecting {}", &peer);
handle.spawn_task(async move {
let _ = command_sender
.send(libp2p::Command::Disconnect { peer, message })
.send(libp2p::Command::Disconnect { peer: peer.into(), message })
.await;
});
}
Expand Down Expand Up @@ -742,7 +742,7 @@ impl NetworkController {
self.must_get_libp2p_controller()
.command_sender
.try_send(libp2p::Command::Disconnect {
peer: peer_id,
peer: peer_id.into(),
message: "".to_string(),
})
.expect("command receiver not closed");
Expand Down
33 changes: 33 additions & 0 deletions network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
use ipnetwork::IpNetwork;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr as Libp2pMultiaddr, PeerId as Libp2pPeerId};
use p2p::service::TargetSession;
use p2p::utils::{extract_peer_id, multiaddr_to_socketaddr};
use p2p::{secio::PeerId as TentaclePeerId, SessionId};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -417,3 +418,35 @@ impl ConnectionType {
}
}
}

pub enum BroadcastTarget {
Tentacle(TargetSession),
}

impl std::fmt::Debug for BroadcastTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
&BroadcastTarget::Tentacle(t) => match t {
TargetSession::All => write!(f, "tentacle target all"),
TargetSession::Filter(_) => write!(f, "tentacle target filter"),
TargetSession::Multi(_) => write!(f, "tentacle target multi"),
TargetSession::Single(s) => write!(f, "tentacle target single ({})", s),
},
}
}
}

impl From<TargetSession> for BroadcastTarget {
fn from(t: TargetSession) -> Self {
Self::Tentacle(t)
}
}

impl TryFrom<BroadcastTarget> for TargetSession {
type Error = String;
fn try_from(t: BroadcastTarget) -> Result<Self, Self::Error> {
match t {
BroadcastTarget::Tentacle(t) => Ok(t),
}
}
}
24 changes: 14 additions & 10 deletions network/src/tentacle/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ pub trait CKBProtocolContext: Send {
}
async fn process_command(&self, command: Command) {
match command {
Command::Dial { multiaddr: _ } => {
todo!("Implement dial for tentacle");
}
Command::SendMessage {
protocol,
peer_index,
Expand All @@ -130,25 +133,25 @@ pub trait CKBProtocolContext: Send {
};
}
Command::Ban {
peer_index,
peer,
duration,
reason,
} => self.ban_peer(peer_index, duration, reason),
} => self.ban_peer(peer, duration, reason),
Command::Disconnect {
peer_index,
peer,
message,
} => {
let result = self.disconnect(peer_index, &message);
let result = self.disconnect(peer, &message);
if let Err(e) = result {
debug!("Failed to disconnect from peer {}: {:?}", peer_index, e)
debug!("Failed to disconnect from peer {}: {:?}", peer, e)
};
}
Command::GetPeer { peer_index, sender } => {
let result = sender.send(self.get_peer(peer_index));
Command::GetPeer { peer, sender } => {
let result = sender.send(self.get_peer(peer));
if let Err(e) = result {
debug!(
"Failed to send response of get_peer (peer: {}): {:?}",
peer_index, e
peer, e
);
};
}
Expand All @@ -159,16 +162,17 @@ pub trait CKBProtocolContext: Send {
};
}
Command::Report {
peer_index,
peer,
behaviour,
} => self.report_peer(peer_index, behaviour),
} => self.report_peer(peer, behaviour),
Command::FilterBroadCast {
// TODO: need to send message to the specific protocol.
protocol: _,
target,
message,
quick,
} => {
let target = target.try_into().expect("Must be a tentacle broadcast target");
let result = if quick {
self.quick_filter_broadcast(target, message)
} else {
Expand Down
2 changes: 1 addition & 1 deletion sync/src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl BlockFilter {
status
);
command_sender.must_send(Command::Ban {
peer_index: peer,
peer,
duration: ban_time,
reason: status.to_string(),
});
Expand Down
6 changes: 3 additions & 3 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl Relayer {
status
);
command_sender.must_send(Command::Ban {
peer_index: peer,
peer,
duration: ban_time,
reason: status.to_string(),
});
Expand Down Expand Up @@ -330,7 +330,7 @@ impl Relayer {
.collect();
if let Err(err) = command_sender.send(Command::FilterBroadCast {
protocol: command_sender.protocol(),
target: TargetSession::Multi(Box::new(selected_peers.into_iter())),
target: TargetSession::Multi(Box::new(selected_peers.into_iter())).into(),
message: message.as_bytes(),
quick: true,
}) {
Expand Down Expand Up @@ -382,7 +382,7 @@ impl Relayer {
protocol: SupportProtocols::LightClient,
target: TargetSession::Filter(Box::new(move |id| {
light_client_peers.contains(&id.into())
})),
})).into(),
message: light_client_message.as_bytes(),
quick: false,
}) {
Expand Down
2 changes: 1 addition & 1 deletion sync/src/relayer/transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<'a> TransactionsProcess<'a> {
.any(|(_, declared_cycles)| declared_cycles > &max_block_cycles)
{
self.command_sender.must_send(Command::Ban {
peer_index: self.peer,
peer: self.peer,
duration: DEFAULT_BAN_TIME,
reason: String::from("relay declared cycles greater than max_block_cycles"),
});
Expand Down
2 changes: 1 addition & 1 deletion sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<'a> HeadersProcess<'a> {
{
debug!("Disconnect peer({}) is unprotected outbound", self.peer);
if let Err(err) = self.command_sender.send(Command::Disconnect {
peer_index: self.peer,
peer: self.peer,
message: "useless outbound peer in IBD".to_string(),
}) {
return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
Expand Down
2 changes: 1 addition & 1 deletion sync/src/synchronizer/in_ibd_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<'a> InIBDProcess<'a> {
if state.peer_flags.is_whitelist {
self.synchronizer.shared().state().suspend_sync(state);
} else if let Err(err) = self.command_sender.send(Command::Disconnect {
peer_index: self.peer,
peer: self.peer,
message: "outbound in ibd".to_string(),
}) {
return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
Expand Down
6 changes: 3 additions & 3 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl Synchronizer {
item_name, peer, ban_time, status
);
command_sender.must_send(Command::Ban {
peer_index: peer,
peer,
duration: ban_time,
reason: status.to_string(),
});
Expand Down Expand Up @@ -539,7 +539,7 @@ impl Synchronizer {
for peer in eviction {
info!("timeout eviction peer={}", peer);
if let Err(err) = command_sender.send(Command::Disconnect {
peer_index: peer,
peer,
message: "sync timeout eviction".to_string(),
}) {
debug!("synchronizer disconnect error: {:?}", err);
Expand Down Expand Up @@ -667,7 +667,7 @@ impl Synchronizer {
continue;
}
command_sender.must_send(Command::Disconnect {
peer_index: *peer,
peer: *peer,
message: "sync disconnect".to_string(),
});
}
Expand Down
3 changes: 3 additions & 0 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use ckb_channel::Receiver;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_light_client_protocol_server::LightClientProtocol;
use ckb_logger::info;
use ckb_network::CommandSender;
use ckb_network::{
libp2p::NetworkController as Libp2pNetworkController, observe_listen_port_occupancy,
CKBProtocol, Flags, NetworkController, NetworkState, SupportProtocols, TentacleNetworkService,
Expand Down Expand Up @@ -307,6 +308,8 @@ impl Launcher {
&support_protocols,
&required_protocols,
synchronizer.clone(),
CommandSender::default()
.with_mpsc_sender(command_sender.clone())
);
let libp2p_network_controller = Libp2pNetworkController::new::<NetworkService>(
shared.async_handle(),
Expand Down
Loading

0 comments on commit f137b50

Please sign in to comment.