Move in-flight ChannelMonitorUpdates to ChannelManager
Because `ChannelMonitorUpdate`s can be generated for a channel
which is already closed, and must still be tracked through their
completion, storing them in a `Channel` doesn't make sense - we'd
need a redundant place to put them post-closure and would have to
handle both storage locations equivalently.

Instead, here, we move to storing in-flight `ChannelMonitorUpdate`s
in the `ChannelManager`, leaving blocked `ChannelMonitorUpdate`s
in the `Channel` as they were.
TheBlueMatt committed Jun 23, 2023
1 parent 1c7b692 commit 4041f08
Showing 3 changed files with 202 additions and 112 deletions.
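Only the `channel.rs` hunks loaded on this page, but the commit message above describes a storage split, so here is a minimal, self-contained sketch of it first. All names here (`MonitorUpdate`, `FundingOutpoint`, `Manager`, `in_flight_monitor_updates`, and both helper methods) are simplified illustrative stand-ins, not LDK's actual types or API: blocked updates stay queued on the channel, while in-flight updates are tracked by the manager, keyed by funding outpoint, so they can be driven to completion even after the channel itself is gone.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct MonitorUpdate { update_id: u64 }

// (funding txid, output index); a stand-in for a funding outpoint key.
type FundingOutpoint = ([u8; 32], u16);

// Blocked updates remain on the per-channel struct, as before.
struct Channel {
    blocked_monitor_updates: Vec<MonitorUpdate>,
}

// In-flight (released but not yet persisted) updates move to the manager,
// which outlives any individual channel.
struct Manager {
    in_flight_monitor_updates: HashMap<FundingOutpoint, Vec<MonitorUpdate>>,
}

impl Manager {
    // Record an update as in-flight; valid even after the `Channel` is dropped.
    fn track_in_flight(&mut self, funding: FundingOutpoint, upd: MonitorUpdate) {
        self.in_flight_monitor_updates.entry(funding).or_default().push(upd);
    }

    // Forget an update once its persistence completes.
    fn complete_in_flight(&mut self, funding: FundingOutpoint, update_id: u64) {
        if let Some(upds) = self.in_flight_monitor_updates.get_mut(&funding) {
            upds.retain(|u| u.update_id != update_id);
        }
    }
}

fn main() {
    let funding: FundingOutpoint = ([0u8; 32], 0);
    let _chan = Channel { blocked_monitor_updates: Vec::new() };
    let mut mgr = Manager { in_flight_monitor_updates: HashMap::new() };
    mgr.track_in_flight(funding, MonitorUpdate { update_id: 1 });
    mgr.complete_in_flight(funding, 1);
    assert!(mgr.in_flight_monitor_updates[&funding].is_empty());
}
```

Keying in-flight updates by funding outpoint rather than storing them on the `Channel` is what removes the need for a redundant post-closure storage location.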
88 changes: 27 additions & 61 deletions lightning/src/ln/channel.rs
@@ -2264,34 +2264,25 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	}
 
 	pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
-		let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+		let release_cs_monitor = self.context.pending_monitor_updates.is_empty();
 		match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
 			UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
 				// Even if we aren't supposed to let new monitor updates with commitment state
 				// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
 				// matter what. Sadly, to push a new monitor update which flies before others
 				// already queued, we have to insert it into the pending queue and update the
 				// update_ids of all the following monitors.
-				let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+				if release_cs_monitor && msg.is_some() {
 					let mut additional_update = self.build_commitment_no_status_check(logger);
 					// build_commitment_no_status_check may bump latest_monitor_id but we want them
 					// to be strictly increasing by one, so decrement it here.
 					self.context.latest_monitor_update_id = monitor_update.update_id;
 					monitor_update.updates.append(&mut additional_update.updates);
-					self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-						update: monitor_update, blocked: false,
-					});
-					self.context.pending_monitor_updates.len() - 1
 				} else {
-					let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
-						.unwrap_or(self.context.pending_monitor_updates.len());
-					let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+					let new_mon_id = self.context.pending_monitor_updates.get(0)
 						.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
 					monitor_update.update_id = new_mon_id;
-					self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
-						update: monitor_update, blocked: false,
-					});
-					for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+					for held_update in self.context.pending_monitor_updates.iter_mut() {
 						held_update.update.update_id += 1;
 					}
 					if msg.is_some() {
@@ -2301,14 +2292,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 						update, blocked: true,
 					});
 				}
-				insert_pos
-			};
+			}
+
 			self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
-			UpdateFulfillCommitFetch::NewClaim {
-				monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
-					.expect("We just pushed the monitor update").update.clone(),
-				htlc_value_msat,
-			}
+			UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
 		},
 		UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
 	}
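The `else` branch in the hunk above performs the renumbering the comment describes: a preimage-bearing update must fly ahead of already-queued blocked updates, so it takes over the first queued `update_id` and every blocked update shifts one id later, keeping ids strictly increasing by one. A minimal sketch of just that renumbering, with simplified stand-in names (`Update`, `fly_preimage_update`, and `next_id` are illustrative, not LDK API):

```rust
struct Update { update_id: u64 }

// Returns the preimage update for immediate persistence, renumbered to sit
// in front of every update still held in `queue`.
fn fly_preimage_update(queue: &mut Vec<Update>, mut preimage_upd: Update, next_id: u64) -> Update {
    // Take over the first queued id, or use the next fresh id if the queue
    // is empty (mirroring `.get(0).map(..).unwrap_or(..)` above).
    preimage_upd.update_id = queue.first().map(|u| u.update_id).unwrap_or(next_id);
    // Shift every held update one id later so ids stay dense and ordered.
    for held in queue.iter_mut() {
        held.update_id += 1;
    }
    preimage_upd
}

fn main() {
    let mut queue = vec![Update { update_id: 5 }, Update { update_id: 6 }];
    let released = fly_preimage_update(&mut queue, Update { update_id: 7 }, 7);
    assert_eq!(released.update_id, 5);
    assert_eq!(queue[0].update_id, 6);
    assert_eq!(queue[1].update_id, 7);
}
```

The behavioral change in the new code is that the released update is handed straight back to the caller instead of being re-queued unblocked, since unblocked updates no longer live in the channel's queue at all.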
@@ -3349,8 +3336,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 		}
 
 		match self.free_holding_cell_htlcs(logger) {
-			(Some(_), htlcs_to_fail) => {
-				let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+			(Some(mut additional_update), htlcs_to_fail) => {
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3566,12 +3552,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	{
 		assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
 		self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-		let mut found_blocked = false;
-		self.context.pending_monitor_updates.retain(|upd| {
-			if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
-			if upd.blocked { found_blocked = true; }
-			upd.blocked
-		});
+		for upd in self.context.pending_monitor_updates.iter() {
+			debug_assert!(upd.blocked);
+		}
 
 		// If we're past (or at) the FundingSent stage on an outbound channel, try to
 		// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4439,48 +4422,31 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
 	/// further blocked monitor update exists after the next.
 	pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
-		for i in 0..self.context.pending_monitor_updates.len() {
-			if self.context.pending_monitor_updates[i].blocked {
-				self.context.pending_monitor_updates[i].blocked = false;
-				return Some((self.context.pending_monitor_updates[i].update.clone(),
-					self.context.pending_monitor_updates.len() > i + 1));
-			}
+		for upd in self.context.pending_monitor_updates.iter() {
+			debug_assert!(upd.blocked);
 		}
-		None
+		if self.context.pending_monitor_updates.is_empty() { return None; }
+		Some((self.context.pending_monitor_updates.remove(0).update,
+			!self.context.pending_monitor_updates.is_empty()))
 	}
 
 	/// Pushes a new monitor update into our monitor update queue, returning it if it should be
 	/// immediately given to the user for persisting or `None` if it should be held as blocked.
 	fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
 	-> Option<ChannelMonitorUpdate> {
-		let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
-		self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-			update, blocked: !release_monitor,
-		});
-		if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None }
-	}
-
-	pub fn no_monitor_updates_pending(&self) -> bool {
-		self.context.pending_monitor_updates.is_empty()
-	}
-
-	pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
-		self.context.pending_monitor_updates.retain(|upd| {
-			if upd.update.update_id <= update_id {
-				assert!(!upd.blocked, "Completed update must have flown");
-				false
-			} else { true }
-		});
-	}
-
-	pub fn complete_one_mon_update(&mut self, update_id: u64) {
-		self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
+		let release_monitor = self.context.pending_monitor_updates.is_empty();
+		if !release_monitor {
+			self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+				update, blocked: true,
+			});
+			None
+		} else {
+			Some(update)
+		}
 	}
 
-	/// Returns an iterator over all unblocked monitor updates which have not yet completed.
-	pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
-		self.context.pending_monitor_updates.iter()
-			.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+	pub fn blocked_monitor_updates_pending(&self) -> usize {
+		self.context.pending_monitor_updates.len()
 	}
 
 	/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.
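After these changes, `pending_monitor_updates` holds only blocked updates, so releasing and unblocking reduce to simple queue operations. A compact, self-contained model of the resulting discipline (again, `Update` and `BlockedQueue` are illustrative stand-ins, not the real `PendingChannelMonitorUpdate` machinery):

```rust
#[derive(Debug)]
struct Update { update_id: u64 }

#[derive(Default)]
struct BlockedQueue { pending: Vec<Update> }

impl BlockedQueue {
    // Mirrors the new `push_ret_blockable_mon_update`: release the update
    // for immediate persistence when nothing is queued ahead of it,
    // otherwise hold it as blocked and return None.
    fn push(&mut self, upd: Update) -> Option<Update> {
        if self.pending.is_empty() {
            Some(upd)
        } else {
            self.pending.push(upd);
            None
        }
    }

    // Mirrors the new `unblock_next_blocked_monitor_update`: pop the queue
    // head, reporting whether further blocked updates remain behind it.
    fn unblock_next(&mut self) -> Option<(Update, bool)> {
        if self.pending.is_empty() { return None; }
        Some((self.pending.remove(0), !self.pending.is_empty()))
    }
}

fn main() {
    let mut q = BlockedQueue::default();
    assert!(q.push(Update { update_id: 1 }).is_some()); // released immediately
    q.pending.push(Update { update_id: 2 });            // simulate an update blocked elsewhere
    assert!(q.push(Update { update_id: 3 }).is_none()); // queued behind the blocked one
    let (next, more) = q.unblock_next().unwrap();
    assert_eq!(next.update_id, 2);
    assert!(more);                                      // update 3 is still queued
}
```

With every queued entry blocked by construction, the `blocked` flag becomes an invariant rather than state, which is why the new code can replace the old scan-for-first-blocked logic with `debug_assert!` loops and plain `is_empty()` checks, and why `uncompleted_unblocked_mon_updates` gives way to a simple `blocked_monitor_updates_pending` count.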
[Diffs for the remaining two changed files did not load and are not shown.]
