diff --git a/Cargo.lock b/Cargo.lock index 4a578724899..9e213268fda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -817,6 +817,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -851,6 +865,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -2391,8 +2415,8 @@ version = "0.1.0" dependencies = [ "async-speed-limit", "bitvec", + "crossbeam", "displaydoc", - "futures", "humantime", "lazy_static", "massa_async_pool", diff --git a/massa-bootstrap/Cargo.toml b/massa-bootstrap/Cargo.toml index 4a380ec95ca..311f1d8fc9a 100644 --- a/massa-bootstrap/Cargo.toml +++ b/massa-bootstrap/Cargo.toml @@ -12,7 +12,6 @@ async-speed-limit = { git = "https://github.com/adrien-zinger/async-speed-limit" "tokio", ] } displaydoc = "0.2" -futures = "0.3" num_enum = "0.5" nom = "7.1" rand = "0.8" @@ -39,6 +38,7 @@ massa_serialization = { path = "../massa-serialization" } massa_signature = { path = "../massa-signature" } massa_pos_exports = { path = "../massa-pos-exports" } massa_time = { path = "../massa-time" } +crossbeam = "0.8.2" [dev-dependencies] bitvec = { version = "1.0", features = ["serde"] } @@ -59,14 +59,5 @@ tempfile = "3.3" # for more information on what are the following features used for, see the cargo.toml at workspace level [features] -testing = [ - "massa_final_state/testing", - "massa_ledger_worker/testing", - "massa_consensus_exports/testing", - "massa_async_pool/testing", -] -sandbox = [ - "massa_async_pool/sandbox", - "massa_final_state/sandbox", - "massa_models/sandbox", -] +testing = ["massa_final_state/testing", "massa_ledger_worker/testing", "massa_consensus_exports/testing", "massa_async_pool/testing"] +sandbox = ["massa_async_pool/sandbox", "massa_final_state/sandbox", "massa_models/sandbox"] diff --git a/massa-bootstrap/src/error.rs b/massa-bootstrap/src/error.rs index 9dd7ea2d31b..d8f24d68630 100644 --- a/massa-bootstrap/src/error.rs +++ b/massa-bootstrap/src/error.rs @@ -52,4 +52,10 @@ pub enum BootstrapError { ReceivedError(String), /// clock error: {0} ClockError(String), + /// fail to init the list from file : {0} + InitListError(String), + /// IP {0} is blacklisted + BlackListed(String), + /// IP {0} is not in the whitelist + WhiteListed(String), } diff --git a/massa-bootstrap/src/server.rs b/massa-bootstrap/src/server.rs index d8e13b438f9..5659370694d 100644 --- a/massa-bootstrap/src/server.rs +++ b/massa-bootstrap/src/server.rs @@ -1,5 +1,33 @@ -use futures::stream::FuturesUnordered; -use futures::StreamExt; +//! start the bootstrapping system using [`start_bootstrap_server`] +//! Once your node will be ready, you may want other to bootstrap from you. +//! +//! # Listener +//! +//! Runs in the server-dedication tokio async runtime +//! Accepts bootstrap connections in an async-loop +//! Upon connection, pushes the accepted connection onto a channel for the worker loop to consume +//! +//! # Updater +//! +//! Runs on a dedicated thread. Signal sent my manager stop method terminates the thread. +//! Shares an Arc> guarded list of white and blacklists with the main worker. +//! Periodically does a read-only check to see if list needs updating. +//! Creates an updated list then swaps it out with write-locked list +//! Assuming no errors in code, this is the only write occurance, and is only a pointer-swap +//! under the hood, making write contention virtually non-existant. +//! +//! # Worker loop +//! +//! 1. Checks if the stopper has been invoked. +//! 2. Checks if the client is permited under the white/black list rules +//! 3. Checks if there are not too many active sessions already +//! 4. Checks if the client has attempted too recently +//! 5. All checks have passed: spawn a thread on which to run the bootstrap session +//! This thread creates a new tokio runtime, and runs it with `block_on` +mod white_black_list; +use white_black_list::*; + +use crossbeam::channel::{Receiver, Select, SendError, Sender}; use humantime::format_duration; use massa_async_pool::AsyncMessageId; use massa_consensus_exports::{bootstrapable_graph::BootstrapableGraph, ConsensusController}; @@ -14,365 +42,512 @@ use massa_signature::KeyPair; use massa_time::MassaTime; use parking_lot::RwLock; use std::{ - collections::{hash_map, HashMap, HashSet}, - io, + collections::HashMap, net::{IpAddr, SocketAddr}, - path::PathBuf, sync::Arc, + thread, time::{Duration, Instant}, }; -use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{debug, info, warn}; +use tokio::runtime::{self, Handle, Runtime}; +use tracing::{debug, error, info, warn}; use crate::{ error::BootstrapError, messages::{BootstrapClientMessage, BootstrapServerMessage}, server_binder::BootstrapServerBinder, - tools::normalize_ip, + types::{Duplex, Listener}, BootstrapConfig, Establisher, }; +/// Abstraction layer over data produced by the listener, and transported +/// over to the worker via a channel +type BsConn = (Duplex, SocketAddr); /// handle on the bootstrap server pub struct BootstrapManager { - join_handle: JoinHandle>, - manager_tx: mpsc::Sender<()>, + update_handle: std::thread::JoinHandle>>, + // need to preserve the listener handle up to here to prevent it being destroyed + _listen_handle: std::thread::JoinHandle, Box>>, + main_handle: std::thread::JoinHandle>>, + listen_stopper_tx: crossbeam::channel::Sender<()>, + update_stopper_tx: crossbeam::channel::Sender<()>, } impl BootstrapManager { /// stop the bootstrap server - pub async fn stop(self) -> Result<(), BootstrapError> { + pub async fn stop(self) -> Result<(), Box> { massa_trace!("bootstrap.lib.stop", {}); - if self.manager_tx.send(()).await.is_err() { + if self.listen_stopper_tx.send(()).is_err() { warn!("bootstrap server already dropped"); } - let _ = self.join_handle.await?; - Ok(()) + if self.update_stopper_tx.send(()).is_err() { + warn!("bootstrap ip-list-updater already dropped"); + } + // TODO?: handle join errors. + + // when the runtime is dropped at the end of this stop, the listener is auto-aborted + + // unwrap() effectively passes up a panic from the thread being handled + self.update_handle.join().unwrap()?; + + // unwrap() effectively passes up a panic from the thread being handled + self.main_handle.join().unwrap() } } -#[allow(clippy::too_many_arguments)] -/// TODO merging the command senders into one channel structure may allow removing that allow -/// -/// start a bootstrap server. -/// Once your node will be ready, you may want other to bootstrap from you. +/// See module level documentation for details pub async fn start_bootstrap_server( consensus_controller: Box, network_command_sender: NetworkCommandSender, final_state: Arc>, - bootstrap_config: BootstrapConfig, - establisher: Establisher, + config: BootstrapConfig, + mut establisher: Establisher, keypair: KeyPair, version: Version, -) -> Result, BootstrapError> { +) -> Result, Box> { massa_trace!("bootstrap.lib.start_bootstrap_server", {}); - if let Some(bind) = bootstrap_config.bind { - let (manager_tx, manager_rx) = mpsc::channel::<()>(1); + let Some(listen_addr) = config.listen_addr else { + return Ok(None); + }; + + // TODO(low prio): See if a zero capacity channel model can work + let (listen_stopper_tx, listen_stopper_rx) = crossbeam::channel::bounded::<()>(1); + let (update_stopper_tx, update_stopper_rx) = crossbeam::channel::bounded::<()>(1); + + let Ok(max_bootstraps) = config.max_simultaneous_bootstraps.try_into() else { + return Err(BootstrapError::GeneralError("Fail to convert u32 to usize".to_string()).into()); + }; - let join_handle = tokio::spawn(async move { + // We use a runtime dedicated to the bootstrap part of the system + // This should help keep it isolated from the rest of the overall binary + let Ok(bs_server_runtime) = runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_name("bootstrap-global-runtime-worker") + .thread_keep_alive(Duration::from_millis(u64::MAX)) + .build() else { + return Err(Box::new(BootstrapError::GeneralError("Failed to creato bootstrap async runtime".to_string()))); + }; + + let listener = establisher + .get_listener(listen_addr) + .await + .map_err(BootstrapError::IoError)?; + + // This is the primary interface between the async-listener, and the "sync" worker + let (listener_tx, listener_rx) = crossbeam::channel::bounded::(max_bootstraps * 2); + + let white_black_list = SharedWhiteBlackList::new( + config.bootstrap_whitelist_path.clone(), + config.bootstrap_blacklist_path.clone(), + )?; + + let updater_lists = white_black_list.clone(); + let update_handle = thread::Builder::new() + .name("wb_list_updater".to_string()) + .spawn(move || { + let res = BootstrapServer::run_updater( + updater_lists, + config.cache_duration.into(), + update_stopper_rx, + ); + match res { + Ok(_) => info!("ip white/blacklist updater exited cleanly"), + Err(ref er) => error!("updater exited with error: {}", er), + }; + res + }) + // the non-builder spawn doesn't return a Result, and documentation states that + // it's an error at the OS level. + .unwrap(); + let listen_rt_handle = bs_server_runtime.handle().clone(); + let listen_handle = thread::Builder::new() + .name("bs_listener".to_string()) + .spawn(move || { + let res = + listen_rt_handle.block_on(BootstrapServer::run_listener(listener, listener_tx)); + res + }) + // the non-builder spawn doesn't return a Result, and documentation states that + // it's an error at the OS level. + .unwrap(); + + let main_handle = std::thread::Builder::new() + .name("bs-main-loop".to_string()) + .spawn(move || { BootstrapServer { consensus_controller, network_command_sender, final_state, - establisher, - manager_rx, - bind, + listener_rx, + listen_stopper_rx, + white_black_list, keypair, version, - ip_hist_map: HashMap::with_capacity(bootstrap_config.ip_list_max_size), - bootstrap_config, + ip_hist_map: HashMap::with_capacity(config.ip_list_max_size), + bootstrap_config: config, + bs_server_runtime, } - .run() - .await - }); - Ok(Some(BootstrapManager { - join_handle, - manager_tx, - })) - } else { - Ok(None) - } + .run_loop(max_bootstraps) + }) + // the non-builder spawn doesn't return a Result, and documentation states that + // it's an error at the OS level. + .unwrap(); + // Give the runtime to the bootstrap manager, otherwise it will be dropped, forcibly aborting the spawned tasks. + // TODO: make the tasks sync, so the runtime is redundant + Ok(Some(BootstrapManager { + update_handle, + _listen_handle: listen_handle, + main_handle, + listen_stopper_tx, + update_stopper_tx, + })) } -struct BootstrapServer { +struct BootstrapServer<'a> { consensus_controller: Box, network_command_sender: NetworkCommandSender, final_state: Arc>, - establisher: Establisher, - manager_rx: mpsc::Receiver<()>, - bind: SocketAddr, + listener_rx: Receiver, + listen_stopper_rx: Receiver<()>, + white_black_list: SharedWhiteBlackList<'a>, keypair: KeyPair, bootstrap_config: BootstrapConfig, version: Version, ip_hist_map: HashMap, + bs_server_runtime: Runtime, } -#[allow(clippy::result_large_err)] -#[allow(clippy::type_complexity)] -fn reload_whitelist_blacklist( - whitelist_path: &PathBuf, - blacklist_path: &PathBuf, -) -> Result<(Option>, Option>), BootstrapError> { - let whitelist = if let Ok(whitelist) = std::fs::read_to_string(whitelist_path) { - Some( - serde_json::from_str::>(whitelist.as_str()) - .map_err(|_| { - BootstrapError::GeneralError(String::from( - "Failed to parse bootstrap whitelist", - )) - })? - .into_iter() - .map(normalize_ip) - .collect(), - ) - } else { - None - }; - - let blacklist = if let Ok(blacklist) = std::fs::read_to_string(blacklist_path) { - Some( - serde_json::from_str::>(blacklist.as_str()) - .map_err(|_| { - BootstrapError::GeneralError(String::from( - "Failed to parse bootstrap blacklist", - )) - })? - .into_iter() - .map(normalize_ip) - .collect(), - ) - } else { - None - }; - Ok((whitelist, blacklist)) -} - -impl BootstrapServer { - pub async fn run(mut self) -> Result<(), BootstrapError> { - debug!("starting bootstrap server"); - massa_trace!("bootstrap.lib.run", {}); - let mut listener = self.establisher.get_listener(self.bind).await?; - let mut bootstrap_sessions = FuturesUnordered::new(); - let cache_timeout = self.bootstrap_config.cache_duration.to_duration(); - let (mut whitelist, mut blacklist) = reload_whitelist_blacklist( - &self.bootstrap_config.bootstrap_whitelist_path, - &self.bootstrap_config.bootstrap_blacklist_path, - )?; - let mut cache_interval = tokio::time::interval(cache_timeout); - let per_ip_min_interval = self.bootstrap_config.per_ip_min_interval.to_duration(); - /* - select! without the "biased" modifier will randomly select the 1st branch to check, - then will check the next ones in the order they are written. - We choose this order: - * manager commands to avoid waiting too long to stop in case of contention - * cache timeout to avoid skipping timeouts cleanup tasks (they are relatively rare) - * bootstrap sessions (rare) - * listener: most frequent => last - */ +impl BootstrapServer<'_> { + fn run_updater( + mut list: SharedWhiteBlackList<'_>, + interval: Duration, + stopper: Receiver<()>, + ) -> Result<(), Box> { loop { - massa_trace!("bootstrap.lib.run.select", {}); - tokio::select! { - // managed commands - _ = self.manager_rx.recv() => { - massa_trace!("bootstrap.lib.run.select.manager", {}); - break + std::thread::sleep(interval); + match stopper.try_recv() { + Ok(()) => return Ok(()), + Err(e) => match e { + crossbeam::channel::TryRecvError::Empty => {} + crossbeam::channel::TryRecvError::Disconnected => { + return Err(BootstrapError::GeneralError( + "update stopper unexpected disconnect".to_string(), + ) + .into()); + } }, + } + // TODO: loop interval here is sleep + update time. Implement a state-based, + // rather than time-based, trigger (such as delta-count); + list.update()?; + } + } - // Whitelist cache timeout - _ = cache_interval.tick() => { - (whitelist, blacklist) = reload_whitelist_blacklist(&self.bootstrap_config.bootstrap_whitelist_path, &self.bootstrap_config.bootstrap_blacklist_path)?; + /// Listens on a channel for incoming connections, and loads them onto a crossbeam channel + /// for the main-loop to process. + /// + /// Ok(Ok(())) listener closed without issue + /// Ok(Err((dplx, address))) listener accepted a connection then tried sending on a disconnected channel + /// Err(..) Error accepting a connection + /// TODO: Integrate the listener into the bootstrap-main-loop + async fn run_listener( + mut listener: Listener, + listener_tx: Sender, + ) -> Result, Box> { + loop { + let msg = listener.accept().await.map_err(BootstrapError::IoError)?; + match listener_tx.send(msg) { + Ok(_) => continue, + Err(SendError((dplx, remote_addr))) => { + warn!( + "listener channel disconnected after accepting connection from {}", + remote_addr + ); + return Ok(Err((dplx, remote_addr))); } + }; + } + } - // bootstrap session finished - Some(_) = bootstrap_sessions.next() => { - massa_trace!("bootstrap.session.finished", {"active_count": bootstrap_sessions.len()}); - } + fn run_loop(mut self, max_bootstraps: usize) -> Result<(), Box> { + let Ok(bs_loop_rt) = runtime::Builder::new_multi_thread() + .max_blocking_threads(max_bootstraps * 2) + .enable_io() + .enable_time() + .thread_name("bootstrap-main-loop-worker") + .thread_keep_alive(Duration::from_millis(u64::MAX)) + .build() else { + return Err(Box::new(BootstrapError::GeneralError("Failed to create bootstrap main-loop runtime".to_string()))); + }; - // listener - res_connection = listener.accept() => { - let (mut server, remote_addr) = if let Ok((dplx, remote_addr)) = res_connection { - let server = BootstrapServerBinder::new( - dplx, - self.keypair.clone(), - self.bootstrap_config.max_bytes_read_write, - self.bootstrap_config.max_bootstrap_message_size, - self.bootstrap_config.thread_count, - self.bootstrap_config.max_datastore_key_length, - self.bootstrap_config.randomness_size_bytes, - self.bootstrap_config.max_consensus_block_ids); - - // check whether incoming peer IP is allowed or return an error which is ignored - let Ok((server, remote_addr)) = self.is_ip_allowed(remote_addr, server, &whitelist, &blacklist).await else { - continue; - }; - (server, remote_addr) - } else { - continue - }; + // Use the strong-count of this variable to track the session count + let bootstrap_sessions_counter: Arc<()> = Arc::new(()); + let per_ip_min_interval = self.bootstrap_config.per_ip_min_interval.to_duration(); + let mut selector = Select::new(); + selector.recv(&self.listen_stopper_rx); + selector.recv(&self.listener_rx); + // TODO: Work out how to integration-test this + loop { + // block until we have a connection to work with, or break out of main-loop + // if a stop-signal is received + let Some((dplx, remote_addr)) = self.receive_connection(&mut selector).map_err(BootstrapError::GeneralError)? else {break}; + // claim a slot in the max_bootstrap_sessions + let server = BootstrapServerBinder::new( + dplx, + self.keypair.clone(), + (&self.bootstrap_config).into(), + ); + + // check whether incoming peer IP is allowed. + if let Err(error_msg) = self.white_black_list.is_ip_allowed(&remote_addr) { + server.close_and_send_error( + self.bs_server_runtime.handle().clone(), + error_msg.to_string(), + remote_addr, + move || {}, + ); + continue; + }; - if bootstrap_sessions.len() < self.bootstrap_config.max_simultaneous_bootstraps.try_into().map_err(|_| BootstrapError::GeneralError("Fail to convert u32 to usize".to_string()))? { - massa_trace!("bootstrap.lib.run.select.accept", {"remote_addr": remote_addr}); - let now = Instant::now(); - - // clear IP history if necessary - if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size { - self.ip_hist_map.retain(|_k, v| now.duration_since(*v) <= per_ip_min_interval); - if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size { - // too many IPs are spamming us: clear cache - warn!("high bootstrap load: at least {} different IPs attempted bootstrap in the last {}", self.ip_hist_map.len(),format_duration(self.bootstrap_config.per_ip_min_interval.to_duration()).to_string()); - self.ip_hist_map.clear(); - } - } - - // check IP's bootstrap attempt history - match self.ip_hist_map.entry(remote_addr.ip()) { - hash_map::Entry::Occupied(mut occ) => { - if now.duration_since(*occ.get()) <= per_ip_min_interval { - let _ = match tokio::time::timeout(self.bootstrap_config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError { - error: - format!("Your last bootstrap on this server was {} ago and you have to wait {} before retrying.", format_duration(occ.get().elapsed()), format_duration(per_ip_min_interval.saturating_sub(occ.get().elapsed()))) - })).await { - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "bootstrap error too early retry bootstrap send timed out").into()), - Ok(Err(e)) => Err(e), - Ok(Ok(_)) => Ok(()), - }; - // in list, non-expired => refuse - massa_trace!("bootstrap.lib.run.select.accept.refuse_limit", {"remote_addr": remote_addr}); - continue; - } else { - // in list, expired - occ.insert(now); - } - }, - hash_map::Entry::Vacant(vac) => { - vac.insert(now); - } - } - - // load cache if absent - // if bootstrap_data.is_none() { - // massa_trace!("bootstrap.lib.run.select.accept.cache_load.start", {}); - - // // Note that all requests are done simultaneously except for the consensus graph that is done after the others. - // // This is done to ensure that the execution bootstrap state is older than the consensus state. - // // If the consensus state snapshot is older than the execution state snapshot, - // // the execution final ledger will be in the future after bootstrap, which causes an inconsistency. - // bootstrap_data = Some((data_graph, data_peers, self.final_state.clone())); - // cache_timer.set(sleep(cache_timeout)); - // } - massa_trace!("bootstrap.lib.run.select.accept.cache_available", {}); - - // launch bootstrap - - let version = self.version; - let data_execution = self.final_state.clone(); - let consensus_command_sender = self.consensus_controller.clone(); - let network_command_sender = self.network_command_sender.clone(); - let config = self.bootstrap_config.clone(); - - bootstrap_sessions.push(async move { - debug!("awaiting on bootstrap of peer {}", remote_addr); - match tokio::time::timeout(config.bootstrap_timeout.into(), manage_bootstrap(&config, &mut server, data_execution, version, consensus_command_sender, network_command_sender)).await { - Ok(mgmt) => match mgmt { - Ok(_) => { - info!("bootstrapped peer {}", remote_addr) - }, - Err(BootstrapError::ReceivedError(error)) => debug!("bootstrap serving error received from peer {}: {}", remote_addr, error), - Err(err) => { - debug!("bootstrap serving error for peer {}: {}", remote_addr, err); - // We allow unused result because we don't care if an error is thrown when sending the error message to the server we will close the socket anyway. - let _ = tokio::time::timeout(config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError { error: err.to_string() })).await; - }, - } - Err(_timeout) => { - debug!("bootstrap timeout for peer {}", remote_addr); - // We allow unused result because we don't care if an error is thrown when sending the error message to the server we will close the socket anyway. - let _ = tokio::time::timeout(config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError { error: format!("Bootstrap process timedout ({})", format_duration(config.bootstrap_timeout.to_duration())) })).await; - } - } - - }); - massa_trace!("bootstrap.session.started", {"active_count": bootstrap_sessions.len()}); - } else { - let _ = match tokio::time::timeout(self.bootstrap_config.write_error_timeout.into(), server.send(BootstrapServerMessage::BootstrapError { - error: "Bootstrap failed because the bootstrap server currently has no slots available.".to_string() - })).await { - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "bootstrap error no available slots send timed out").into()), - Ok(Err(e)) => Err(e), - Ok(Ok(_)) => Ok(()), - }; - debug!("did not bootstrap {}: no available slots", remote_addr); + // the `- 1` is to account for the top-level Arc that is created at the top + // of this method. subsequent counts correspond to each `clone` that is passed + // into a thread + // TODO: If we don't find a way to handle the counting automagically, make + // a dedicated wrapper-type with doc-comments, manual drop impl that + // integrates logging, etc... + if Arc::strong_count(&bootstrap_sessions_counter) - 1 < max_bootstraps { + massa_trace!("bootstrap.lib.run.select.accept", { + "remote_addr": remote_addr + }); + let now = Instant::now(); + + // clear IP history if necessary + if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size { + self.ip_hist_map + .retain(|_k, v| now.duration_since(*v) <= per_ip_min_interval); + if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size { + // too many IPs are spamming us: clear cache + warn!("high bootstrap load: at least {} different IPs attempted bootstrap in the last {}", self.ip_hist_map.len(),format_duration(self.bootstrap_config.per_ip_min_interval.to_duration()).to_string()); + self.ip_hist_map.clear(); } } + + // check IP's bootstrap attempt history + if let Err(msg) = BootstrapServer::greedy_client_check( + &mut self.ip_hist_map, + remote_addr, + now, + per_ip_min_interval, + ) { + // Client has been too greedy: send out the bad-news :( + let msg = format!( + "Your last bootstrap on this server was {} ago and you have to wait {} before retrying.", + format_duration(msg), + format_duration(per_ip_min_interval.saturating_sub(msg)) + ); + let tracer = move || { + massa_trace!("bootstrap.lib.run.select.accept.refuse_limit", { + "remote_addr": remote_addr + }) + }; + server.close_and_send_error( + self.bs_server_runtime.handle().clone(), + msg, + remote_addr, + tracer, + ); + continue; + }; // Clients Option is good, and has been updated + + massa_trace!("bootstrap.lib.run.select.accept.cache_available", {}); + + // launch bootstrap + let version = self.version; + let data_execution = self.final_state.clone(); + let consensus_command_sender = self.consensus_controller.clone(); + let network_command_sender = self.network_command_sender.clone(); + let config = self.bootstrap_config.clone(); + + let bootstrap_count_token = bootstrap_sessions_counter.clone(); + let session_handle = bs_loop_rt.handle().clone(); + let _ = thread::Builder::new() + .name(format!("bootstrap thread, peer: {}", remote_addr)) + .spawn(move || { + run_bootstrap_session( + server, + bootstrap_count_token, + config, + remote_addr, + data_execution, + version, + consensus_command_sender, + network_command_sender, + session_handle, + ) + }); + + massa_trace!("bootstrap.session.started", { + "active_count": Arc::strong_count(&bootstrap_sessions_counter) - 1 + }); + } else { + server.close_and_send_error( + self.bs_server_runtime.handle().clone(), + "Bootstrap failed because the bootstrap server currently has no slots available.".to_string(), + remote_addr, + move || debug!("did not bootstrap {}: no available slots", remote_addr) + ); } } - // wait for bootstrap sessions to finish - while bootstrap_sessions.next().await.is_some() {} + // Give any remaining processes 20 seconds to clean up, otherwise force them to shutdown + bs_loop_rt.shutdown_timeout(Duration::from_secs(20)); Ok(()) } - #[cfg(test)] - // TODO we didn't test whether the peer IP address is banned - async fn is_ip_allowed( - &self, - remote_addr: SocketAddr, - server: BootstrapServerBinder, - _whitelist: &Option>, - _blacklist: &Option>, - ) -> io::Result<(BootstrapServerBinder, SocketAddr)> { - Ok((server, remote_addr)) - } - - #[cfg(not(test))] - // whether the peer IP address is banned - async fn is_ip_allowed( - &self, - remote_addr: SocketAddr, - mut server: BootstrapServerBinder, - whitelist: &Option>, - blacklist: &Option>, - ) -> io::Result<(BootstrapServerBinder, SocketAddr)> { - let ip = normalize_ip(remote_addr.ip()); - // whether the peer IP address is blacklisted - let not_allowed_msg = if let Some(ip_list) = &blacklist && ip_list.contains(&ip) { - massa_trace!("bootstrap.lib.run.select.accept.refuse_blacklisted", {"remote_addr": remote_addr}); - Some(format!("IP {} is blacklisted", &ip)) - // whether the peer IP address is not present in the whitelist - } else if let Some(ip_list) = &whitelist && !ip_list.contains(&ip){ - massa_trace!("bootstrap.lib.run.select.accept.refuse_not_whitelisted", {"remote_addr": remote_addr}); - Some(format!("A whitelist exists and the IP {} is not whitelisted", &ip)) + /// These are the steps to ensure that a connection is only processed if the server is active: + /// + /// - 1. Block until _something_ is ready + /// - 2. If that something is a stop-signal, stop + /// - 3. If that something is anything else: + /// - 3.a. double check the stop-signal is absent + /// - 3.b. If present, fall-back to the stop behaviour + /// - 3.c. If absent, all's clear to rock-n-roll. + fn receive_connection(&self, selector: &mut Select) -> Result, String> { + // 1. Block until _something_ is ready + let rdy = selector.ready(); + + // 2. If that something is a stop-signal, stop + // from `crossbeam::Select::read()` documentation: + // "Note that this method might return with success spuriously, so it’s a good idea + // to always double check if the operation is really ready." + if rdy == 0 && self.listen_stopper_rx.try_recv().is_ok() { + return Ok(None); + // - 3. If that something is anything else: } else { - None - }; + massa_trace!("bootstrap.lib.run.select", {}); - // whether the peer IP address is not allowed, send back an error message - if let Some(error_msg) = not_allowed_msg { - let _ = match tokio::time::timeout( - self.bootstrap_config.write_error_timeout.into(), - server.send(BootstrapServerMessage::BootstrapError { - error: error_msg.clone(), - }), - ) - .await - { - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - format!("{} timed out", &error_msg), - ) - .into()), - Ok(Err(e)) => Err(e), - Ok(Ok(_)) => Ok(()), + // - 3.a. double check the stop-signal is absent + let stop = self.listen_stopper_rx.try_recv(); + if unlikely(stop.is_ok()) { + massa_trace!("bootstrap.lib.run.select.manager", {}); + // 3.b. If present, fall-back to the stop behaviour + return Ok(None); + } else if unlikely(stop == Err(crossbeam::channel::TryRecvError::Disconnected)) { + return Err("Unexpected stop-channel disconnection".to_string()); }; - return Err(std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - error_msg, - )); } + // - 3.c. If absent, all's clear to rock-n-roll. + let msg = match self.listener_rx.try_recv() { + Ok(msg) => msg, + Err(try_rcv_err) => match try_rcv_err { + crossbeam::channel::TryRecvError::Empty => return Ok(None), + crossbeam::channel::TryRecvError::Disconnected => { + return Err("listener recv channel disconnected unexpectedly".to_string()) + } + }, + }; + Ok(Some(msg)) + } - Ok((server, remote_addr)) + /// Checks latest attempt. If too recent, provides the bad news (as an error). + /// Updates the latest attempt to "now" if it's all good. + /// + /// # Error + /// The elapsed time which is insufficient + fn greedy_client_check( + ip_hist_map: &mut HashMap, + remote_addr: SocketAddr, + now: Instant, + per_ip_min_interval: Duration, + ) -> Result<(), Duration> { + let mut res = Ok(()); + ip_hist_map + .entry(remote_addr.ip()) + .and_modify(|occ| { + // Well, let's only update the latest + if now.duration_since(*occ) <= per_ip_min_interval { + res = Err(occ.elapsed()); + } else { + // in list, expired + *occ = now; + } + }) + .or_insert(now); + res } } +/// To be called from a `thread::spawn` invocation +/// +/// Runs the bootstrap management in a dedicated thread, handling the async by using +/// a multi-thread-aware tokio runtime (the bs-main-loop runtime, to be exact). When this +/// function blocks in the `block_on`, it should thread-block, and switch to another session +/// +/// The arc_counter variable is used as a proxy to keep track the number of active bootstrap +/// sessions. +#[allow(clippy::too_many_arguments)] +fn run_bootstrap_session( + mut server: BootstrapServerBinder, + arc_counter: Arc<()>, + config: BootstrapConfig, + remote_addr: SocketAddr, + data_execution: Arc>, + version: Version, + consensus_command_sender: Box, + network_command_sender: NetworkCommandSender, + bs_loop_rt_handle: Handle, +) { + debug!("running bootstrap for peer {}", remote_addr); + bs_loop_rt_handle.block_on(async move { + let res = tokio::time::timeout( + config.bootstrap_timeout.into(), + manage_bootstrap( + &config, + &mut server, + data_execution, + version, + consensus_command_sender, + network_command_sender, + ), + ) + .await; + // This drop allows the server to accept new connections before having to complete the error notifications + // account for this session being finished, as well as the root-instance + massa_trace!("bootstrap.session.finished", { + "sessions_remaining": Arc::strong_count(&arc_counter) - 2 + }); + drop(arc_counter); + match res { + Ok(mgmt) => match mgmt { + Ok(_) => { + info!("bootstrapped peer {}", remote_addr); + } + Err(BootstrapError::ReceivedError(error)) => debug!( + "bootstrap serving error received from peer {}: {}", + remote_addr, error + ), + Err(err) => { + debug!("bootstrap serving error for peer {}: {}", remote_addr, err); + // We allow unused result because we don't care if an error is thrown when + // sending the error message to the server we will close the socket anyway. + let _ = server.send_error(err.to_string()).await; + } + }, + Err(_timeout) => { + debug!("bootstrap timeout for peer {}", remote_addr); + // We allow unused result because we don't care if an error is thrown when + // sending the error message to the server we will close the socket anyway. + let _ = server + .send_error(format!( + "Bootstrap process timedout ({})", + format_duration(config.bootstrap_timeout.to_duration()) + )) + .await; + } + } + }); +} + #[allow(clippy::too_many_arguments)] pub async fn stream_bootstrap_information( server: &mut BootstrapServerBinder, @@ -467,11 +642,9 @@ pub async fn stream_bootstrap_information( } if slot_too_old { - match tokio::time::timeout( - write_timeout, - server.send(BootstrapServerMessage::SlotTooOld), - ) - .await + match server + .send_msg(write_timeout, BootstrapServerMessage::SlotTooOld) + .await { Err(_) => Err(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -534,11 +707,9 @@ pub async fn stream_bootstrap_information( && final_state_changes_step.finished() && last_consensus_step.finished() { - match tokio::time::timeout( - write_timeout, - server.send(BootstrapServerMessage::BootstrapFinished), - ) - .await + match server + .send_msg(write_timeout, BootstrapServerMessage::BootstrapFinished) + .await { Err(_) => Err(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -552,21 +723,22 @@ pub async fn stream_bootstrap_information( } // At this point we know that consensus, final state or both are not finished - match tokio::time::timeout( - write_timeout, - server.send(BootstrapServerMessage::BootstrapPart { - slot: current_slot, - ledger_part, - async_pool_part, - pos_cycle_part, - pos_credits_part, - exec_ops_part, - final_state_changes, - consensus_part, - consensus_outdated_ids, - }), - ) - .await + match server + .send_msg( + write_timeout, + BootstrapServerMessage::BootstrapPart { + slot: current_slot, + ledger_part, + async_pool_part, + pos_cycle_part, + pos_credits_part, + exec_ops_part, + final_state_changes, + consensus_part, + consensus_outdated_ids, + }, + ) + .await { Err(_) => Err(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -580,7 +752,6 @@ pub async fn stream_bootstrap_information( Ok(()) } -#[allow(clippy::manual_async_fn)] #[allow(clippy::too_many_arguments)] async fn manage_bootstrap( bootstrap_config: &BootstrapConfig, @@ -624,14 +795,15 @@ async fn manage_bootstrap( // Sync clocks. let server_time = MassaTime::now()?; - match tokio::time::timeout( - write_timeout, - server.send(BootstrapServerMessage::BootstrapTime { - server_time, - version, - }), - ) - .await + match server + .send_msg( + write_timeout, + BootstrapServerMessage::BootstrapTime { + server_time, + version, + }, + ) + .await { Err(_) => Err(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -648,13 +820,14 @@ async fn manage_bootstrap( Ok(Err(e)) => break Err(e), Ok(Ok(msg)) => match msg { BootstrapClientMessage::AskBootstrapPeers => { - match tokio::time::timeout( - write_timeout, - server.send(BootstrapServerMessage::BootstrapPeers { - peers: network_command_sender.get_bootstrap_peers().await?, - }), - ) - .await + match server + .send_msg( + write_timeout, + BootstrapServerMessage::BootstrapPeers { + peers: network_command_sender.get_bootstrap_peers().await?, + }, + ) + .await { Err(_) => Err(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -697,3 +870,26 @@ async fn manage_bootstrap( }; } } + +// Stable means of providing compiler optimisation hints +// Also provides a self-documenting tool to communicate likely/unlikely code-paths +// https://users.rust-lang.org/t/compiler-hint-for-unlikely-likely-for-if-branches/62102/4 +#[inline] +#[cold] +fn cold() {} + +#[inline] +fn _likely(b: bool) -> bool { + if !b { + cold() + } + b +} + +#[inline] +fn unlikely(b: bool) -> bool { + if b { + cold() + } + b +} diff --git a/massa-bootstrap/src/server/white_black_list.rs b/massa-bootstrap/src/server/white_black_list.rs new file mode 100644 index 00000000000..49491015e56 --- /dev/null +++ b/massa-bootstrap/src/server/white_black_list.rs @@ -0,0 +1,132 @@ +use std::{ + borrow::Cow, + collections::HashSet, + net::{IpAddr, SocketAddr}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use crate::error::BootstrapError; +use massa_logging::massa_trace; +use parking_lot::RwLock; +use tracing::log::error; + +use crate::tools::normalize_ip; + +/// A wrapper around the white/black lists that allows efficient sharing between threads +// TODO: don't clone the path-bufs... +#[derive(Clone)] +pub(crate) struct SharedWhiteBlackList<'a> { + inner: Arc>, + white_path: Cow<'a, Path>, + black_path: Cow<'a, Path>, +} + +impl SharedWhiteBlackList<'_> { + pub(crate) fn new( + white_path: PathBuf, + black_path: PathBuf, + ) -> Result> { + let (white_list, black_list) = + WhiteBlackListInner::load_white_black_lists(&white_path, &black_path)?; + Ok(Self { + inner: Arc::new(RwLock::new(WhiteBlackListInner { + white_list, + black_list, + })), + white_path: Cow::from(white_path), + black_path: Cow::from(black_path), + }) + } + + /// Checks if the white/black list is up to date with a read-lock + /// Creates a new list, and replaces the old one in a write-lock + pub(crate) fn update(&mut self) -> Result<(), Box> { + let read_lock = self.inner.read(); + let (new_white, new_black) = + WhiteBlackListInner::load_white_black_lists(&self.white_path, &self.black_path)?; + let white_delta = new_white != read_lock.white_list; + let black_delta = new_black != read_lock.black_list; + if white_delta || black_delta { + // Ideally this scope would be atomic + let mut mut_inner = { + drop(read_lock); + self.inner.write() + }; + + if white_delta { + mut_inner.white_list = new_white; + } + if black_delta { + mut_inner.black_list = new_black; + } + } + Ok(()) + } + + #[cfg_attr(test, allow(unreachable_code, unused_variables))] + pub(crate) fn is_ip_allowed( + &self, + remote_addr: &SocketAddr, + ) -> Result<(), Box> { + #[cfg(test)] + return Ok(()); + + let ip = normalize_ip(remote_addr.ip()); + // whether the peer IP address is blacklisted + let read = self.inner.read(); + if let Some(ip_list) = &read.black_list && ip_list.contains(&ip) { + massa_trace!("bootstrap.lib.run.select.accept.refuse_blacklisted", {"remote_addr": remote_addr}); + Err(Box::new(BootstrapError::BlackListed(ip.to_string()))) + // whether the peer IP address is not present in the whitelist + } else if let Some(ip_list) = &read.white_list && !ip_list.contains(&ip) { + massa_trace!("bootstrap.lib.run.select.accept.refuse_not_whitelisted", {"remote_addr": remote_addr}); + Err(Box::new(BootstrapError::WhiteListed(ip.to_string()))) + } else { + Ok(()) + } + } +} + +impl WhiteBlackListInner { + #[allow(clippy::result_large_err)] + #[allow(clippy::type_complexity)] + fn load_white_black_lists( + whitelist_path: &Path, + blacklist_path: &Path, + ) -> Result<(Option>, Option>), Box> { + let white_list = Self::load_list(whitelist_path)?; + let black_list = Self::load_list(blacklist_path)?; + Ok((white_list, black_list)) + } + + fn load_list(list_path: &Path) -> Result>, Box> { + match std::fs::read_to_string(list_path) { + Err(e) => { + error!("error on load whitelist/blacklist file : {}", e); + Ok(None) + } + Ok(list) => { + let res = Some( + serde_json::from_str::>(list.as_str()) + .map_err(|e| { + BootstrapError::InitListError(format!( + "Failed to parse bootstrap whitelist : {}", + e + )) + })? + .into_iter() + .map(normalize_ip) + .collect(), + ); + Ok(res) + } + } + } +} + +#[derive(Default)] +pub(crate) struct WhiteBlackListInner { + white_list: Option>, + black_list: Option>, +} diff --git a/massa-bootstrap/src/server_binder.rs b/massa-bootstrap/src/server_binder.rs index 1d42326b42d..8ceb5bcf694 100644 --- a/massa-bootstrap/src/server_binder.rs +++ b/massa-bootstrap/src/server_binder.rs @@ -6,6 +6,7 @@ use crate::messages::{ BootstrapClientMessage, BootstrapClientMessageDeserializer, BootstrapServerMessage, BootstrapServerMessageSerializer, }; +use crate::settings::BootstrapSrvBindCfg; use async_speed_limit::clock::StandardClock; use async_speed_limit::{Limiter, Resource}; use massa_hash::Hash; @@ -14,8 +15,15 @@ use massa_models::serialization::{DeserializeMinBEInt, SerializeMinBEInt}; use massa_models::version::{Version, VersionDeserializer, VersionSerializer}; use massa_serialization::{DeserializeError, Deserializer, Serializer}; use massa_signature::KeyPair; +use massa_time::MassaTime; use std::convert::TryInto; +use std::net::SocketAddr; +use std::thread; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::runtime::Handle; +use tokio::time::error::Elapsed; +use tracing::error; /// Bootstrap server binder pub struct BootstrapServerBinder { @@ -30,6 +38,7 @@ pub struct BootstrapServerBinder { prev_message: Option, version_serializer: VersionSerializer, version_deserializer: VersionDeserializer, + write_error_timeout: MassaTime, } impl BootstrapServerBinder { @@ -40,20 +49,20 @@ impl BootstrapServerBinder { /// * `local_keypair`: local node user keypair /// * `limit`: limit max bytes per second (up and down) #[allow(clippy::too_many_arguments)] - pub fn new( - duplex: Duplex, - local_keypair: KeyPair, - limit: f64, - max_bootstrap_message_size: u32, - thread_count: u8, - max_datastore_key_length: u8, - randomness_size_bytes: usize, - max_consensus_block_ids: u64, - ) -> Self { + pub fn new(duplex: Duplex, local_keypair: KeyPair, cfg: BootstrapSrvBindCfg) -> Self { + let BootstrapSrvBindCfg { + max_bytes_read_write: limit, + max_bootstrap_message_size, + thread_count, + max_datastore_key_length, + randomness_size_bytes, + consensus_bootstrap_part_size, + write_error_timeout, + } = cfg; let size_field_len = u32::be_bytes_min_length(max_bootstrap_message_size); BootstrapServerBinder { max_bootstrap_message_size, - max_consensus_block_ids, + max_consensus_block_ids: consensus_bootstrap_part_size, size_field_len, local_keypair, duplex: ::new(limit).limit(duplex), @@ -63,6 +72,7 @@ impl BootstrapServerBinder { randomness_size_bytes, version_serializer: VersionSerializer::new(), version_deserializer: VersionDeserializer::new(), + write_error_timeout, } } } @@ -95,6 +105,61 @@ impl BootstrapServerBinder { Ok(()) } + pub async fn send_msg( + &mut self, + timeout: Duration, + msg: BootstrapServerMessage, + ) -> Result, Elapsed> { + tokio::time::timeout(timeout, self.send(msg)).await + } + + /// 1. Spawns a thread + /// 2. blocks on the passed in runtime + /// 3. uses passed in handle to send a message to the client + /// 4. logs an error if the send times out + /// 5. runs the passed in closure (typically a custom logging msg) + /// + /// consumes the binding in the process + pub(crate) fn close_and_send_error( + mut self, + server_outer_rt_hnd: Handle, + msg: String, + addr: SocketAddr, + close_fn: F, + ) where + F: FnOnce() + Send + 'static, + { + thread::Builder::new() + .name("bootstrap-error-send".to_string()) + .spawn(move || { + let msg_cloned = msg.clone(); + let err_send = + server_outer_rt_hnd.block_on(async move { self.send_error(msg_cloned).await }); + match err_send { + Err(_) => error!( + "bootstrap server timed out sending error '{}' to addr {}", + msg, addr + ), + Ok(Err(e)) => error!("{}", e), + Ok(Ok(_)) => {} + } + close_fn(); + }) + // the non-builder spawn doesn't return a Result, and documentation states that + // it's an error at the OS level. + .unwrap(); + } + pub async fn send_error( + &mut self, + error: String, + ) -> Result, Elapsed> { + tokio::time::timeout( + self.write_error_timeout.into(), + self.send(BootstrapServerMessage::BootstrapError { error }), + ) + .await + } + /// Writes the next message. NOT cancel-safe pub async fn send(&mut self, msg: BootstrapServerMessage) -> Result<(), BootstrapError> { // serialize message diff --git a/massa-bootstrap/src/settings.rs b/massa-bootstrap/src/settings.rs index 695f18f5032..1d307c11ece 100644 --- a/massa-bootstrap/src/settings.rs +++ b/massa-bootstrap/src/settings.rs @@ -31,7 +31,7 @@ pub struct BootstrapConfig { /// Path to the bootstrap blacklist file. This whitelist define IPs that will not be able to bootstrap on your node. This list is optional. pub bootstrap_blacklist_path: PathBuf, /// Port to listen if we choose to allow other nodes to use us as bootstrap node. - pub bind: Option, + pub listen_addr: Option, /// connection timeout pub connect_timeout: MassaTime, /// Time allocated to managing the bootstrapping process, @@ -123,6 +123,20 @@ pub struct BootstrapConfig { pub max_consensus_block_ids: u64, } +/// Bootstrap server binding +#[allow(missing_docs)] +#[derive(Debug, Deserialize, Clone, SubStruct)] +#[parent(type = "BootstrapConfig")] +pub struct BootstrapSrvBindCfg { + pub max_bytes_read_write: f64, + pub max_bootstrap_message_size: u32, + pub thread_count: u8, + pub max_datastore_key_length: u8, + pub randomness_size_bytes: usize, + pub consensus_bootstrap_part_size: u64, + pub write_error_timeout: MassaTime, +} + /// Bootstrap client config #[allow(missing_docs)] #[derive(Debug, Deserialize, Clone, SubStruct)] diff --git a/massa-bootstrap/src/tests/binders.rs b/massa-bootstrap/src/tests/binders.rs index 03492ac207f..77111e9c238 100644 --- a/massa-bootstrap/src/tests/binders.rs +++ b/massa-bootstrap/src/tests/binders.rs @@ -1,7 +1,5 @@ -use std::str::FromStr; - use crate::messages::{BootstrapClientMessage, BootstrapServerMessage}; -use crate::settings::BootstrapClientConfig; +use crate::settings::{BootstrapClientConfig, BootstrapSrvBindCfg}; use crate::types::Duplex; use crate::BootstrapConfig; use crate::{ @@ -20,7 +18,9 @@ use massa_models::config::{ use massa_models::node::NodeId; use massa_models::version::Version; use massa_signature::{KeyPair, PublicKey}; +use massa_time::MassaTime; use serial_test::serial; +use std::str::FromStr; use tokio::io::duplex; lazy_static::lazy_static! { @@ -70,12 +70,15 @@ async fn test_binders() { let mut server = BootstrapServerBinder::new( server, server_keypair.clone(), - f64::INFINITY, - MAX_BOOTSTRAP_MESSAGE_SIZE, - THREAD_COUNT, - MAX_DATASTORE_KEY_LENGTH, - BOOTSTRAP_RANDOMNESS_SIZE_BYTES, - CONSENSUS_BOOTSTRAP_PART_SIZE, + BootstrapSrvBindCfg { + max_bytes_read_write: f64::INFINITY, + max_bootstrap_message_size: MAX_BOOTSTRAP_MESSAGE_SIZE, + thread_count: THREAD_COUNT, + max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH, + randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES, + consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE, + write_error_timeout: MassaTime::from_millis(1000), + }, ); let mut client = BootstrapClientBinder::test_default( client, @@ -163,15 +166,19 @@ async fn test_binders_double_send_server_works() { let (bootstrap_config, server_keypair): &(BootstrapConfig, KeyPair) = &BOOTSTRAP_CONFIG_KEYPAIR; let (client, server) = duplex(1000000); + let mut server = BootstrapServerBinder::new( server, server_keypair.clone(), - f64::INFINITY, - MAX_BOOTSTRAP_MESSAGE_SIZE, - THREAD_COUNT, - MAX_DATASTORE_KEY_LENGTH, - BOOTSTRAP_RANDOMNESS_SIZE_BYTES, - CONSENSUS_BOOTSTRAP_PART_SIZE, + BootstrapSrvBindCfg { + max_bytes_read_write: f64::INFINITY, + max_bootstrap_message_size: MAX_BOOTSTRAP_MESSAGE_SIZE, + thread_count: THREAD_COUNT, + max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH, + randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES, + consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE, + write_error_timeout: MassaTime::from_millis(1000), + }, ); let mut client = BootstrapClientBinder::test_default( client, @@ -247,12 +254,15 @@ async fn test_binders_try_double_send_client_works() { let mut server = BootstrapServerBinder::new( server, server_keypair.clone(), - f64::INFINITY, - MAX_BOOTSTRAP_MESSAGE_SIZE, - THREAD_COUNT, - MAX_DATASTORE_KEY_LENGTH, - BOOTSTRAP_RANDOMNESS_SIZE_BYTES, - CONSENSUS_BOOTSTRAP_PART_SIZE, + BootstrapSrvBindCfg { + max_bytes_read_write: f64::INFINITY, + max_bootstrap_message_size: MAX_BOOTSTRAP_MESSAGE_SIZE, + thread_count: THREAD_COUNT, + max_datastore_key_length: MAX_DATASTORE_KEY_LENGTH, + randomness_size_bytes: BOOTSTRAP_RANDOMNESS_SIZE_BYTES, + consensus_bootstrap_part_size: CONSENSUS_BOOTSTRAP_PART_SIZE, + write_error_timeout: MassaTime::from_millis(1000), + }, ); let mut client = BootstrapClientBinder::test_default( client, diff --git a/massa-bootstrap/src/tests/tools.rs b/massa-bootstrap/src/tests/tools.rs index 5162cde5753..d8df9b343bf 100644 --- a/massa-bootstrap/src/tests/tools.rs +++ b/massa-bootstrap/src/tests/tools.rs @@ -286,7 +286,7 @@ pub fn get_dummy_signature(s: &str) -> Signature { pub fn get_bootstrap_config(bootstrap_public_key: NodeId) -> BootstrapConfig { BootstrapConfig { - bind: Some("0.0.0.0:31244".parse().unwrap()), + listen_addr: Some("0.0.0.0:31244".parse().unwrap()), bootstrap_protocol: IpType::Both, bootstrap_timeout: 120000.into(), connect_timeout: 200.into(), diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index a234740b14d..980d3a496f5 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -178,7 +178,7 @@ async fn launch( bootstrap_protocol: SETTINGS.bootstrap.bootstrap_protocol, bootstrap_whitelist_path: SETTINGS.bootstrap.bootstrap_whitelist_path.clone(), bootstrap_blacklist_path: SETTINGS.bootstrap.bootstrap_blacklist_path.clone(), - bind: SETTINGS.bootstrap.bind, + listen_addr: SETTINGS.bootstrap.bind, connect_timeout: SETTINGS.bootstrap.connect_timeout, bootstrap_timeout: SETTINGS.bootstrap.bootstrap_timeout, read_timeout: SETTINGS.bootstrap.read_timeout,