Skip to content

Commit

Permalink
feat(dht): monitor and display warning for min ratio for TCPv4 nodes (#…
Browse files Browse the repository at this point in the history
…3953)

Description
---
- monitors TCPv4/other connection ratios and displays warning if below minimum
- adds `common.dht_minimum_desired_tcpv4_node_ratio` config 

Motivation and Context
---
Display warning if node does not meet a configured minimum ratio. This is to warn about the potential for sybil attacks because of very low (TCPv6) or zero (Tor) cost for creating new network addresses.

How Has This Been Tested?
---
Manually, log warning is emitted
  • Loading branch information
sdbondi authored Mar 24, 2022
1 parent 384ab0c commit c4070ff
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 22 deletions.
6 changes: 5 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use log::*;
use tari_app_utilities::{consts, identity_management, utilities::create_transport_type};
use tari_common::{configuration::bootstrap::ApplicationType, GlobalConfig};
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcServer, NodeIdentity, UnspawnedCommsNode};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, Dht, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, Dht, DhtConfig, DhtConnectivityConfig};
use tari_core::{
base_node,
base_node::{
Expand Down Expand Up @@ -271,6 +271,10 @@ where B: BlockchainBackend + 'static
..Default::default()
},
dedup_cache_capacity: self.config.dht_dedup_cache_capacity,
connectivity: DhtConnectivityConfig {
minimum_desired_tcpv4_node_ratio: self.config.dht_minimum_desired_tcpv4_node_ratio,
..Default::default()
},
..Default::default()
},
allow_test_addresses: self.config.comms_allow_test_addresses,
Expand Down
6 changes: 6 additions & 0 deletions common/config/presets/common.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ buffer_rate_limit_console_wallet = 1000
# The cache will also be trimmed down to size periodically (min value = 0, default value = 2500).
dedup_cache_capacity = 25000

# The minimum desired ratio of TCPv4 to Tor connections. This setting does not currently guarantee this ratio is maintained
# but does emits a log warning if the actual ratio is below this setting e.g. a setting of 0.5 requires 50% of base-layer node connections to
# be TCPv4. A ratio of 0.0 disables this warning.
# Default: 0.0
# dht_minimum_desired_tcpv4_node_ratio = 0.0

# The timeout (s) for requesting blocks from a peer during blockchain sync (min value = 10 s, default value = 150 s).
#fetch_blocks_timeout = 150

