Skip to content

Commit

Permalink
Add Notifier to OnionMessenger
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Aug 7, 2024
1 parent 9ce3dd5 commit 7b8200a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 18 deletions.
76 changes: 62 additions & 14 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,23 +492,28 @@ pub(crate) mod futures_util {
pub(crate) struct Selector<
A: Future<Output = ()> + Unpin,
B: Future<Output = ()> + Unpin,
C: Future<Output = bool> + Unpin,
C: Future<Output = ()> + Unpin,
D: Future<Output = bool> + Unpin,
> {
pub a: A,
pub b: B,
pub c: C,
pub d: D,
}

pub(crate) enum SelectorOutput {
A,
B,
C(bool),
C,
D(bool),
}

impl<
A: Future<Output = ()> + Unpin,
B: Future<Output = ()> + Unpin,
C: Future<Output = bool> + Unpin,
> Future for Selector<A, B, C>
C: Future<Output = ()> + Unpin,
D: Future<Output = bool> + Unpin,
> Future for Selector<A, B, C, D>
{
type Output = SelectorOutput;
fn poll(
Expand All @@ -527,15 +532,43 @@ pub(crate) mod futures_util {
Poll::Pending => {},
}
match Pin::new(&mut self.c).poll(ctx) {
Poll::Ready(()) => {
return Poll::Ready(SelectorOutput::C);
},
Poll::Pending => {},
}
match Pin::new(&mut self.d).poll(ctx) {
Poll::Ready(res) => {
return Poll::Ready(SelectorOutput::C(res));
return Poll::Ready(SelectorOutput::D(res));
},
Poll::Pending => {},
}
Poll::Pending
}
}

/// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
/// will always be pending otherwise.
pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
pub optional_future: Option<F>,
}

impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
match self.optional_future.as_mut() {
Some(f) => match Pin::new(f).poll(ctx) {
Poll::Ready(()) => {
self.optional_future.take();
Poll::Ready(())
},
Poll::Pending => Poll::Pending,
},
None => Poll::Pending,
}
}
}

// If we want to poll a future without an async context to figure out if it has completed or
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
// but sadly there's a good bit of boilerplate here.
Expand All @@ -557,7 +590,7 @@ pub(crate) mod futures_util {
#[cfg(feature = "futures")]
use core::task;
#[cfg(feature = "futures")]
use futures_util::{dummy_waker, Selector, SelectorOutput};
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};

/// Processes background events in a future.
///
Expand Down Expand Up @@ -782,18 +815,25 @@ where
scorer,
should_break,
{
let om_fut = if let Some(om) = onion_messenger.as_ref() {
let fut = om.get_om().get_update_future();
OptionalSelector { optional_future: Some(fut) }
} else {
OptionalSelector { optional_future: None }
};
let fut = Selector {
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
c: sleeper(if mobile_interruptable_platform {
c: om_fut,
d: sleeper(if mobile_interruptable_platform {
Duration::from_millis(100)
} else {
Duration::from_secs(FASTEST_TIMER)
}),
};
match fut.await {
SelectorOutput::A | SelectorOutput::B => {},
SelectorOutput::C(exit) => {
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
SelectorOutput::D(exit) => {
should_break = exit;
},
}
Expand Down Expand Up @@ -938,11 +978,19 @@ impl BackgroundProcessor {
scorer,
stop_thread.load(Ordering::Acquire),
{
Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
)
.wait_timeout(Duration::from_millis(100));
let sleeper = if let Some(om) = onion_messenger.as_ref() {
Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
)
} else {
Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
)
};
sleeper.wait_timeout(Duration::from_millis(100));
},
|_| Instant::now(),
|time: &Instant, dur| time.elapsed().as_secs() > dur,
Expand Down
34 changes: 31 additions & 3 deletions lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
use crate::util::logger::{Logger, WithContext};
use crate::util::ser::Writeable;
use crate::util::wakers::{Future, Notifier};

use core::fmt;
use core::ops::Deref;
Expand Down Expand Up @@ -266,6 +267,9 @@ pub struct OnionMessenger<
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
pending_peer_connected_events: Mutex<Vec<Event>>,
pending_events_processor: AtomicBool,
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
/// it to give to users.
event_notifier: Notifier,
}

/// [`OnionMessage`]s buffered to be sent.
Expand All @@ -290,13 +294,19 @@ impl OnionMessageRecipient {
}
}

fn enqueue_message(&mut self, message: OnionMessage) {
// Returns whether changes were made that are pending event processing
fn enqueue_message(&mut self, message: OnionMessage) -> bool {
let mut pending_event_processing = false;
let pending_messages = match self {
OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => {
pending_event_processing = true;
pending_messages
}
};

pending_messages.push_back(message);
pending_event_processing
}

fn dequeue_message(&mut self) -> Option<OnionMessage> {
Expand Down Expand Up @@ -1037,6 +1047,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
if $res.iter().any(|r| r.is_err()) {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
$self.event_notifier.notify();
return;
}
}
Expand Down Expand Up @@ -1119,6 +1130,7 @@ where
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
pending_peer_connected_events: Mutex::new(Vec::new()),
pending_events_processor: AtomicBool::new(false),
event_notifier: Notifier::new(),
}
}

Expand Down Expand Up @@ -1228,8 +1240,11 @@ where
hash_map::Entry::Vacant(e) => match addresses {
None => Err(SendError::InvalidFirstHop(first_node_id)),
Some(addresses) => {
e.insert(OnionMessageRecipient::pending_connection(addresses))
let notify = e.insert(OnionMessageRecipient::pending_connection(addresses))
.enqueue_message(onion_message);
if notify {
self.event_notifier.notify();
}
Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
},
},
Expand Down Expand Up @@ -1345,6 +1360,18 @@ where
return
}
pending_intercepted_msgs_events.push(event);
self.event_notifier.notify();
}

/// Gets a [`Future`] that completes when an event is available via
/// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`OnionMessenger`] and should instead register actions to be taken later.
///
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
pub fn get_update_future(&self) -> Future {
self.event_notifier.get_future()
}

/// Processes any events asynchronously using the given handler.
Expand Down Expand Up @@ -1616,6 +1643,7 @@ where
pending_peer_connected_events.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
self.event_notifier.notify();
}
} else {
self.message_recipients.lock().unwrap().remove(their_node_id);
Expand Down
13 changes: 12 additions & 1 deletion lightning/src/util/wakers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,21 @@ impl Sleeper {
Self { notifiers: vec![Arc::clone(&future.state)] }
}
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
// Note that this is the common case - a ChannelManager and ChainMonitor.
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
}
/// Constructs a new sleeper from three futures, allowing blocking on all three at once.
///
// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
// OnionMessenger.
pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
let notifiers = vec![
Arc::clone(&fut_a.state),
Arc::clone(&fut_b.state),
Arc::clone(&fut_c.state)
];
Self { notifiers }
}
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
pub fn new(futures: Vec<Future>) -> Self {
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
Expand Down

0 comments on commit 7b8200a

Please sign in to comment.