Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail HTLCs which were removed from a channel but not persisted #1857

Merged
merged 3 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 62 additions & 46 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1828,12 +1828,60 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
res
}

/// Gets the set of outbound HTLCs which can be (or have been) resolved by this
/// `ChannelMonitor`. This is used to determine if an HTLC was removed from the channel prior
/// to the `ChannelManager` having been persisted.
///
/// This is similar to [`Self::get_pending_outbound_htlcs`] except it includes HTLCs which were
/// resolved by this `ChannelMonitor`.
pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap<HTLCSource, HTLCOutputInCommitment> {
let mut res = HashMap::new();
// Just examine the available counterparty commitment transactions. See docs on
// `fail_unbroadcast_htlcs`, below, for justification.
let us = self.inner.lock().unwrap();
macro_rules! walk_counterparty_commitment {
($txid: expr) => {
if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
res.insert((**source).clone(), htlc.clone());
}
}
}
}
}
if let Some(ref txid) = us.current_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
if let Some(ref txid) = us.prev_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
res
}

/// Gets the set of outbound HTLCs which are pending resolution in this channel.
/// This is used to reconstruct pending outbound payments on restart in the ChannelManager.
pub(crate) fn get_pending_outbound_htlcs(&self) -> HashMap<HTLCSource, HTLCOutputInCommitment> {
let mut res = HashMap::new();
let us = self.inner.lock().unwrap();
// We're only concerned with the confirmation count of HTLC transactions, and don't
// actually care how many confirmations a commitment transaction may or may not have. Thus,
// we look for either a FundingSpendConfirmation event or a funding_spend_confirmed.
let confirmed_txid = us.funding_spend_confirmed.or_else(|| {
us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
if let OnchainEvent::FundingSpendConfirmation { .. } = event.event {
Some(event.txid)
} else { None }
})
});

if confirmed_txid.is_none() {
// If we have not seen a commitment transaction on-chain (ie the channel is not yet
// closed), just get the full set.
mem::drop(us);
return self.get_all_current_outbound_htlcs();
}