Expand Down
7 changes: 7 additions & 0 deletions common/src/configuration/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct GlobalConfig {
pub db_config: LMDBConfig,
pub db_type: DatabaseType,
pub dht_dedup_cache_capacity: usize,
pub dht_minimum_desired_tcpv4_node_ratio: f32,
pub dns_seeds: Vec<String>,
pub dns_seeds_name_server: DnsNameServer,
pub dns_seeds_use_dnssec: bool,
Expand Down Expand Up @@ -658,6 +659,11 @@ fn convert_node_config(
.get_int(key)
.map_err(|e| ConfigurationError::new(key, None, &e.to_string()))? as usize;

let key = "common.dht_minimum_desired_tcpv4_node_ratio";
let dht_minimum_desired_tcpv4_node_ratio =
cfg.get_float(key)
.map_err(|e| ConfigurationError::new(key, None, &e.to_string()))? as f32;

let key = "common.fetch_blocks_timeout";
let fetch_blocks_timeout = Duration::from_secs(
cfg.get_int(key)
Expand Down Expand Up @@ -858,6 +864,7 @@ fn convert_node_config(
db_config,
db_type,
dht_dedup_cache_capacity,
dht_minimum_desired_tcpv4_node_ratio,
dns_seeds,
dns_seeds_name_server,
dns_seeds_use_dnssec,
Expand Down
2 changes: 2 additions & 0 deletions common/src/configuration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
cfg.set_default("common.buffer_rate_limit_console_wallet", 1_000)
.unwrap();
cfg.set_default("common.dedup_cache_capacity", 2_500).unwrap();
cfg.set_default("common.dht_minimum_desired_tcpv4_node_ratio", 0.0f64)
.unwrap();
cfg.set_default("common.fetch_blocks_timeout", 150).unwrap();
cfg.set_default("common.fetch_utxos_timeout", 600).unwrap();
cfg.set_default("common.service_request_timeout", 180).unwrap();
Expand Down
41 changes: 31 additions & 10 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ pub struct DhtConfig {
/// change happens again after this period, another join will be sent.
/// Default: 10 minutes
pub join_cooldown_interval: Duration,
/// The interval to update the neighbouring and random pools, if necessary.
/// Default: 2 minutes
pub connectivity_update_interval: Duration,
/// The interval to change the random pool peers.
/// Default: 2 hours
pub connectivity_random_pool_refresh: Duration,
pub connectivity_high_failure_rate_cooldown: Duration,
pub connectivity: DhtConnectivityConfig,

/// Network discovery config
pub network_discovery: NetworkDiscoveryConfig,
/// Length of time to ban a peer if the peer misbehaves at the DHT-level.
Expand Down Expand Up @@ -147,9 +142,7 @@ impl Default for DhtConfig {
dedup_allowed_message_occurrences: 1,
database_url: DbConnectionUrl::Memory,
discovery_request_timeout: Duration::from_secs(2 * 60),
connectivity_update_interval: Duration::from_secs(2 * 60),
connectivity_random_pool_refresh: Duration::from_secs(2 * 60 * 60),
connectivity_high_failure_rate_cooldown: Duration::from_secs(45),
connectivity: DhtConnectivityConfig::default(),
auto_join: false,
join_cooldown_interval: Duration::from_secs(10 * 60),
network_discovery: Default::default(),
Expand All @@ -162,3 +155,31 @@ impl Default for DhtConfig {
}
}
}

#[derive(Debug, Clone, Copy)]
pub struct DhtConnectivityConfig {
/// The interval to update the neighbouring and random pools, if necessary.
/// Default: 2 minutes
pub update_interval: Duration,
/// The interval to change the random pool peers.
/// Default: 2 hours
pub random_pool_refresh: Duration,
/// Length of cooldown when high connection failure rates are encountered. Default: 45s
pub high_failure_rate_cooldown: Duration,
/// The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create,
/// making sybil attacks costly. This setting does not guarantee this ratio is maintained.
/// Currently, it only emits a warning if the ratio is below this setting.
/// Default: 0.1 (10%)
pub minimum_desired_tcpv4_node_ratio: f32,
}

impl Default for DhtConnectivityConfig {
fn default() -> Self {
Self {
update_interval: Duration::from_secs(2 * 60),
random_pool_refresh: Duration::from_secs(2 * 60 * 60),
high_failure_rate_cooldown: Duration::from_secs(45),
minimum_desired_tcpv4_node_ratio: 0.1,
}
}
}
68 changes: 58 additions & 10 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ use std::{sync::Arc, time::Instant};
use log::*;
pub use metrics::{MetricsCollector, MetricsCollectorHandle};
use tari_comms::{
connectivity::{ConnectivityError, ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester},
connectivity::{
ConnectivityError,
ConnectivityEvent,
ConnectivityEventRx,
ConnectivityRequester,
ConnectivitySelection,
},
multiaddr,
peer_manager::{NodeDistance, NodeId, PeerManagerError, PeerQuery, PeerQuerySortBy},
NodeIdentity,
PeerConnection,
Expand Down Expand Up @@ -142,33 +149,36 @@ impl DhtConnectivity {
debug!(target: LOG_TARGET, "DHT connectivity starting");
self.refresh_neighbour_pool().await?;

let mut ticker = time::interval(self.config.connectivity_update_interval);
let mut ticker = time::interval(self.config.connectivity.update_interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
Ok(event) = connectivity_events.recv() => {
if let Err(err) = self.handle_connectivity_event(event).await {
debug!(target: LOG_TARGET, "Error handling connectivity event: {:?}", err);
error!(target: LOG_TARGET, "Error handling connectivity event: {:?}", err);
}
},

Ok(event) = self.dht_events.recv() => {
if let Err(err) = self.handle_dht_event(&event).await {
debug!(target: LOG_TARGET, "Error handling DHT event: {:?}", err);
error!(target: LOG_TARGET, "Error handling DHT event: {:?}", err);
}
},

_ = ticker.tick() => {
if let Err(err) = self.check_and_ban_flooding_peers().await {
debug!(target: LOG_TARGET, "Error checking for peer flooding: {:?}", err);
error!(target: LOG_TARGET, "Error checking for peer flooding: {:?}", err);
}
if let Err(err) = self.refresh_neighbour_pool_if_required().await {
debug!(target: LOG_TARGET, "Error refreshing neighbour peer pool: {:?}", err);
error!(target: LOG_TARGET, "Error refreshing neighbour peer pool: {:?}", err);
}
if let Err(err) = self.refresh_random_pool_if_required().await {
debug!(target: LOG_TARGET, "Error refreshing random peer pool: {:?}", err);
error!(target: LOG_TARGET, "Error refreshing random peer pool: {:?}", err);
}
self.log_status();
if let Err(err) = self.check_minimum_required_tcp_nodes().await {
error!(target: LOG_TARGET, "Error checking minimum required TCP nodes: {:?}", err);
}
},

_ = self.shutdown_signal.wait() => {
Expand All @@ -181,6 +191,43 @@ impl DhtConnectivity {
Ok(())
}

async fn check_minimum_required_tcp_nodes(&mut self) -> Result<(), DhtConnectivityError> {
let desired_ratio = self.config.connectivity.minimum_desired_tcpv4_node_ratio;
if desired_ratio == 0.0 {
return Ok(());
}

let conns = self
.connectivity
.select_connections(ConnectivitySelection::all_nodes(vec![]))
.await?;
if conns.len() <= 1 {
return Ok(());
}

let num_tcp_nodes = conns
.iter()
.filter(|conn| {
let ip = conn.address().iter().next();
let tcp = conn.address().iter().nth(2);
matches!(ip, Some(multiaddr::Protocol::Ip4(_))) && matches!(tcp, Some(multiaddr::Protocol::Tcp(_)))
})
.count();
let current_ratio = num_tcp_nodes as f32 / conns.len() as f32;
if current_ratio < desired_ratio {
warn!(
target: LOG_TARGET,
"{}% of this node's {} connections are using TCPv4. This node requires at least {}% of nodes to be \
TCP nodes.",
(current_ratio * 100.0).round() as i64,
conns.len(),
(desired_ratio * 100.0).round() as i64,
);
}

Ok(())
}

fn log_status(&self) {
let (neighbour_connected, neighbour_pending) = self
.neighbours
Expand All @@ -198,7 +245,8 @@ impl DhtConnectivity {
.map(|ts| format!(
"COOLDOWN({:.2?} remaining) ",
self.config
.connectivity_high_failure_rate_cooldown
.connectivity
.high_failure_rate_cooldown
.saturating_sub(ts.elapsed())
))
.unwrap_or_else(String::new),
Expand Down Expand Up @@ -394,7 +442,7 @@ impl DhtConnectivity {
async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
let should_refresh = self.config.num_random_nodes > 0 &&
self.random_pool_last_refresh
.map(|instant| instant.elapsed() >= self.config.connectivity_random_pool_refresh)
.map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh)
.unwrap_or(true);
if should_refresh {
self.refresh_random_pool().await?;
Expand Down Expand Up @@ -547,7 +595,7 @@ impl DhtConnectivity {

if self
.cooldown_in_effect
.map(|ts| ts.elapsed() >= self.config.connectivity_high_failure_rate_cooldown)
.map(|ts| ts.elapsed() >= self.config.connectivity.high_failure_rate_cooldown)
.unwrap_or(true)
{
if self.cooldown_in_effect.is_some() {
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ mod connectivity;
pub use connectivity::MetricsCollectorHandle;

mod config;
pub use config::DhtConfig;
pub use config::{DhtConfig, DhtConnectivityConfig};

mod crypt;

Expand Down

0 comments on commit c4070ff

Please sign in to comment.