Skip to content

Commit

Permalink
use a dedicated sync Behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 3, 2024
1 parent 402e9c8 commit 2001cee
Show file tree
Hide file tree
Showing 4 changed files with 1,870 additions and 56 deletions.
1 change: 1 addition & 0 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl BlockFetchCMD {
}

/// Sync protocol handle
#[derive(Clone)]
pub struct Synchronizer {
pub(crate) chain: ChainController,
/// Sync shared state
Expand Down
14 changes: 7 additions & 7 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use ckb_jsonrpc_types::ScriptHashType;
use ckb_light_client_protocol_server::LightClientProtocol;
use ckb_logger::info;
use ckb_network::{
libp2p::NetworkController as Libp2pNetworkController, observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkState,
SupportProtocols, TentacleNetworkService,
libp2p::NetworkController as Libp2pNetworkController, observe_listen_port_occupancy,
CKBProtocol, Flags, NetworkController, NetworkState, SupportProtocols, TentacleNetworkService,
};
use ckb_network_alert::alert_relayer::AlertRelayer;
use ckb_proposal_table::ProposalTable;
Expand Down Expand Up @@ -270,6 +270,9 @@ impl Launcher {
self.args.config.tmp_dir.as_ref(),
relay_tx_receiver,
));
// Sync is a core protocol, user cannot disable it via config
let synchronizer = Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared));

