From d5fbc041a77a15a68179d75cab3bc58b2f99a2f9 Mon Sep 17 00:00:00 2001
From: Valentine Wallace
Date: Tue, 12 Jul 2022 00:04:36 -0400
Subject: [PATCH] Forward onion messages in PeerManager

Ensure channel messages are prioritized over OMs, and that OMs are only
written when there's sufficient space in the peer's buffer.

We also take this opportunity to add a utility method for deciding when
to drop gossip.
---
 lightning/src/ln/peer_handler.rs | 64 ++++++++++++++++++++++++++------
 1 file changed, 53 insertions(+), 11 deletions(-)

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 4d672129bba..45d83b062d8 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -309,15 +309,23 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
@@ -393,6 +401,22 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns the number of onion messages we can fit in this peer's buffer.
+	fn onion_message_buffer_slots_available(&self) -> usize {
+		cmp::min(
+			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+	}
+
+	/// Returns whether this peer's buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+				return true
+		}
+		false
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -811,8 +835,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		match peers.get(descriptor) {
@@ -1335,9 +1363,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1361,9 +1387,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1386,9 +1410,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1412,6 +1434,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
+				if om_buffer_slots_avail > 0 {
+					for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
+						peer_node_id.unwrap(), om_buffer_slots_avail)
+					{
+						if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
+							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+						}
+					}
+				}
 			}
 		}
 		if !peers_to_disconnect.is_empty() {
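
The slot computation is easiest to see with concrete numbers. Below is a minimal standalone
sketch (not part of the patch) of the min() of the two caps that
onion_message_buffer_slots_available() takes; om_slots_available is a hypothetical free function
standing in for the Peer method, and BUFFER_DRAIN_MSGS_PER_TICK = 32 is an assumed value rather
than one taken from this diff.

// Sketch only: mirrors the two caps combined by Peer::onion_message_buffer_slots_available().
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const OM_BUFFER_LIMIT_RATIO: usize = 2;
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize =
	OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; // assumed value, not taken from this diff

// pending_msgs stands in for peer.pending_outbound_buffer.len(),
// msgs_sent_since_pong for peer.msgs_sent_since_pong.
fn om_slots_available(pending_msgs: usize, msgs_sent_since_pong: usize) -> usize {
	core::cmp::min(
		OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(pending_msgs),
		(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(msgs_sent_since_pong),
	)
}

fn main() {
	// An empty buffer right after a pong leaves room for up to 20 onion messages.
	assert_eq!(om_slots_available(0, 0), 20);
	// 18 queued channel/gossip messages leave only 2 slots for onion messages.
	assert_eq!(om_slots_available(18, 0), 2);
	// Once either cap is exceeded, we stop polling for onion messages entirely.
	assert_eq!(om_slots_available(25, 0), 0);
	assert_eq!(om_slots_available(0, 64), 0);
}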
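
Since process_events only polls the onion message handler after channel and gossip events have
been enqueued, and only for as many messages as fit under OUTBOUND_BUFFER_LIMIT_PAUSE_OMS, onion
messages cannot crowd channel traffic out of a peer's buffer. The sketch below (also not part of
the patch) models that ordering with plain types; fetch_onion_messages and the VecDeque buffer
are hypothetical stand-ins for next_onion_messages_for_peer and the peer's
pending_outbound_buffer.

use std::collections::VecDeque;

const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 20;

// Stand-in for onion_message_handler.next_onion_messages_for_peer(peer_id, max):
// returns at most `max` onion messages queued for this peer.
fn fetch_onion_messages(max: usize) -> Vec<String> {
	(0..max).map(|i| format!("onion_msg_{}", i)).collect()
}

// Models one pass over a single peer: channel messages are buffered first, then
// onion messages are pulled only for however many slots remain under the cap.
fn process_peer(buffer: &mut VecDeque<String>, channel_msgs: Vec<String>) {
	for msg in channel_msgs {
		buffer.push_back(msg);
	}
	let slots = OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(buffer.len());
	if slots > 0 {
		for om in fetch_onion_messages(slots) {
			buffer.push_back(om);
		}
	}
}

fn main() {
	let mut buffer = VecDeque::new();
	let channel_msgs: Vec<String> = (0..17).map(|i| format!("channel_msg_{}", i)).collect();
	process_peer(&mut buffer, channel_msgs);
	// 17 channel messages leave room for only 3 onion messages this round.
	assert_eq!(buffer.len(), 20);
}

Draining channel messages first and capping the onion-message poll per pass keeps a busy
onion-message peer from starving its own channel traffic, which is the prioritization the commit
message describes.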