Forward onion messages in PeerManager
Making sure channel messages are prioritized over OMs, and that OMs are only written
when there's sufficient space in the peer's buffer.

We also take this opportunity to add a utility for when to drop gossip.
valentinewallace committed Jul 12, 2022
1 parent 7ee6edc commit d5fbc04
Showing 1 changed file with 53 additions and 11 deletions.
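
As an orientation aid (editorial, not part of the commit), here is a minimal, self-contained sketch of the buffering policy the commit message describes. `ToyPeer`, the value of BUFFER_DRAIN_MSGS_PER_TICK, and the buffer contents are assumptions for illustration only; the threshold constants mirror the values added in the diff below.

use std::cmp;
use std::collections::VecDeque;

// Assumed values for illustration; the real constants (and `Peer`) live in
// lightning/src/ln/peer_handler.rs and may differ.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
const OM_BUFFER_LIMIT_RATIO: usize = 2;
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
    OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize =
    OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; // illustrative stand-in

// Toy stand-in holding only the fields the new helpers read.
struct ToyPeer {
    pending_outbound_buffer: VecDeque<Vec<u8>>,
    msgs_sent_since_pong: usize,
}

impl ToyPeer {
    // Gossip gets dropped once the outbound buffer (or the unacked-message count) passes its limit.
    fn buffer_full_drop_gossip(&self) -> bool {
        self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
            || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
    }

    // Onion messages only get whatever room remains below their own pause threshold.
    fn onion_message_buffer_slots_available(&self) -> usize {
        cmp::min(
            OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
            (BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
    }
}

fn main() {
    // 15 channel messages already queued, 15 messages sent since the last pong.
    let peer = ToyPeer {
        pending_outbound_buffer: VecDeque::from(vec![vec![0u8]; 15]),
        msgs_sent_since_pong: 15,
    };
    // Gossip still flows (15 <= 20), but only 20 - 15 = 5 onion-message slots remain,
    // so at most 5 OMs would be pulled from the onion message handler this round.
    assert!(!peer.buffer_full_drop_gossip());
    assert_eq!(peer.onion_message_buffer_slots_available(), 5);
}
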
64 changes: 53 additions & 11 deletions lightning/src/ln/peer_handler.rs
@@ -309,15 +309,23 @@ enum InitSyncTracker{
/// forwarding gossip messages to peers altogether.
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
/// forwarding onion messages to peers altogether.
const OM_BUFFER_LIMIT_RATIO: usize = 2;

/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
/// refilled as we send bytes.
/// We also use this as the target number of outbound gossip and onion messages to keep in the write
/// buffer, refilled as we send bytes.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
/// peer.
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;

/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
/// the socket receive buffer before receiving the ping.
///
@@ -393,6 +401,22 @@ impl Peer {
InitSyncTracker::NodesSyncing(pk) => pk < node_id,
}
}

/// Returns the number of onion messages we can fit in this peer's buffer.
fn onion_message_buffer_slots_available(&self) -> usize {
cmp::min(
OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
}

/// Returns whether this peer's buffer is full and we should drop gossip messages.
fn buffer_full_drop_gossip(&self) -> bool {
self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}
}

/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -811,8 +835,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
/// sufficient!
///
/// If any bytes are written, [`process_events`] should be called afterwards.
// TODO: why?
///
/// [`send_data`]: SocketDescriptor::send_data
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
/// [`process_events`]: PeerManager::process_events
pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
let peers = self.peers.read().unwrap();
match peers.get(descriptor) {
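
As a usage note (editorial, not part of the commit): a driver reacting to a writable socket would typically call the method above and then process_events, per the docs. The sketch below is a hypothetical shape of that glue, with closures standing in for the real PeerManager and socket types; only the calling order comes from the documentation above.

fn on_socket_writable<D, E>(
    mut write_buffer_space_avail: impl FnMut(&mut D) -> Result<(), E>,
    mut process_events: impl FnMut(),
    descriptor: &mut D,
) -> Result<(), E> {
    // Resume flushing the peer's pending outbound buffer into the socket.
    write_buffer_space_avail(descriptor)?;
    // The docs above ask that `process_events` be called after any bytes are written.
    process_events();
    Ok(())
}

fn main() {
    let mut fake_socket: Vec<u8> = Vec::new();
    on_socket_writable(
        |sock: &mut Vec<u8>| -> Result<(), ()> { sock.push(1); Ok(()) },
        || println!("process_events called"),
        &mut fake_socket,
    )
    .unwrap();
    assert_eq!(fake_socket, vec![1]);
}
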
@@ -1335,9 +1363,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
@@ -1361,9 +1387,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
@@ -1386,9 +1410,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
@@ -1412,6 +1434,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
/// or one of the other clients provided in our language bindings.
///
/// Note that this method should be called again if any bytes are written.
///
/// Note that if there are any other calls to this function waiting on lock(s) this may return
/// without doing any work. All available events that need handling will be handled before the
/// other calls return.
@@ -1666,6 +1690,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM

for (descriptor, peer_mutex) in peers.iter() {
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());

// Only see if we have room for onion messages after we've written all channel messages, to
// ensure channel messages take priority over OMs.
let (peer_node_id, om_buffer_slots_avail) = {
let peer = peer_mutex.lock().unwrap();
if let Some(peer_node_id) = peer.their_node_id {
(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
} else { (None, 0) }
};
if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
peer_node_id.unwrap(), om_buffer_slots_avail)
{
if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
}
}
}
}
if !peers_to_disconnect.is_empty() {

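One last editorial note on the handler side of next_onion_messages_for_peer: the peer manager passes in the number of free buffer slots, so the handler is expected to return at most that many messages per call. Below is a hypothetical toy queue (not LDK's real onion message handler) illustrating that contract.

use std::collections::{HashMap, VecDeque};

// Hypothetical stand-ins; only the "return at most `max_messages`" contract is the point.
type NodeId = [u8; 33];
struct OnionMessage(Vec<u8>);

// Toy per-peer outbound queue of onion messages awaiting forwarding.
struct ToyOnionMessageQueue {
    pending: HashMap<NodeId, VecDeque<OnionMessage>>,
}

impl ToyOnionMessageQueue {
    // Drain at most `max_messages` for this peer; the rest stay queued until the peer's
    // outbound buffer frees up more slots on a later pass.
    fn next_onion_messages_for_peer(&mut self, peer: NodeId, max_messages: usize) -> Vec<OnionMessage> {
        match self.pending.get_mut(&peer) {
            Some(queue) => {
                let take = max_messages.min(queue.len());
                queue.drain(..take).collect()
            },
            None => Vec::new(),
        }
    }
}

fn main() {
    let peer: NodeId = [2u8; 33];
    let mut handler = ToyOnionMessageQueue { pending: HashMap::new() };
    handler.pending.entry(peer).or_default()
        .extend((0u8..8).map(|i| OnionMessage(vec![i])));
    // With only 5 buffer slots free, the caller gets 5 messages now; 3 remain queued.
    assert_eq!(handler.next_onion_messages_for_peer(peer, 5).len(), 5);
    assert_eq!(handler.pending[&peer].len(), 3);
}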