diff --git a/applications/daily_tests/helpers.js b/applications/daily_tests/helpers.js index 9cdd81ba3f..5ebe183f5f 100644 --- a/applications/daily_tests/helpers.js +++ b/applications/daily_tests/helpers.js @@ -39,7 +39,7 @@ const yargs = () => require("yargs")(hideBin(process.argv)); function sendWebhookNotification(channel, message, webhookUrlOverride = null) { const hook = webhookUrlOverride || getWebhookUrlFromEnv(); if (!hook) { - throw new Error("WEBHOOK_URL not specified"); + return; } const data = JSON.stringify({ channel, text: message }); const args = ` -i -X POST -H 'Content-Type: application/json' -d '${data}' ${hook}`; diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index d82be66a8f..c50600ea9c 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -312,7 +312,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc async fn get_tip_info(&self, _request: Request<()>) -> Result, RpcStatus> { let state_machine = self.state_machine(); let status_watch = state_machine.get_status_info_watch(); - let is_synced = match (*status_watch.borrow()).state_info { + let is_synced = match status_watch.borrow().state_info { StateInfo::Listening(li) => li.is_synced(), _ => false, }; diff --git a/base_layer/core/src/base_node/state_machine_service/states/listening.rs b/base_layer/core/src/base_node/state_machine_service/states/listening.rs index e484dd5f3c..0ea8157568 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/listening.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/listening.rs @@ -152,8 +152,9 @@ impl Listening { ); if !self.is_synced { + debug!(target: LOG_TARGET, "Initial sync achieved"); self.is_synced = true; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced))); + shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true))); } continue; } @@ -222,7 +223,7 @@ impl Listening { impl From for Listening { fn from(_: Waiting) -> Self { - Default::default() + Self { is_synced: false } } } diff --git a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs index 99b114c3c9..1bc57ed9d2 100644 --- a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs +++ b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs @@ -94,7 +94,6 @@ impl MockBaseNodeService { updated: None, latency: None, online, - base_node_peer: self.state.base_node_peer.clone(), } } @@ -106,12 +105,11 @@ impl MockBaseNodeService { updated: None, latency: None, online: OnlineStatus::Online, - base_node_peer: None, } } fn set_base_node_peer(&mut self, peer: Peer) { - self.state.base_node_peer = Some(peer); + self.base_node_peer = Some(peer); } /// This handler is called when requests arrive from the various streams @@ -125,7 +123,7 @@ impl MockBaseNodeService { Ok(BaseNodeServiceResponse::BaseNodePeerSet) }, BaseNodeServiceRequest::GetBaseNodePeer => { - let peer = self.state.base_node_peer.clone(); + let peer = self.base_node_peer.clone(); Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new))) }, BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata( diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 5153557ed4..a1005bb6c7 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -101,10 +101,8 @@ impl BaseNodeMonitor { loop { use OnlineStatus::*; match watcher.recv().await.unwrap_or(Offline) { - Online => match self.wallet_connectivity.get_current_base_node() { - Some(peer) => { - return peer; - }, + Online => match self.wallet_connectivity.get_current_base_node_id() { + Some(node_id) => return node_id, _ => continue, }, Connecting => { @@ -126,15 +124,8 @@ impl BaseNodeMonitor { .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; let latency = client.get_last_request_latency().await?; - debug!( - target: LOG_TARGET, - "Base node {} latency: {} ms", - peer_node_id, - latency.unwrap_or_default().as_millis() - ); let tip_info = client.get_tip_info().await?; - let is_synced = tip_info.is_synced; let chain_metadata = tip_info .metadata @@ -143,15 +134,24 @@ impl BaseNodeMonitor { ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse) })?; + let is_synced = tip_info.is_synced; + debug!( + target: LOG_TARGET, + "Base node {} Tip: {} ({}) Latency: {} ms", + peer_node_id, + chain_metadata.height_of_longest_chain(), + if is_synced { "Synced" } else { "Syncing..." }, + latency.unwrap_or_default().as_millis() + ); + self.db.set_chain_metadata(chain_metadata.clone()).await?; - self.map_state(move |state| BaseNodeState { + self.map_state(move |_| BaseNodeState { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), latency, online: OnlineStatus::Online, - base_node_peer: state.base_node_peer.clone(), }) .await; @@ -164,25 +164,23 @@ impl BaseNodeMonitor { } async fn set_connecting(&self) { - self.map_state(|state| BaseNodeState { + self.map_state(|_| BaseNodeState { chain_metadata: None, is_synced: None, updated: Some(Utc::now().naive_utc()), latency: None, online: OnlineStatus::Connecting, - base_node_peer: state.base_node_peer.clone(), }) .await; } async fn set_offline(&self) { - self.map_state(|state| BaseNodeState { + self.map_state(|_| BaseNodeState { chain_metadata: None, is_synced: None, updated: Some(Utc::now().naive_utc()), latency: None, online: OnlineStatus::Offline, - base_node_peer: state.base_node_peer.clone(), }) .await; } diff --git a/base_layer/wallet/src/base_node_service/service.rs b/base_layer/wallet/src/base_node_service/service.rs index 9aba348223..3da987c8b1 100644 --- a/base_layer/wallet/src/base_node_service/service.rs +++ b/base_layer/wallet/src/base_node_service/service.rs @@ -50,7 +50,7 @@ pub struct BaseNodeState { pub updated: Option, pub latency: Option, pub online: OnlineStatus, - pub base_node_peer: Option, + // pub base_node_peer: Option, } impl Default for BaseNodeState { @@ -61,7 +61,6 @@ impl Default for BaseNodeState { updated: None, latency: None, online: OnlineStatus::Connecting, - base_node_peer: None, } } } @@ -158,18 +157,7 @@ where T: WalletBackend + 'static } async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> { - let new_state = BaseNodeState { - base_node_peer: Some(peer.clone()), - ..Default::default() - }; - - { - let mut lock = self.state.write().await; - *lock = new_state.clone(); - } - self.wallet_connectivity.set_base_node(peer.node_id.clone()).await?; - - self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state)); + self.wallet_connectivity.set_base_node(peer.clone()).await?; self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer))); Ok(()) } @@ -189,7 +177,7 @@ where T: WalletBackend + 'static Ok(BaseNodeServiceResponse::BaseNodePeerSet) }, BaseNodeServiceRequest::GetBaseNodePeer => { - let peer = self.get_state().await.base_node_peer.map(Box::new); + let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new); Ok(BaseNodeServiceResponse::BaseNodePeer(peer)) }, BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index d596c543db..a43fea5333 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -26,27 +26,30 @@ use futures::{ channel::{mpsc, oneshot}, SinkExt, }; -use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease}; +use tari_comms::{ + peer_manager::{NodeId, Peer}, + protocol::rpc::RpcClientLease, +}; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::sync::watch; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), ObtainBaseNodeSyncRpcClient(oneshot::Sender>), - SetBaseNode(NodeId), + SetBaseNode(Box), } #[derive(Clone)] pub struct WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch_rx: watch::Receiver>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch_rx: watch::Receiver>, online_status_rx: watch::Receiver, ) -> Self { Self { @@ -56,9 +59,9 @@ impl WalletConnectivityHandle { } } - pub async fn set_base_node(&mut self, base_node_peer: NodeId) -> Result<(), WalletConnectivityError> { + pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> { self.sender - .send(WalletConnectivityRequest::SetBaseNode(base_node_peer)) + .send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer))) .await?; Ok(()) } @@ -110,7 +113,11 @@ impl WalletConnectivityHandle { self.online_status_rx.clone() } - pub fn get_current_base_node(&self) -> Option { + pub fn get_current_base_node_peer(&self) -> Option { self.base_node_watch_rx.borrow().clone() } + + pub fn get_current_base_node_id(&self) -> Option { + self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone()) + } } diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index f71c1ae598..373dd069a1 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -33,7 +33,7 @@ use futures::{ use log::*; use tari_comms::{ connectivity::ConnectivityRequester, - peer_manager::NodeId, + peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; @@ -53,7 +53,7 @@ pub struct WalletConnectivityService { config: BaseNodeServiceConfig, request_stream: Fuse>, connectivity: ConnectivityRequester, - base_node_watch: Watch>, + base_node_watch: Watch>, pools: Option, online_status_watch: Watch, pending_requests: Vec, @@ -68,7 +68,7 @@ impl WalletConnectivityService { pub(super) fn new( config: BaseNodeServiceConfig, request_stream: mpsc::Receiver, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_watch: Watch, connectivity: ConnectivityRequester, ) -> Self { @@ -91,10 +91,10 @@ impl WalletConnectivityService { req = self.request_stream.select_next_some() => { self.handle_request(req).await; }, - peer = base_node_watch_rx.select_next_some() => { - if let Some(peer) = peer { + maybe_peer = base_node_watch_rx.select_next_some() => { + if maybe_peer.is_some() { // This will block the rest until the connection is established. This is what we want. - self.setup_base_node_connection(peer).await; + self.setup_base_node_connection().await; } } } @@ -112,7 +112,7 @@ impl WalletConnectivityService { }, SetBaseNode(peer) => { - self.set_base_node_peer(peer); + self.set_base_node_peer(*peer); }, } } @@ -197,25 +197,29 @@ impl WalletConnectivityService { self.set_base_node_peer(peer); } - fn set_base_node_peer(&mut self, peer: NodeId) { + fn set_base_node_peer(&mut self, peer: Peer) { self.pools = None; self.base_node_watch.broadcast(Some(peer)); } - async fn setup_base_node_connection(&mut self, peer: NodeId) { + async fn setup_base_node_connection(&mut self) { self.pools = None; loop { + let node_id = match self.base_node_watch.borrow().as_ref() { + Some(p) => p.node_id.clone(), + None => return, + }; debug!( target: LOG_TARGET, - "Attempting to connect to base node peer {}...", peer + "Attempting to connect to base node peer {}...", node_id ); self.set_online_status(OnlineStatus::Connecting); - match self.try_setup_rpc_pool(peer.clone()).await { + match self.try_setup_rpc_pool(node_id.clone()).await { Ok(_) => { self.set_online_status(OnlineStatus::Online); debug!( target: LOG_TARGET, - "Wallet is ONLINE and connected to base node {}", peer + "Wallet is ONLINE and connected to base node {}", node_id ); break; }, diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index 35ca5ee95d..a2092453e9 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -53,6 +53,7 @@ async fn setup() -> ( let handle = WalletConnectivityHandle::new(tx, base_node_watch.get_receiver(), online_status_watch.get_receiver()); let (connectivity, mock) = create_connectivity_mock(); let mock_state = mock.spawn(); + // let peer_manager = create_peer_manager(tempdir().unwrap()); let service = WalletConnectivityService::new( Default::default(), rx, @@ -78,7 +79,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Set the mock to defer returning a result for the peer connection mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Wait for connection request mock_state.await_call_count(1).await; @@ -101,7 +102,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); @@ -133,7 +134,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.add_active_connection(conn2).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer1.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer1.to_peer()).await.unwrap(); mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; @@ -144,7 +145,7 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); // Initiate a connection to the base node - handle.set_base_node(base_node_peer2.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer2.to_peer()).await.unwrap(); mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; @@ -164,7 +165,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; @@ -204,7 +205,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.node_id().clone()).await.unwrap(); + handle.set_base_node(base_node_peer.to_peer()).await.unwrap(); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; diff --git a/comms/rpc_macros/src/generator.rs b/comms/rpc_macros/src/generator.rs index b02b702edb..bb8c05a53b 100644 --- a/comms/rpc_macros/src/generator.rs +++ b/comms/rpc_macros/src/generator.rs @@ -196,7 +196,8 @@ impl RpcCodeGenerator { let client_struct_body = quote! { pub async fn connect(framed: #dep_mod::CanonicalFraming) -> Result where TSubstream: #dep_mod::AsyncRead + #dep_mod::AsyncWrite + Unpin + Send + 'static { - let inner = #dep_mod::RpcClient::connect(Default::default(), framed).await?; + use #dep_mod::NamedProtocolService; + let inner = #dep_mod::RpcClient::connect(Default::default(), framed, Self::PROTOCOL_NAME.into()).await?; Ok(Self { inner }) } diff --git a/comms/src/connection_manager/peer_connection.rs b/comms/src/connection_manager/peer_connection.rs index 3e15b50bb0..260befaeee 100644 --- a/comms/src/connection_manager/peer_connection.rs +++ b/comms/src/connection_manager/peer_connection.rs @@ -219,15 +219,15 @@ impl PeerConnection { #[cfg(feature = "rpc")] pub async fn connect_rpc_using_builder(&mut self, builder: RpcClientBuilder) -> Result where T: From + NamedProtocolService { - let protocol = T::PROTOCOL_NAME; + let protocol = ProtocolId::from_static(T::PROTOCOL_NAME); debug!( target: LOG_TARGET, "Attempting to establish RPC protocol `{}` to peer `{}`", - String::from_utf8_lossy(protocol), + String::from_utf8_lossy(&protocol), self.peer_node_id ); - let framed = self.open_framed_substream(&protocol.into(), RPC_MAX_FRAME_SIZE).await?; - builder.connect(framed).await + let framed = self.open_framed_substream(&protocol, RPC_MAX_FRAME_SIZE).await?; + builder.with_protocol_id(protocol).connect(framed).await } /// Creates a new RpcClientPool that can be shared between tasks. The client pool will lazily establish up to diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 5ed6a040d8..737255bbd5 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -25,14 +25,17 @@ use crate::{ framing::CanonicalFraming, message::MessageExt, proto, - protocol::rpc::{ - body::ClientStreaming, - message::BaseRequest, - Handshake, - NamedProtocolService, - Response, - RpcError, - RpcStatus, + protocol::{ + rpc::{ + body::ClientStreaming, + message::BaseRequest, + Handshake, + NamedProtocolService, + Response, + RpcError, + RpcStatus, + }, + ProtocolId, }, runtime::task, }; @@ -50,6 +53,7 @@ use futures::{ use log::*; use prost::Message; use std::{ + borrow::Cow, convert::TryFrom, fmt, future::Future, @@ -71,6 +75,7 @@ impl RpcClient { pub async fn connect( config: RpcClientConfig, framed: CanonicalFraming, + protocol_name: ProtocolId, ) -> Result where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -78,7 +83,7 @@ impl RpcClient { let (request_tx, request_rx) = mpsc::channel(1); let connector = ClientConnector::new(request_tx); let (ready_tx, ready_rx) = oneshot::channel(); - task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx).run()); + task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name).run()); ready_rx .await .expect("ready_rx oneshot is never dropped without a reply")?; @@ -150,6 +155,7 @@ impl fmt::Debug for RpcClient { #[derive(Debug, Clone)] pub struct RpcClientBuilder { config: RpcClientConfig, + protocol_id: Option, _client: PhantomData, } @@ -157,6 +163,7 @@ impl Default for RpcClientBuilder { fn default() -> Self { Self { config: Default::default(), + protocol_id: None, _client: PhantomData, } } @@ -198,10 +205,21 @@ where TClient: From + NamedProtocolService self } + pub(crate) fn with_protocol_id(mut self, protocol_id: ProtocolId) -> Self { + self.protocol_id = Some(protocol_id); + self + } + /// Negotiates and establishes a session to the peer's RPC service pub async fn connect(self, framed: CanonicalFraming) -> Result where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static { - RpcClient::connect(self.config, framed).await.map(Into::into) + RpcClient::connect( + self.config, + framed, + self.protocol_id.as_ref().cloned().unwrap_or_default(), + ) + .await + .map(Into::into) } } @@ -302,6 +320,7 @@ pub struct RpcClientWorker { next_request_id: u16, ready_tx: Option>>, latency: Option, + protocol_id: ProtocolId, } impl RpcClientWorker @@ -312,6 +331,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send request_rx: mpsc::Receiver, framed: CanonicalFraming, ready_tx: oneshot::Sender>, + protocol_id: ProtocolId, ) -> Self { Self { config, @@ -320,11 +340,20 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send next_request_id: 0, ready_tx: Some(ready_tx), latency: None, + protocol_id, } } + fn protocol_name(&self) -> Cow<'_, str> { + String::from_utf8_lossy(&self.protocol_id) + } + async fn run(mut self) { - debug!(target: LOG_TARGET, "Performing client handshake"); + debug!( + target: LOG_TARGET, + "Performing client handshake for '{}'", + self.protocol_name() + ); let start = Instant::now(); let mut handshake = Handshake::new(&mut self.framed).with_timeout(self.config.handshake_timeout()); match handshake.perform_client_handshake().await { @@ -332,7 +361,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let latency = start.elapsed(); debug!( target: LOG_TARGET, - "RPC Session negotiation completed. Latency: {:.0?}", latency + "RPC Session ({}) negotiation completed. Latency: {:.0?}", + self.protocol_name(), + latency ); self.latency = Some(latency); if let Some(r) = self.ready_tx.take() { @@ -366,7 +397,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send debug!(target: LOG_TARGET, "IO Error when closing substream: {}", err); } - debug!(target: LOG_TARGET, "RpcClientWorker terminated."); + debug!( + target: LOG_TARGET, + "RpcClientWorker ({}) terminated.", + self.protocol_name() + ); } async fn do_request_response( @@ -407,9 +442,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let latency = start.elapsed(); trace!( target: LOG_TARGET, - "Received response ({} byte(s)) from request #{} (method={}) in {:.0?}", + "Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}", resp.len(), request_id, + self.protocol_name(), method, latency ); diff --git a/comms/src/protocol/rpc/message.rs b/comms/src/protocol/rpc/message.rs index 2e963b3d2d..dedd7e04fb 100644 --- a/comms/src/protocol/rpc/message.rs +++ b/comms/src/protocol/rpc/message.rs @@ -197,13 +197,20 @@ impl Into for RpcMethod { bitflags! { pub struct RpcMessageFlags: u8 { + /// Message stream has completed const FIN = 0x01; + /// Typically sent with empty contents and used to confirm a substream is alive. + const ACK = 0x02; } } impl RpcMessageFlags { pub fn is_fin(&self) -> bool { self.contains(Self::FIN) } + + pub fn is_ack(&self) -> bool { + self.contains(Self::ACK) + } } impl Default for RpcMessageFlags { diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 092120ee48..5d775a5314 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -57,6 +57,7 @@ use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt}; use log::*; use prost::Message; use std::{ + borrow::Cow, future::Future, time::{Duration, Instant}, }; @@ -356,6 +357,7 @@ where let service = ActivePeerRpcService { config: self.config.clone(), + protocol, node_id: node_id.clone(), framed, service, @@ -372,6 +374,7 @@ where struct ActivePeerRpcService { config: RpcServerBuilder, + protocol: ProtocolId, node_id: NodeId, service: TSvc, framed: CanonicalFraming, @@ -385,14 +388,31 @@ where TCommsProvider: RpcCommsProvider + Send + Clone + 'static, { async fn start(mut self) { - debug!(target: LOG_TARGET, "(Peer = `{}`) Rpc server started.", self.node_id); + debug!( + target: LOG_TARGET, + "(Peer = `{}`) Rpc server ({}) started.", + self.node_id, + self.protocol_name() + ); if let Err(err) = self.run().await { error!( target: LOG_TARGET, - "(Peer = `{}`) Rpc server exited with an error: {}", self.node_id, err + "(Peer = `{}`) Rpc server ({}) exited with an error: {}", + self.node_id, + self.protocol_name(), + err ); } - debug!(target: LOG_TARGET, "(Peer = {}) Rpc service shutdown", self.node_id); + debug!( + target: LOG_TARGET, + "(Peer = {}) Rpc service ({}) shutdown", + self.node_id, + self.protocol_name() + ); + } + + fn protocol_name(&self) -> Cow<'_, str> { + String::from_utf8_lossy(&self.protocol) } async fn run(&mut self) -> Result<(), RpcServerError> { @@ -405,7 +425,8 @@ where let elapsed = start.elapsed(); debug!( target: LOG_TARGET, - "RPC request completed in {:.0?}{}", + "RPC ({}) request completed in {:.0?}{}", + self.protocol_name(), elapsed, if elapsed.as_secs() > 5 { " (LONG REQUEST)" } else { "" } ); @@ -448,6 +469,19 @@ where "[Peer=`{}`] Got request {}", self.node_id, decoded_msg ); + let msg_flags = RpcMessageFlags::from_bits_truncate(decoded_msg.flags as u8); + if msg_flags.contains(RpcMessageFlags::ACK) { + debug!(target: LOG_TARGET, "[Peer=`{}`] ACK.", self.node_id); + let ack = proto::rpc::RpcResponse { + request_id, + status: RpcStatus::ok().as_code(), + flags: RpcMessageFlags::ACK.bits().into(), + ..Default::default() + }; + self.framed.send(ack.to_encoded_bytes().into()).await?; + return Ok(()); + } + let req = Request::with_context( self.create_request_context(request_id), method, diff --git a/comms/src/protocol/rpc/test/greeting_service.rs b/comms/src/protocol/rpc/test/greeting_service.rs index 463057aed4..516ef230a1 100644 --- a/comms/src/protocol/rpc/test/greeting_service.rs +++ b/comms/src/protocol/rpc/test/greeting_service.rs @@ -23,7 +23,7 @@ use crate::{ async_trait, protocol::{ - rpc::{Request, Response, RpcError, RpcServerError, RpcStatus, Streaming}, + rpc::{NamedProtocolService, Request, Response, RpcError, RpcServerError, RpcStatus, Streaming}, ProtocolId, }, }; @@ -332,7 +332,7 @@ impl __rpc_deps::NamedProtocolService for GreetingClient { impl GreetingClient { pub async fn connect(framed: __rpc_deps::CanonicalFraming) -> Result where TSubstream: __rpc_deps::AsyncRead + __rpc_deps::AsyncWrite + Unpin + Send + 'static { - let inner = __rpc_deps::RpcClient::connect(Default::default(), framed).await?; + let inner = __rpc_deps::RpcClient::connect(Default::default(), framed, Self::PROTOCOL_NAME.into()).await?; Ok(Self { inner }) } diff --git a/comms/tests/rpc_stress.rs b/comms/tests/rpc_stress.rs index 24d124d1ee..0376a7dddc 100644 --- a/comms/tests/rpc_stress.rs +++ b/comms/tests/rpc_stress.rs @@ -31,7 +31,7 @@ mod helpers; use helpers::create_comms; use futures::{future, StreamExt}; -use std::{env, future::Future, time::Duration}; +use std::{future::Future, time::Duration}; use tari_comms::{ protocol::rpc::{RpcClientBuilder, RpcServer}, transports::TcpTransport, @@ -211,6 +211,7 @@ async fn few_large_messages() { .await; } +#[allow(dead_code)] async fn payload_limit() { run_stress_test(Params { num_tasks: 50, @@ -222,6 +223,7 @@ async fn payload_limit() { .await; } +#[allow(dead_code)] async fn high_contention() { run_stress_test(Params { num_tasks: 1000, @@ -233,6 +235,7 @@ async fn high_contention() { .await; } +#[allow(dead_code)] async fn high_concurrency() { run_stress_test(Params { num_tasks: 1000, @@ -244,6 +247,7 @@ async fn high_concurrency() { .await; } +#[allow(dead_code)] async fn high_contention_high_concurrency() { run_stress_test(Params { num_tasks: 2000, @@ -255,29 +259,17 @@ async fn high_contention_high_concurrency() { .await; } -#[tokio_macros::test] -async fn run_ci() { - log_timing("quick", quick()).await; - log_timing("basic", basic()).await; - log_timing("many_small_messages", many_small_messages()).await; - log_timing("few_large_messages", few_large_messages()).await; -} - #[tokio_macros::test] async fn run() { - if env::var("CI").is_ok() { - println!("Skipping the stress test on CI"); - return; - } // let _ = env_logger::try_init(); log_timing("quick", quick()).await; log_timing("basic", basic()).await; log_timing("many_small_messages", many_small_messages()).await; log_timing("few_large_messages", few_large_messages()).await; - log_timing("payload_limit", payload_limit()).await; - log_timing("high_contention", high_contention()).await; - log_timing("high_concurrency", high_concurrency()).await; - log_timing("high_contention_high_concurrency", high_contention_high_concurrency()).await; + // log_timing("payload_limit", payload_limit()).await; + // log_timing("high_contention", high_contention()).await; + // log_timing("high_concurrency", high_concurrency()).await; + // log_timing("high_contention_high_concurrency", high_contention_high_concurrency()).await; } async fn log_timing>(name: &str, fut: F) -> R {