Implement some rate limiting for onion messages.
In this commit, we add business logic for checking whether a peer's outbound
buffer has room for onion messages and, if so, pulling them from an implementer
of a new trait, OnionMessageProvider.

Channel messages are prioritized over onion messages (OMs).

The onion_message module remains private until further rate limiting is added.
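For orientation, here is a minimal sketch of the provider interface this commit polls. The trait itself is defined outside this diff, so the method signature and return type below are assumptions inferred from the new loop in `process_events` (which iterates the results and matches on `MessageSendEvent::SendOnionMessage`):

```rust
// Assumed imports; module paths reflect this era of the crate and may differ.
use bitcoin::secp256k1::PublicKey;
use lightning::util::events::MessageSendEvent;

/// Hypothetical shape of the new trait named above. The real definition is not
/// part of this diff; the Vec<MessageSendEvent> return type is only inferred
/// from how process_events consumes the result.
pub trait OnionMessageProvider {
	/// Returns up to `max_messages` pending onion messages destined for `peer_node_id`.
	fn next_onion_messages_for_peer(&self, peer_node_id: PublicKey, max_messages: usize)
		-> Vec<MessageSendEvent>;
}
```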
valentinewallace committed Aug 4, 2022
1 parent b317453 commit 0b4056f
Showing 1 changed file with 42 additions and 2 deletions.
lightning/src/ln/peer_handler.rs (+42 −2)
@@ -309,15 +309,23 @@ enum InitSyncTracker{
/// forwarding gossip messages to peers altogether.
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
/// forwarding onion messages to peers altogether.
const OM_BUFFER_LIMIT_RATIO: usize = 2;

/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
/// peer.
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;

/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
/// the socket receive buffer before receiving the ping.
///
@@ -393,6 +401,14 @@ impl Peer {
			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
		}
	}

	/// Returns the number of onion messages we can fit in this peer's buffer.
	fn onion_message_buffer_slots_available(&self) -> usize {
		cmp::min(
			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
	}

	/// Returns whether this peer's buffer is full and we should drop gossip messages.
	fn buffer_full_drop_gossip(&self) -> bool {
		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
@@ -817,8 +833,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
	/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
	/// sufficient!
	///
	/// If any bytes are written, [`process_events`] should be called afterwards.
	// TODO: why?
	///
	/// [`send_data`]: SocketDescriptor::send_data
	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
	/// [`process_events`]: PeerManager::process_events
	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
		let peers = self.peers.read().unwrap();
		match peers.get(descriptor) {
@@ -1412,6 +1432,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
	/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
	/// or one of the other clients provided in our language bindings.
	///
	/// Note that this method should be called again if any bytes are written.
	///
	/// Note that if there are any other calls to this function waiting on lock(s) this may return
	/// without doing any work. All available events that need handling will be handled before the
	/// other calls return.
@@ -1666,6 +1688,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM

			for (descriptor, peer_mutex) in peers.iter() {
				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());

				// Only see if we have room for onion messages after we've written all channel messages, to
				// ensure they take priority.
				let (peer_node_id, om_buffer_slots_avail) = {
					let peer = peer_mutex.lock().unwrap();
					if let Some(peer_node_id) = peer.their_node_id {
						(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
					} else { (None, 0) }
				};
				if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
					for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
						peer_node_id.unwrap(), om_buffer_slots_avail)
					{
						if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
						}
					}
				}
			}
		}
		if !peers_to_disconnect.is_empty() {
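To make the new rate limit concrete, here is a small self-contained sketch of the slot arithmetic behind `onion_message_buffer_slots_available`. The constants mirror the diff above, except `BUFFER_DRAIN_MSGS_PER_TICK`, which is defined elsewhere in peer_handler.rs and whose value is assumed here purely for illustration:

```rust
use std::cmp;

// Values taken from the diff above.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const OM_BUFFER_LIMIT_RATIO: usize = 2;
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
// Assumed value; the real constant lives elsewhere in peer_handler.rs.
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

/// Mirrors the per-peer slot calculation: room left in the outbound buffer,
/// capped by how many messages we are still willing to send before the peer's
/// next pong.
fn om_slots_available(pending_outbound_msgs: usize, msgs_sent_since_pong: usize) -> usize {
	cmp::min(
		OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(pending_outbound_msgs),
		(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(msgs_sent_since_pong))
}

fn main() {
	// 15 messages already queued, 50 sent since the last pong:
	// min(20 - 15, 64 - 50) = min(5, 14) = 5 onion-message slots this pass.
	assert_eq!(om_slots_available(15, 50), 5);
	// A saturated buffer yields zero slots, so process_events skips polling the
	// OnionMessageProvider for that peer entirely.
	assert_eq!(om_slots_available(25, 0), 0);
	println!("slot math checks out");
}
```

Because the poll happens only after `do_attempt_write_data` has flushed channel messages, and is capped by both limits, channel traffic always drains first and onion messages cannot grow a slow peer's buffer without bound.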
