From 140d2d163d51505b79bee9bc965ac0bf7043591b Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Fri, 15 Oct 2021 14:19:24 -0700 Subject: [PATCH] Emit warnings when locks are held for too long - Add read_with_warn() and write_with_warn() methods to RwLock - They emit warnings if lock acqusition takes over 100ms - They emit warnings if locks are held for over 100ms - Use read_with_warn() and write_with_warn() in most places RwLock's are accessed Using these methods I was able to immediately identify the cause of a deadlock during boot. --- ethportal-peertest/src/main.rs | 5 +- src/main.rs | 5 +- trin-core/src/jsonrpc/handlers.rs | 5 +- trin-core/src/lib.rs | 1 + trin-core/src/locks.rs | 153 +++++++++++++++++++++++++++++ trin-core/src/portalnet/events.rs | 3 +- trin-core/src/portalnet/overlay.rs | 29 +++--- trin-core/src/portalnet/utp.rs | 20 +++- trin-state/src/events.rs | 3 +- trin-state/src/jsonrpc.rs | 14 +-- trin-state/src/lib.rs | 12 ++- trin-state/src/network.rs | 11 +-- 12 files changed, 218 insertions(+), 43 deletions(-) create mode 100644 trin-core/src/locks.rs diff --git a/ethportal-peertest/src/main.rs b/ethportal-peertest/src/main.rs index 841b02c07..dfa61edb0 100644 --- a/ethportal-peertest/src/main.rs +++ b/ethportal-peertest/src/main.rs @@ -9,6 +9,7 @@ use ethportal_peertest::events::PortalnetEvents; use ethportal_peertest::jsonrpc::{ test_jsonrpc_endpoints_over_http, test_jsonrpc_endpoints_over_ipc, }; +use trin_core::locks::RwLoggingExt; use trin_core::portalnet::{ discovery::Discovery, overlay::{OverlayConfig, OverlayProtocol}, @@ -30,10 +31,10 @@ async fn main() -> Result<(), Box> { }; let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap())); - discovery.write().await.start().await.unwrap(); + discovery.write_with_warn().await.start().await.unwrap(); let db = Arc::new(setup_overlay_db( - discovery.read().await.local_enr().node_id(), + discovery.read_with_warn().await.local_enr().node_id(), )); let overlay = Arc::new( diff --git a/src/main.rs b/src/main.rs index 2862438c8..032a560fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use tokio::sync::RwLock; use trin_core::jsonrpc::handlers::JsonRpcHandler; use trin_core::jsonrpc::types::PortalJsonRpcRequest; +use trin_core::locks::RwLoggingExt; use trin_core::portalnet::events::PortalnetEvents; use trin_core::{ cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK}, @@ -52,11 +53,11 @@ async fn main() -> Result<(), Box> { let discovery = Arc::new(RwLock::new( Discovery::new(portalnet_config.clone()).unwrap(), )); - discovery.write().await.start().await.unwrap(); + discovery.write_with_warn().await.start().await.unwrap(); // Setup Overlay database let db = Arc::new(setup_overlay_db( - discovery.read().await.local_enr().node_id(), + discovery.read_with_warn().await.local_enr().node_id(), )); debug!("Selected networks to spawn: {:?}", trin_config.networks); diff --git a/trin-core/src/jsonrpc/handlers.rs b/trin-core/src/jsonrpc/handlers.rs index 319c7d245..b2b1519b3 100644 --- a/trin-core/src/jsonrpc/handlers.rs +++ b/trin-core/src/jsonrpc/handlers.rs @@ -8,6 +8,7 @@ use tokio::sync::RwLock; use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint}; use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest}; +use crate::locks::RwLoggingExt; use crate::portalnet::discovery::Discovery; type Responder = mpsc::UnboundedSender>; @@ -25,9 +26,9 @@ impl JsonRpcHandler { while let Some(request) = self.portal_jsonrpc_rx.recv().await { let response: Value = match request.endpoint { TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint { - Discv5Endpoint::NodeInfo => self.discovery.read().await.node_info(), + Discv5Endpoint::NodeInfo => self.discovery.read_with_warn().await.node_info(), Discv5Endpoint::RoutingTableInfo => { - self.discovery.write().await.routing_table_info() + self.discovery.write_with_warn().await.routing_table_info() } }, TrinEndpoint::HistoryEndpoint(endpoint) => { diff --git a/trin-core/src/lib.rs b/trin-core/src/lib.rs index de8f221c3..56dc0061a 100644 --- a/trin-core/src/lib.rs +++ b/trin-core/src/lib.rs @@ -3,6 +3,7 @@ extern crate lazy_static; pub mod cli; pub mod jsonrpc; +pub mod locks; pub mod portalnet; pub mod socket; pub mod utils; diff --git a/trin-core/src/locks.rs b/trin-core/src/locks.rs new file mode 100644 index 000000000..db196a816 --- /dev/null +++ b/trin-core/src/locks.rs @@ -0,0 +1,153 @@ +use futures::future::FutureExt; +use std::future::Future; +use std::marker::Sync; +use std::ops::Deref; +use std::ops::DerefMut; +use std::panic::Location; +use std::pin::Pin; +use std::time::Duration; +use std::time::Instant; +use tokio::sync::RwLock; +use tokio::sync::RwLockReadGuard; +use tokio::sync::RwLockWriteGuard; +use tokio::task::JoinHandle; + +const ACQUIRE_TIMEOUT_MS: u64 = 100; +const HOLD_TIMEOUT_MS: u64 = 100; + +/// Tries to look exactly like a T, by implementing Deref and DerefMut, but emits +/// a warning if drop() is not called soon enough. +pub struct TimedGuard { + inner: T, + acquisition_line: u32, + acquisition_file: &'static str, + acquisition_time: Instant, + sleep_task: JoinHandle<()>, +} + +impl TimedGuard { + fn new(inner: T, acquisition_line: u32, acquisition_file: &'static str) -> TimedGuard { + let now = Instant::now(); + let move_line = acquisition_line; + let move_file = acquisition_file; + let handle = tokio::spawn(async move { + sleep_then_log(move_file, move_line).await; + }); + + TimedGuard { + inner, + acquisition_line, + acquisition_file, + acquisition_time: now, + sleep_task: handle, + } + } +} + +impl Deref for TimedGuard { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TimedGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Drop for TimedGuard { + fn drop(&mut self) { + self.sleep_task.abort(); + let held_for = self.acquisition_time.elapsed().as_millis(); + if held_for > HOLD_TIMEOUT_MS.into() { + log::warn!( + "[{}:{}] lock held for too long: {}ms", + self.acquisition_file, + self.acquisition_line, + held_for, + ) + } + } +} + +async fn sleep_then_log(file: &'static str, line: u32) { + tokio::time::sleep(Duration::from_millis(HOLD_TIMEOUT_MS)).await; + log::warn!( + "[{}:{}] lock held for over {}ms, not yet released", + file, + line, + HOLD_TIMEOUT_MS.to_string() + ); +} + +async fn try_lock(fut: Fut, file: &'static str, line: u32) -> TimedGuard +where + Fut: Future, +{ + let acquire_timeout = Duration::from_millis(ACQUIRE_TIMEOUT_MS); + let sleep = tokio::time::sleep(acquire_timeout).fuse(); + let fused = fut.fuse(); + + futures::pin_mut!(sleep, fused); + + let now = Instant::now(); + + futures::select! { + _ = sleep => { + log::warn!( + "[{}:{}] waiting more than {}ms to acquire lock, still waiting", + file, line, ACQUIRE_TIMEOUT_MS, + ); + }, + guard = fused => { + return TimedGuard::new(guard, line, file); + } + } + + let guard = fused.await; + let wait_time = now.elapsed().as_millis(); + log::warn!("[{}:{}] waited {}ms to acquire lock", file, line, wait_time); + + TimedGuard::new(guard, line, file) +} + +// this is a workaround: +// - Rust does not support async in traits +// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html +// - async_trait does not give us enough flexibility to implement #[track_caller] +// +// So we manually desugar the async functions and have them return futures +type Async<'a, T> = Pin + Send + 'a>>; + +/// These methods should be used in favor of the stock read() and write() methods. +/// +/// These methods emit warnings when the lock takes too long to acquire (meaning it's +/// likely some other user is holding onto the lock for too long). +/// +/// They also emit warnings when the returned TimedGuard is kept alive for too long. +/// (The lock is held until the returned TimedGuard is dropped, so it should be dropped +/// as soon as possible!) +pub trait RwLoggingExt { + #[track_caller] + fn read_with_warn(&self) -> Async>>; + + #[track_caller] + fn write_with_warn(&self) -> Async>>; +} + +impl RwLoggingExt for RwLock { + #[track_caller] + fn read_with_warn(&self) -> Async>> { + let loc = Location::caller(); + Box::pin(try_lock(self.read(), loc.file(), loc.line())) + } + + #[track_caller] + fn write_with_warn(&self) -> Async>> { + let loc = Location::caller(); + Box::pin(try_lock(self.write(), loc.file(), loc.line())) + } +} diff --git a/trin-core/src/portalnet/events.rs b/trin-core/src/portalnet/events.rs index f4329090e..98b171122 100644 --- a/trin-core/src/portalnet/events.rs +++ b/trin-core/src/portalnet/events.rs @@ -10,6 +10,7 @@ use super::{ utp::{UtpListener, UTP_PROTOCOL}, }; use crate::cli::{HISTORY_NETWORK, STATE_NETWORK}; +use crate::locks::RwLoggingExt; use std::collections::HashMap; use std::convert::TryInto; @@ -28,7 +29,7 @@ impl PortalnetEvents { state_sender: Option>, ) -> Self { let protocol_receiver = discovery - .write() + .write_with_warn() .await .discv5 .event_stream() diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 9053c0e22..6d6762f61 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -1,3 +1,4 @@ +use crate::locks::RwLoggingExt; use crate::utils::xor_two_values; use super::{ @@ -112,7 +113,12 @@ impl OverlayProtocol { data_radius: U256, ) -> Self { let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( - discovery.read().await.local_enr().node_id().into(), + discovery + .read_with_warn() + .await + .local_enr() + .node_id() + .into(), config.bucket_pending_timeout, config.max_incoming_per_bucket, config.table_filter, @@ -140,7 +146,7 @@ impl OverlayProtocol { let response = match request { Request::Ping(Ping { .. }) => { debug!("Got overlay ping request {:?}", request); - let enr_seq = self.discovery.read().await.local_enr().seq(); + let enr_seq = self.discovery.read_with_warn().await.local_enr().seq(); let payload = CustomPayload::new(self.data_radius().await, None); Response::Pong(Pong { enr_seq, @@ -181,12 +187,12 @@ impl OverlayProtocol { /// Returns the local ENR of the node. pub async fn local_enr(&self) -> Enr { - self.discovery.read().await.discv5.local_enr() + self.discovery.read_with_warn().await.discv5.local_enr() } // Returns the data radius of the node. pub async fn data_radius(&self) -> U256 { - self.data_radius.read().await.clone() + self.data_radius.read_with_warn().await.clone() } /// Returns a vector of the ENRs of the closest nodes by the given log2 distances. @@ -203,7 +209,7 @@ impl OverlayProtocol { } if !log2_distances.is_empty() { - let mut kbuckets = self.kbuckets.write().await; + let mut kbuckets = self.kbuckets.write_with_warn().await; for node in kbuckets .nodes_by_distances(&log2_distances, FIND_NODES_MAX_NODES) .into_iter() @@ -247,7 +253,7 @@ impl OverlayProtocol { /// Returns a vector of all ENR node IDs of nodes currently contained in the routing table. pub async fn table_entries_id(&self) -> Vec { self.kbuckets - .write() + .write_with_warn() .await .iter() .map(|entry| *entry.node.key.preimage()) @@ -257,7 +263,7 @@ impl OverlayProtocol { /// Returns a vector of all the ENRs of nodes currently contained in the routing table. pub async fn table_entries_enr(&self) -> Vec { self.kbuckets - .write() + .write_with_warn() .await .iter() .map(|entry| entry.node.value.enr().clone()) @@ -271,7 +277,8 @@ impl OverlayProtocol { protocol: ProtocolKind, payload: Option>, ) -> Result, SendPingError> { - let enr_seq = self.discovery.read().await.local_enr().seq(); + let enr_seq = self.discovery.read_with_warn().await.local_enr().seq(); + let payload = CustomPayload::new(data_radius, payload); let msg = Ping { enr_seq, @@ -279,7 +286,7 @@ impl OverlayProtocol { }; Ok(self .discovery - .read() + .read_with_warn() .await .send_talkreq( enr, @@ -297,7 +304,7 @@ impl OverlayProtocol { ) -> Result, RequestError> { let msg = FindNodes { distances }; self.discovery - .read() + .read_with_warn() .await .send_talkreq( enr, @@ -315,7 +322,7 @@ impl OverlayProtocol { ) -> Result, RequestError> { let msg = FindContent { content_key }; self.discovery - .read() + .read_with_warn() .await .send_talkreq( enr, diff --git a/trin-core/src/portalnet/utp.rs b/trin-core/src/portalnet/utp.rs index 76dca2ae9..df375977c 100644 --- a/trin-core/src/portalnet/utp.rs +++ b/trin-core/src/portalnet/utp.rs @@ -12,6 +12,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; +use crate::locks::RwLoggingExt; + pub const UTP_PROTOCOL: &str = "utp"; pub const HEADER_SIZE: usize = 20; pub const MAX_DISCV5_PACKET_SIZE: usize = 1280; @@ -433,7 +435,13 @@ impl UtpListener { } } Type::StSyn => { - if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) { + if let Some(enr) = self + .discovery + .read_with_warn() + .await + .discv5 + .find_enr(&node_id) + { // If neither of those cases happened handle this is a new request let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr); conn.handle_packet(packet).await; @@ -466,7 +474,13 @@ impl UtpListener { // I am honestly not sure if I should init this with Enr or NodeId since we could use both async fn connect(&mut self, connection_id: u16, node_id: NodeId) { - if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) { + if let Some(enr) = self + .discovery + .read_with_warn() + .await + .discv5 + .find_enr(&node_id) + { let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr); conn.make_connection(connection_id).await; self.utp_connections.insert( @@ -587,7 +601,7 @@ impl UtpStream { } let talk_request_result = self .discovery - .read() + .read_with_warn() .await .send_talkreq(self.enr.clone(), UTP_PROTOCOL.to_string(), packet.0.clone()) .await; diff --git a/trin-state/src/events.rs b/trin-state/src/events.rs index 9bc93c7fd..eb21cab39 100644 --- a/trin-state/src/events.rs +++ b/trin-state/src/events.rs @@ -3,6 +3,7 @@ use discv5::TalkRequest; use log::{debug, error, warn}; use std::sync::Arc; use tokio::sync::{mpsc::UnboundedReceiver, RwLock}; +use trin_core::locks::RwLoggingExt; use trin_core::portalnet::types::Message; pub struct StateEvents { @@ -17,7 +18,7 @@ impl StateEvents { let reply = match self .network - .write() + .write_with_warn() .await .overlay .process_one_request(&talk_request) diff --git a/trin-state/src/jsonrpc.rs b/trin-state/src/jsonrpc.rs index 0b0be25c6..06a4ce8fa 100644 --- a/trin-state/src/jsonrpc.rs +++ b/trin-state/src/jsonrpc.rs @@ -3,6 +3,7 @@ use serde_json::Value; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use trin_core::jsonrpc::{endpoints::StateEndpoint, types::StateJsonRpcRequest}; +use trin_core::locks::RwLoggingExt; /// Handles State network JSON-RPC requests pub struct StateRequestHandler { @@ -15,16 +16,9 @@ impl StateRequestHandler { while let Some(request) = self.state_rx.recv().await { match request.endpoint { StateEndpoint::DataRadius => { - let _ = request.resp.send(Ok(Value::String( - self.network - .read() - .await - .overlay - .data_radius - .read() - .await - .to_string(), - ))); + let net = self.network.read_with_warn().await; + let radius = net.overlay.data_radius.read_with_warn().await; + let _ = request.resp.send(Ok(Value::String(radius.to_string()))); } } } diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs index 3ea40cdc9..f41c6c735 100644 --- a/trin-state/src/lib.rs +++ b/trin-state/src/lib.rs @@ -1,5 +1,6 @@ use log::info; use rocksdb::DB; +use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tokio::task::JoinHandle; @@ -7,9 +8,9 @@ use crate::events::StateEvents; use crate::jsonrpc::StateRequestHandler; use discv5::TalkRequest; use network::StateNetwork; -use std::sync::Arc; use trin_core::cli::TrinConfig; use trin_core::jsonrpc::types::StateJsonRpcRequest; +use trin_core::locks::RwLoggingExt; use trin_core::portalnet::discovery::Discovery; use trin_core::portalnet::events::PortalnetEvents; use trin_core::portalnet::types::PortalnetConfig; @@ -46,7 +47,7 @@ pub async fn main() -> Result<(), Box> { // Setup Overlay database let db = Arc::new(setup_overlay_db( - discovery.read().await.local_enr().node_id(), + discovery.read_with_warn().await.local_enr().node_id(), )); let (state_event_tx, state_event_rx) = mpsc::unbounded_channel::(); @@ -117,7 +118,12 @@ pub fn spawn_state_network( tokio::spawn(state_events.process_requests()); // hacky test: make sure we establish a session with the boot node - network.write().await.ping_bootnodes().await.unwrap(); + network + .write_with_warn() + .await + .ping_bootnodes() + .await + .unwrap(); tokio::signal::ctrl_c() .await diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index 59ca1379b..1add874e6 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -2,6 +2,7 @@ use log::debug; use rocksdb::DB; use std::sync::Arc; use tokio::sync::RwLock; +use trin_core::locks::RwLoggingExt; use trin_core::portalnet::{ discovery::Discovery, overlay::{OverlayConfig, OverlayProtocol, SendPingError}, @@ -34,14 +35,8 @@ impl StateNetwork { // Trigger bonding with bootnodes, at both the base layer and portal overlay. // The overlay ping via talkreq will trigger a session at the base layer, then // a session on the (overlay) portal network. - for enr in self - .overlay - .discovery - .read() - .await - .discv5 - .table_entries_enr() - { + let guard = self.overlay.discovery.read_with_warn().await; + for enr in guard.discv5.table_entries_enr() { debug!("Attempting bond with bootnode {}", enr); let ping_result = self .overlay