let fork_enable = {
let epoch = shared.snapshot().tip_header().epoch().number();
shared
Expand Down Expand Up @@ -303,7 +306,7 @@ impl Launcher {
Arc::clone(&network_state),
&support_protocols,
&required_protocols,
command_sender.clone(),
synchronizer.clone(),
);
let libp2p_network_controller = Libp2pNetworkController::new::<NetworkService>(
shared.async_handle(),
Expand All @@ -316,12 +319,9 @@ impl Launcher {

// TODO: make alert_notifier, alert_verifier independent of tentacle.
let (tentacle_network_controller, alert_notifier, alert_verifier) = {
// Sync is a core protocol, user cannot disable it via config
let synchronizer =
Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared));
let mut protocols = vec![CKBProtocol::new_with_support_protocol(
SupportProtocols::Sync,
Box::new(synchronizer),
Box::new(synchronizer.clone()),
Arc::clone(&network_state),
)];

Expand Down
84 changes: 35 additions & 49 deletions util/launcher/src/temp/libp2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
pub mod reqresp;

pub mod sync;

use ckb_network::NetworkState;

use ckb_network::SupportProtocols;

use ckb_async_runtime::Handle;
use ckb_logger::{debug, error, info, trace};
use ckb_network::async_trait;
use ckb_stop_handler::new_tokio_exit_rx;
use ckb_sync::Synchronizer;

use ::libp2p::request_response::{
Config as ReqRespConfig, Event as ReqRespEvent, Message as ReqRespMessage,
ProtocolSupport as ReqRespProtocolSupport,
};
use ckb_network::libp2p::{
futures::StreamExt,
identify, identity, noise, ping,
request_response::{self, ProtocolSupport},
serde,
swarm::behaviour::toggle::Toggle,
swarm::NetworkBehaviour,
swarm::SwarmEvent,
tcp, yamux, Command, Deserialize, NetworkServiceTrait, PeerId, Serialize, StreamProtocol,
Swarm, SwarmBuilder,
futures::StreamExt, identify, identity, noise, ping, serde, swarm::behaviour::toggle::Toggle,
swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux, Command, Deserialize,
NetworkServiceTrait, PeerId, Serialize, StreamProtocol, Swarm, SwarmBuilder,
};
use core::time::Duration;

use ckb_network::tokio::{select, sync::mpsc, time};
use ckb_spawn::Spawn;
use ckb_network::tokio::{select, sync::mpsc};

use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -46,9 +45,12 @@ pub struct MyBehaviour {
identify: identify::Behaviour,
ping: Toggle<ping::Behaviour>,
disconnect_message: Toggle<
request_response::cbor::Behaviour<DisconnectMessageRequest, DisconnectMessageResponse>,
libp2p::request_response::cbor::Behaviour<
DisconnectMessageRequest,
DisconnectMessageResponse,
>,
>,
sync: Toggle<request_response::cbor::Behaviour<SyncRequest, SyncResponse>>,
sync: Toggle<sync::CborBehaviour<SyncRequest, SyncResponse>>,
}

pub fn new_swarm(
Expand All @@ -58,7 +60,7 @@ pub fn new_swarm(
network_state: Arc<NetworkState>,
supported_protocols: &[SupportProtocols],
_required_protocol_ids: &[SupportProtocols],
command_sender: mpsc::Sender<Command>,
synchronizer: Synchronizer,
) -> Swarm<MyBehaviour> {
info!("supported protocols {:?}", supported_protocols);
let priv_key_bytes: [u8; 32] = network_state
Expand Down Expand Up @@ -87,49 +89,32 @@ pub fn new_swarm(
let disconnect_message_supported =
supported_protocols.contains(&SupportProtocols::DisconnectMessage);
let disconenct_message_behaviour = Toggle::from(if disconnect_message_supported {
Some(request_response::cbor::Behaviour::new(
Some(libp2p::request_response::cbor::Behaviour::new(
[(
StreamProtocol::try_from_owned(SupportProtocols::DisconnectMessage.name())
.expect("Protocol of DisconnectMessage name start with /"),
ProtocolSupport::Full,
ReqRespProtocolSupport::Full,
)],
request_response::Config::default(),
ReqRespConfig::default(),
))
} else {
None
});

let sync_supported = supported_protocols.contains(&SupportProtocols::Sync);
let sync_behaviour = Toggle::from(if sync_supported {
Some(request_response::cbor::Behaviour::new(
Some(sync::CborBehaviour::new(
[(
StreamProtocol::try_from_owned(SupportProtocols::Sync.name())
.expect("Protocol of Sync name start with /"),
ProtocolSupport::Full,
sync::ProtocolSupport::Full,
)],
request_response::Config::default(),
sync::Config::default(),
synchronizer,
))
} else {
None
});
if sync_supported {
let command_sender = command_sender.clone();
handle.spawn_task(async move {
let mut interval = time::interval(Duration::from_secs(1));
let rx = new_tokio_exit_rx();
loop {
select! {
_ = interval.tick() => {
command_sender.send(Command::GetHeader).await.expect("receiver not dropped");
},
_ = rx.cancelled() => {
info!("Exit signal received, exit now");
break
},
}
}
});
}

let swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
Expand Down Expand Up @@ -245,10 +230,11 @@ impl NetworkServiceTrait for NetworkService {
_ => {}
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::DisconnectMessage(
request_response::Event::Message { message, peer },
)) => match message {
request_response::Message::Request {
SwarmEvent::Behaviour(MyBehaviourEvent::DisconnectMessage(ReqRespEvent::Message {
message,
peer,
})) => match message {
ReqRespMessage::Request {
request, channel, ..
} => {
info!(
Expand All @@ -266,7 +252,7 @@ impl NetworkServiceTrait for NetworkService {
.send_response(channel, DisconnectMessageResponse("Ok, bye".to_string()));
let _ = self.swarm.disconnect_peer_id(peer);
}
request_response::Message::Response {
ReqRespMessage::Response {
request_id,
response,
} => {
Expand All @@ -276,11 +262,11 @@ impl NetworkServiceTrait for NetworkService {
);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Sync(request_response::Event::Message {
SwarmEvent::Behaviour(MyBehaviourEvent::Sync(sync::Event::Message {
message,
peer,
})) => match message {
request_response::Message::Request {
sync::Message::Request {
request, channel, ..
} => {
info!(
Expand All @@ -296,7 +282,7 @@ impl NetworkServiceTrait for NetworkService {

let _ = sync.send_response(channel, SyncResponse("Got you".to_string()));
}
request_response::Message::Response {
sync::Message::Response {
request_id,
response,
} => {
Expand All @@ -307,7 +293,7 @@ impl NetworkServiceTrait for NetworkService {
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::DisconnectMessage(
request_response::Event::OutboundFailure {
ReqRespEvent::OutboundFailure {
request_id, error, ..
},
)) => {
Expand All @@ -317,7 +303,7 @@ impl NetworkServiceTrait for NetworkService {
);
}
SwarmEvent::Behaviour(MyBehaviourEvent::DisconnectMessage(
request_response::Event::ResponseSent { .. },
ReqRespEvent::ResponseSent { .. },
)) => {}
other => {
debug!("Unhandled {:?}", other);
Expand Down
Loading

0 comments on commit 2001cee

Please sign in to comment.