let mut res = HashMap::new();
macro_rules! walk_htlcs {
($holder_commitment: expr, $htlc_iter: expr) => {
for (htlc, source) in $htlc_iter {
Expand Down Expand Up @@ -1869,54 +1917,22 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
}
}

// We're only concerned with the confirmation count of HTLC transactions, and don't
// actually care how many confirmations a commitment transaction may or may not have. Thus,
// we look for either a FundingSpendConfirmation event or a funding_spend_confirmed.
let confirmed_txid = us.funding_spend_confirmed.or_else(|| {
us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
if let OnchainEvent::FundingSpendConfirmation { .. } = event.event {
Some(event.txid)
let txid = confirmed_txid.unwrap();
if Some(txid) == us.current_counterparty_commitment_txid || Some(txid) == us.prev_counterparty_commitment_txid {
walk_htlcs!(false, us.counterparty_claimable_outpoints.get(&txid).unwrap().iter().filter_map(|(a, b)| {
if let &Some(ref source) = b {
Some((a, &**source))
} else { None }
})
});
if let Some(txid) = confirmed_txid {
if Some(txid) == us.current_counterparty_commitment_txid || Some(txid) == us.prev_counterparty_commitment_txid {
walk_htlcs!(false, us.counterparty_claimable_outpoints.get(&txid).unwrap().iter().filter_map(|(a, b)| {
if let &Some(ref source) = b {
Some((a, &**source))
} else { None }
}));
} else if txid == us.current_holder_commitment_tx.txid {
walk_htlcs!(true, us.current_holder_commitment_tx.htlc_outputs.iter().filter_map(|(a, _, c)| {
}));
} else if txid == us.current_holder_commitment_tx.txid {
walk_htlcs!(true, us.current_holder_commitment_tx.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
} else if let Some(prev_commitment) = &us.prev_holder_signed_commitment_tx {
if txid == prev_commitment.txid {
walk_htlcs!(true, prev_commitment.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
} else if let Some(prev_commitment) = &us.prev_holder_signed_commitment_tx {
if txid == prev_commitment.txid {
walk_htlcs!(true, prev_commitment.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
}
}
} else {
// If we have not seen a commitment transaction on-chain (ie the channel is not yet
// closed), just examine the available counterparty commitment transactions. See docs
// on `fail_unbroadcast_htlcs`, below, for justification.
macro_rules! walk_counterparty_commitment {
($txid: expr) => {
if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
res.insert((**source).clone(), htlc.clone());
}
}
}
}
}
if let Some(ref txid) = us.current_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
if let Some(ref txid) = us.prev_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
}

Expand Down
9 changes: 5 additions & 4 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5951,15 +5951,16 @@ impl<Signer: Sign> Channel<Signer> {
(monitor_update, dropped_outbound_htlcs)
}

pub fn inflight_htlc_sources(&self) -> impl Iterator<Item=&HTLCSource> {
pub fn inflight_htlc_sources(&self) -> impl Iterator<Item=(&HTLCSource, &PaymentHash)> {
self.holding_cell_htlc_updates.iter()
.flat_map(|htlc_update| {
match htlc_update {
HTLCUpdateAwaitingACK::AddHTLC { source, .. } => { Some(source) }
_ => None
HTLCUpdateAwaitingACK::AddHTLC { source, payment_hash, .. }
=> Some((source, payment_hash)),
_ => None,
}
})
.chain(self.pending_outbound_htlcs.iter().map(|htlc| &htlc.source))
.chain(self.pending_outbound_htlcs.iter().map(|htlc| (&htlc.source, &htlc.payment_hash)))
}
}

Expand Down
71 changes: 60 additions & 11 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5728,7 +5728,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
let mut inflight_htlcs = InFlightHtlcs::new();

for chan in self.channel_state.lock().unwrap().by_id.values() {
for htlc_source in chan.inflight_htlc_sources() {
for (htlc_source, _) in chan.inflight_htlc_sources() {
if let HTLCSource::OutboundRoute { path, .. } = htlc_source {
inflight_htlcs.process_path(path, self.get_our_node_id());
}
Expand All @@ -5746,6 +5746,12 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
events.into_inner()
}

#[cfg(test)]
pub fn pop_pending_event(&self) -> Option<events::Event> {
let mut events = self.pending_events.lock().unwrap();
if events.is_empty() { None } else { Some(events.remove(0)) }
}

#[cfg(test)]
pub fn has_pending_payments(&self) -> bool {
!self.pending_outbound_payments.lock().unwrap().is_empty()
Expand Down Expand Up @@ -7206,6 +7212,25 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
user_channel_id: channel.get_user_id(),
reason: ClosureReason::OutdatedChannelManager
});
for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
let mut found_htlc = false;
for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
if *channel_htlc_source == monitor_htlc_source { found_htlc = true; break; }
}
if !found_htlc {
// If we have some HTLCs in the channel which are not present in the newer
// ChannelMonitor, they have been removed and should be failed back to
// ensure we don't forget them entirely. Note that if the missing HTLC(s)
// were actually claimed we'd have generated and ensured the previous-hop
// claim update ChannelMonitor updates were persisted prior to persising
// the ChannelMonitor update for the forward leg, so attempting to fail the
// backwards leg of the HTLC will simply be rejected.
log_info!(args.logger,
"Failing HTLC with hash {} as it is missing in the ChannelMonitor for channel {} but was present in the (stale) ChannelManager",
log_bytes!(channel.channel_id()), log_bytes!(payment_hash.0));
failed_htlcs.push((channel_htlc_source.clone(), *payment_hash, channel.get_counterparty_node_id(), channel.channel_id()));
}
}
} else {
log_info!(args.logger, "Successfully loaded channel {}", log_bytes!(channel.channel_id()));
if let Some(short_channel_id) = channel.get_short_channel_id() {
Expand Down Expand Up @@ -7286,16 +7311,6 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
None => continue,
}
}
if forward_htlcs_count > 0 {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small
// constant as enough time has likely passed that we should simply handle the forwards
// now, or at least after the user gets a chance to reconnect to our peers.
pending_events_read.push(events::Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(2),
});
}

let background_event_count: u64 = Readable::read(reader)?;
let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
Expand Down Expand Up @@ -7404,10 +7419,44 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
}
}
}
for (htlc_source, htlc) in monitor.get_all_current_outbound_htlcs() {
if let HTLCSource::PreviousHopData(prev_hop_data) = htlc_source {
// The ChannelMonitor is now responsible for this HTLC's
// failure/success and will let us know what its outcome is. If we
// still have an entry for this HTLC in `forward_htlcs`, we were
// apparently not persisted after the monitor was when forwarding
// the payment.
forward_htlcs.retain(|_, forwards| {
forwards.retain(|forward| {
if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
if htlc_info.prev_short_channel_id == prev_hop_data.short_channel_id &&
htlc_info.prev_htlc_id == prev_hop_data.htlc_id
{
log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
false
} else { true }
} else { true }
});
!forwards.is_empty()
})
}
}
}
}
}

if !forward_htlcs.is_empty() {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small
// constant as enough time has likely passed that we should simply handle the forwards
// now, or at least after the user gets a chance to reconnect to our peers.
pending_events_read.push(events::Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(2),
});
}

let inbound_pmt_key_material = args.keys_manager.get_inbound_payment_key_material();
let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);

Expand Down
6 changes: 3 additions & 3 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ macro_rules! check_closed_event {
use $crate::util::events::Event;

let events = $node.node.get_and_clear_pending_events();
assert_eq!(events.len(), $events);
assert_eq!(events.len(), $events, "{:?}", events);
let expected_reason = $reason;
let mut issues_discard_funding = false;
for event in events {
Expand Down Expand Up @@ -1350,7 +1350,7 @@ macro_rules! expect_pending_htlcs_forwardable_conditions {
let events = $node.node.get_and_clear_pending_events();
match events[0] {
$crate::util::events::Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
_ => panic!("Unexpected event {:?}", events),
};

let count = expected_failures.len() + 1;
Expand Down Expand Up @@ -1560,7 +1560,7 @@ macro_rules! expect_payment_forwarded {
if !$downstream_force_closed {
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $next_node.node.get_our_node_id() && x.channel_id == next_channel_id.unwrap()));
}
assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
assert_eq!(claim_from_onchain_tx, $downstream_force_closed);
},
_ => panic!("Unexpected event"),
}
Expand Down
Loading