Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client error message on bootstrap rejected because of blacklist/whitelist #3511

Merged
merged 8 commits into from
Feb 14, 2023
Merged
23 changes: 2 additions & 21 deletions massa-bootstrap/src/establisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@ pub mod types {
#[cfg(not(test))]
/// Connection types
pub mod types {
use crate::tools::normalize_ip;
use massa_time::MassaTime;
use std::{
collections::HashSet,
io,
net::{IpAddr, SocketAddr},
};
use std::{io, net::SocketAddr};
use tokio::{
net::{TcpListener, TcpStream},
time::timeout,
Expand All @@ -40,23 +35,9 @@ pub mod types {

impl DefaultListener {
/// Accepts a new incoming connection from this listener.
pub async fn accept(
&mut self,
whitelist: &Option<HashSet<IpAddr>>,
blacklist: &Option<HashSet<IpAddr>>,
) -> io::Result<(Duplex, SocketAddr)> {
pub async fn accept(&mut self) -> io::Result<(Duplex, SocketAddr)> {
// accept
let (sock, mut remote_addr) = self.0.accept().await?;
let ip = normalize_ip(remote_addr.ip());
if let Some(blacklist) = blacklist && blacklist.contains(&ip) {
return Err(io::Error::new(io::ErrorKind::Other, "IP is blacklisted"));
}
if let Some(whitelist) = whitelist && !whitelist.contains(&ip) {
return Err(io::Error::new(
io::ErrorKind::Other,
"A whitelist exists and the IP is not whitelisted",
));
}
// normalize address
remote_addr.set_ip(remote_addr.ip().to_canonical());
Ok((sock, remote_addr))
Expand Down
93 changes: 82 additions & 11 deletions massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use massa_time::MassaTime;
use parking_lot::RwLock;
use std::{
collections::{hash_map, HashMap, HashSet},
io,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -187,14 +188,28 @@ impl BootstrapServer {
}

// listener
res_connection = listener.accept(&whitelist, &blacklist) => {
let (dplx, remote_addr) = if res_connection.is_ok() {
res_connection.unwrap()
res_connection = listener.accept() => {
let (mut server, remote_addr) = if let Ok((dplx, remote_addr)) = res_connection {
let server = BootstrapServerBinder::new(
dplx,
self.keypair.clone(),
self.bootstrap_config.max_bytes_read_write,
self.bootstrap_config.max_bootstrap_message_size,
self.bootstrap_config.thread_count,
self.bootstrap_config.max_datastore_key_length,
self.bootstrap_config.randomness_size_bytes,
self.bootstrap_config.consensus_bootstrap_part_size);

// check whether incoming peer IP is allowed or return an error which is ignored
let Ok((server, remote_addr)) = self.is_ip_allowed(remote_addr, server, &whitelist, &blacklist).await else {
continue;
};
(server, remote_addr)
} else {
continue;
continue
};
if bootstrap_sessions.len() < self.bootstrap_config.max_simultaneous_bootstraps.try_into().map_err(|_| BootstrapError::GeneralError("Fail to convert u32 to usize".to_string()))? {

if bootstrap_sessions.len() < self.bootstrap_config.max_simultaneous_bootstraps.try_into().map_err(|_| BootstrapError::GeneralError("Fail to convert u32 to usize".to_string()))? {
massa_trace!("bootstrap.lib.run.select.accept", {"remote_addr": remote_addr});
let now = Instant::now();

Expand All @@ -212,7 +227,6 @@ impl BootstrapServer {
match self.ip_hist_map.entry(remote_addr.ip()) {
hash_map::Entry::Occupied(mut occ) => {
if now.duration_since(*occ.get()) <= per_ip_min_interval {
let mut server = BootstrapServerBinder::new(dplx, self.keypair.clone(), self.bootstrap_config.max_bytes_read_write, self.bootstrap_config.max_bootstrap_message_size, self.bootstrap_config.thread_count, self.bootstrap_config.max_datastore_key_length, self.bootstrap_config.randomness_size_bytes, self.bootstrap_config.consensus_bootstrap_part_size);
let _ = match tokio::time::timeout(self.bootstrap_config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError {
error:
format!("Your last bootstrap on this server was {} ago and you have to wait {} before retrying.", format_duration(occ.get().elapsed()), format_duration(per_ip_min_interval.saturating_sub(occ.get().elapsed())))
Expand Down Expand Up @@ -253,11 +267,9 @@ impl BootstrapServer {
let data_execution = self.final_state.clone();
let consensus_command_sender = self.consensus_controller.clone();
let network_command_sender = self.network_command_sender.clone();
let keypair = self.keypair.clone();
let config = self.bootstrap_config.clone();

bootstrap_sessions.push(async move {
let mut server = BootstrapServerBinder::new(dplx, keypair, config.max_bytes_read_write, config.max_bootstrap_message_size, config.thread_count, config.max_datastore_key_length, config.randomness_size_bytes, config.consensus_bootstrap_part_size);
debug!("awaiting on bootstrap of peer {}", remote_addr);
match tokio::time::timeout(config.bootstrap_timeout.into(), manage_bootstrap(&config, &mut server, data_execution, version, consensus_command_sender, network_command_sender)).await {
Ok(mgmt) => match mgmt {
Expand All @@ -281,9 +293,7 @@ impl BootstrapServer {
});
massa_trace!("bootstrap.session.started", {"active_count": bootstrap_sessions.len()});
} else {
let config = self.bootstrap_config.clone();
let mut server = BootstrapServerBinder::new(dplx, self.keypair.clone(), config.max_bytes_read_write, config.max_bootstrap_message_size, config.thread_count, config.max_datastore_key_length, config.randomness_size_bytes, config.consensus_bootstrap_part_size);
let _ = match tokio::time::timeout(config.clone().write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError {
let _ = match tokio::time::timeout(self.bootstrap_config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError {
error: "Bootstrap failed because the bootstrap server currently has no slots available.".to_string()
})).await {
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "bootstrap error no available slots send timed out").into()),
Expand All @@ -300,6 +310,67 @@ impl BootstrapServer {

Ok(())
}

#[cfg(test)]
// TODO we didn't test whether the peer IP address is banned
async fn is_ip_allowed(
&self,
remote_addr: SocketAddr,
server: BootstrapServerBinder,
_whitelist: &Option<HashSet<IpAddr>>,
_blacklist: &Option<HashSet<IpAddr>>,
) -> io::Result<(BootstrapServerBinder, SocketAddr)> {
Ok((server, remote_addr))
}

#[cfg(not(test))]
// whether the peer IP address is banned
async fn is_ip_allowed(
&self,
remote_addr: SocketAddr,
mut server: BootstrapServerBinder,
whitelist: &Option<HashSet<IpAddr>>,
blacklist: &Option<HashSet<IpAddr>>,
) -> io::Result<(BootstrapServerBinder, SocketAddr)> {
let ip = normalize_ip(remote_addr.ip());
// whether the peer IP address is blacklisted
let not_allowed_msg = if let Some(ip_list) = &blacklist && ip_list.contains(&ip) {
massa_trace!("bootstrap.lib.run.select.accept.refuse_blacklisted", {"remote_addr": remote_addr});
Some(format!("IP {} is blacklisted", &ip))
// whether the peer IP address is not present in the whitelist
} else if let Some(ip_list) = &whitelist && !ip_list.contains(&ip){
massa_trace!("bootstrap.lib.run.select.accept.refuse_not_whitelisted", {"remote_addr": remote_addr});
Some(format!("A whitelist exists and the IP {} is not whitelisted", &ip))
} else {
None
};

// whether the peer IP address is not allowed, send back an error message
if let Some(error_msg) = not_allowed_msg {
let _ = match tokio::time::timeout(
self.bootstrap_config.write_error_timeout.into(),
server.send(BootstrapServerMessage::BootstrapError {
error: error_msg.clone(),
}),
)
.await
{
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
format!("{} timed out", &error_msg),
)
.into()),
Ok(Err(e)) => Err(e),
Ok(Ok(_)) => Ok(()),
};
return Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
error_msg,
));
}

Ok((server, remote_addr))
}
}

#[allow(clippy::too_many_arguments)]
Expand Down
9 changes: 2 additions & 7 deletions massa-bootstrap/src/tests/mock_establisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

use massa_models::config::{CHANNEL_SIZE, MAX_DUPLEX_BUFFER_SIZE};
use massa_time::MassaTime;
use std::collections::HashSet;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use tokio::io::DuplexStream;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
Expand Down Expand Up @@ -36,11 +35,7 @@ pub struct MockListener {
}

impl MockListener {
pub async fn accept(
&mut self,
_whitelist: &Option<HashSet<IpAddr>>,
_blacklist: &Option<HashSet<IpAddr>>,
) -> std::io::Result<(Duplex, SocketAddr)> {
pub async fn accept(&mut self) -> std::io::Result<(Duplex, SocketAddr)> {
let (addr, sender) = self.connection_listener_rx.recv().await.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Expand Down