Skip to content

Commit

Permalink
refactor: use wallet connectivity in wallet services
Browse files Browse the repository at this point in the history
- use wallet connectivity in broadcast protocol
- remove base node update code in multiple services
- use wallet connectivity in transaction validation protocol
- use wallet connectivity in TXO validation task
- update/fix tests
  • Loading branch information
sdbondi committed Sep 28, 2021
1 parent 125b0ac commit cd7b74a
Show file tree
Hide file tree
Showing 53 changed files with 897 additions and 1,187 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ default-features = false
features = ["crossterm"]

[features]
avx2 = []
avx2 = []
1 change: 0 additions & 1 deletion applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ pub async fn init_wallet(
base_node_query_timeout: config.base_node_query_timeout,
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
num_confirmations_required: config.transaction_num_confirmations_required,
..Default::default()
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::{components::Component, state::AppState};
use tari_wallet::connectivity_service::OnlineStatus;
use tari_wallet::connectivity_service::{OnlineStatus, WalletConnectivityInterface};
use tui::{
backend::Backend,
layout::Rect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use tari_comms::{connectivity::ConnectivityEvent, peer_manager::Peer};
use tari_wallet::{
base_node_service::{handle::BaseNodeEvent, service::BaseNodeState},
connectivity_service::WalletConnectivityInterface,
output_manager_service::{handle::OutputManagerEvent, TxId},
transaction_service::handle::TransactionEvent,
};
Expand Down Expand Up @@ -62,6 +63,7 @@ impl WalletEventMonitor {
let mut connectivity_events = self.app_state_inner.read().await.get_connectivity_event_stream();
let wallet_connectivity = self.app_state_inner.read().await.get_wallet_connectivity();
let mut connectivity_status = wallet_connectivity.get_connectivity_status_watch();
let mut base_node_changed = wallet_connectivity.get_current_base_node_watcher();

let mut base_node_events = self.app_state_inner.read().await.get_base_node_event_stream();
let mut software_update_notif = self
Expand Down Expand Up @@ -166,6 +168,13 @@ impl WalletEventMonitor {
Err(broadcast::error::RecvError::Closed) => {}
}
},
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer).await;
self.trigger_balance_refresh();
}
}
result = base_node_events.recv() => {
match result {
Ok(msg) => {
Expand All @@ -174,10 +183,6 @@ impl WalletEventMonitor {
BaseNodeEvent::BaseNodeStateChanged(state) => {
self.trigger_base_node_state_refresh(state).await;
}
BaseNodeEvent::BaseNodePeerSet(peer) => {
self.trigger_base_node_peer_refresh(*peer).await;
self.trigger_balance_refresh();
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub fn tui_mode(config: WalletModeConfig, mut wallet: WalletSqlite) -> Result<()
base_node_config.base_node_custom = base_node_custom.clone();
if let Some(peer) = base_node_custom {
base_node_selected = peer;
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer())? {
} else if let Some(peer) = handle.block_on(wallet.get_base_node_peer()) {
base_node_selected = peer;
}

Expand Down
15 changes: 8 additions & 7 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,28 @@ tari_shutdown = { version = "^0.10", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.10", path = "../../infrastructure/storage" }

aes-gcm = "^0.8"
async-trait = "0.1.50"
bincode = "1.3.1"
blake2 = "0.9.0"
chrono = { version = "0.4.6", features = ["serde"] }
crossbeam-channel = "0.3.8"
digest = "0.9.0"
diesel = { version = "1.4.7", features = ["sqlite", "serde_json", "chrono"] }
diesel_migrations = "1.4.0"
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
digest = "0.9.0"
fs2 = "0.3.0"
futures = { version = "^0.3.1", features = ["compat", "std"] }
libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optional = true }
lmdb-zero = "0.4.4"
log = "0.4.6"
log4rs = { version = "1.0.0", features = ["console_appender", "file_appender", "yaml_format"] }
lmdb-zero = "0.4.4"
rand = "0.8"
serde = { version = "1.0.89", features = ["derive"] }
serde_json = "1.0.39"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"
tempfile = "3.1.0"
time = { version = "0.1.39" }
thiserror = "1.0.26"
bincode = "1.3.1"
time = { version = "0.1.39" }
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.3.0-alpha.2"

[dependencies.tari_core]
path = "../../base_layer/core"
Expand Down
24 changes: 0 additions & 24 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use super::{error::BaseNodeServiceError, service::BaseNodeState};
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;
Expand All @@ -34,22 +33,17 @@ pub type BaseNodeEventReceiver = broadcast::Receiver<Arc<BaseNodeEvent>>;
#[derive(Debug)]
pub enum BaseNodeServiceRequest {
GetChainMetadata,
SetBaseNodePeer(Box<Peer>),
GetBaseNodePeer,
GetBaseNodeLatency,
}
/// API Response enum
#[derive(Debug)]
pub enum BaseNodeServiceResponse {
ChainMetadata(Option<ChainMetadata>),
BaseNodePeerSet,
BaseNodePeer(Option<Box<Peer>>),
Latency(Option<Duration>),
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
BaseNodePeerSet(Box<Peer>),
}

/// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running
Expand Down Expand Up @@ -82,24 +76,6 @@ impl BaseNodeServiceHandle {
}
}

pub async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
match self
.handle
.call(BaseNodeServiceRequest::SetBaseNodePeer(Box::new(peer)))
.await??
{
BaseNodeServiceResponse::BaseNodePeerSet => Ok(()),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_peer(&mut self) -> Result<Option<Peer>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodePeer).await?? {
BaseNodeServiceResponse::BaseNodePeer(peer) => Ok(peer.map(|p| *p)),
_ => Err(BaseNodeServiceError::UnexpectedApiResponse),
}
}

pub async fn get_base_node_latency(&mut self) -> Result<Option<Duration>, BaseNodeServiceError> {
match self.handle.call(BaseNodeServiceRequest::GetBaseNodeLatency).await?? {
BaseNodeServiceResponse::Latency(latency) => Ok(latency),
Expand Down
12 changes: 0 additions & 12 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,12 @@ impl MockBaseNodeService {
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
fn handle_request(
&mut self,
request: BaseNodeServiceRequest,
) -> Result<BaseNodeServiceResponse, BaseNodeServiceError> {
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer);
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
self.state.chain_metadata.clone(),
)),
Expand Down
18 changes: 11 additions & 7 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
handle::{BaseNodeEvent, BaseNodeEventSender},
service::BaseNodeState,
},
connectivity_service::WalletConnectivityHandle,
connectivity_service::WalletConnectivityInterface,
error::WalletStorageError,
storage::database::{WalletBackend, WalletDatabase},
};
Expand All @@ -42,20 +42,24 @@ use tokio::{sync::RwLock, time};

const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";

pub struct BaseNodeMonitor<T> {
pub struct BaseNodeMonitor<TBackend, TWalletConnectivity> {
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
}

impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
impl<TBackend, TWalletConnectivity> BaseNodeMonitor<TBackend, TWalletConnectivity>
where
TBackend: WalletBackend + 'static,
TWalletConnectivity: WalletConnectivityInterface,
{
pub fn new(
interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
wallet_connectivity: WalletConnectivityHandle,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
) -> Self {
Self {
Expand Down
27 changes: 1 addition & 26 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use super::{
config::BaseNodeServiceConfig,
error::BaseNodeServiceError,
handle::{BaseNodeEvent, BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
handle::{BaseNodeEventSender, BaseNodeServiceRequest, BaseNodeServiceResponse},
};
use crate::{
base_node_service::monitor::BaseNodeMonitor,
Expand All @@ -35,7 +35,6 @@ use futures::{future, StreamExt};
use log::*;
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::Receiver;
use tari_shutdown::ShutdownSignal;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -153,12 +152,6 @@ where T: WalletBackend + 'static
Ok(())
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
self.wallet_connectivity.set_base_node(peer.clone()).await?;
self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}

/// This handler is called when requests arrive from the various streams
async fn handle_request(
&mut self,
Expand All @@ -169,14 +162,6 @@ where T: WalletBackend + 'static
"Handling Wallet Base Node Service Request: {:?}", request
);
match request {
BaseNodeServiceRequest::SetBaseNodePeer(peer) => {
self.set_base_node_peer(*peer).await?;
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Some(metadata) => Ok(BaseNodeServiceResponse::ChainMetadata(Some(metadata))),
None => {
Expand All @@ -190,14 +175,4 @@ where T: WalletBackend + 'static
},
}
}

fn publish_event(&self, event: BaseNodeEvent) {
trace!(target: LOG_TARGET, "Publishing event: {:?}", event);
let _ = self.event_publisher.send(Arc::new(event)).map_err(|_| {
trace!(
target: LOG_TARGET,
"Could not publish BaseNodeEvent as there are no subscribers"
)
});
}
}
35 changes: 25 additions & 10 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::service::OnlineStatus;
use crate::connectivity_service::{error::WalletConnectivityError, watch::Watch};
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
types::CommsPublicKey,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::{mpsc, oneshot, watch};
Expand Down Expand Up @@ -53,10 +54,16 @@ impl WalletConnectivityHandle {
online_status_rx,
}
}
}

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
self.base_node_watch.send(Some(base_node_peer));
}

pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.base_node_watch.broadcast(Some(base_node_peer));
Ok(())
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
self.base_node_watch.get_receiver()
}

/// Obtain a BaseNodeWalletRpcClient.
Expand All @@ -65,7 +72,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
// Under what conditions do the (1) mpsc channel and (2) oneshot channel error?
// (1) when the receiver has been dropped
Expand All @@ -88,7 +95,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx))
Expand All @@ -98,19 +105,27 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

pub fn get_connectivity_status(&mut self) -> OnlineStatus {
fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_rx.borrow()
}

pub fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
self.online_status_rx.clone()
}

pub fn get_current_base_node_peer(&self) -> Option<Peer> {
fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

fn is_base_node_set(&self) -> bool {
self.base_node_watch.borrow().is_some()
}
}
8 changes: 6 additions & 2 deletions base_layer/wallet/src/connectivity_service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService, watch::Watch};
use crate::{base_node_service::config::BaseNodeServiceConfig, connectivity_service::service::OnlineStatus};
use super::{handle::WalletConnectivityHandle, service::WalletConnectivityService};
use crate::{
base_node_service::config::BaseNodeServiceConfig,
connectivity_service::service::OnlineStatus,
util::watch::Watch,
};
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::mpsc;

Expand Down
Loading

0 comments on commit cd7b74a

Please sign in to comment.