
Add next_channel_id in PaymentForwarded event
This update also includes a minor refactor. The return type of
`pending_monitor_events` has been changed to a `Vec` of tuples pairing an
`OutPoint` with a `Vec` of `MonitorEvent`s, associating each batch of
events with its funding outpoint.

We've also renamed `source/sink_channel_id` to `prev/next_channel_id` for
clarity.
atalw committed May 15, 2022
1 parent 637fb88 commit 1ae1de9
Showing 12 changed files with 105 additions and 83 deletions.
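For orientation, here is a minimal sketch (not part of this commit) of how an event consumer might read the renamed and extended event. The field set matches the diff below; the handler itself is hypothetical, and the `lightning::util::events` path assumes the crate layout of this era:

```rust
use lightning::util::events::Event;

// Hypothetical consumer-side handler: `Event::PaymentForwarded` now reports
// both the inbound edge (`prev_channel_id`, formerly `source_channel_id`)
// and the newly added outbound edge (`next_channel_id`).
fn log_forwarded_payment(event: &Event) {
	if let Event::PaymentForwarded {
		fee_earned_msat, prev_channel_id, next_channel_id, claim_from_onchain_tx,
	} = event {
		println!(
			"forwarded: prev={:?} next={:?} fee={:?} msat, claimed on chain: {}",
			prev_channel_id, next_channel_id, fee_earned_msat, claim_from_onchain_tx,
		);
	}
}
```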
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
@@ -148,7 +148,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
self.chain_monitor.update_channel(funding_txo, update)
}

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
return self.chain_monitor.release_pending_monitor_events();
}
}
20 changes: 12 additions & 8 deletions lightning/src/chain/chainmonitor.rs
@@ -235,7 +235,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<MonitorEvent>>,
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
@@ -299,7 +299,7 @@ where C::Target: chain::Filter,
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
@@ -455,10 +455,10 @@ where C::Target: chain::Filter,
// UpdateCompleted event.
return Ok(());
}
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
});
}]));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
@@ -476,10 +476,10 @@ where C::Target: chain::Filter,
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
});
}]));
}

#[cfg(any(test, fuzzing, feature = "_test_utils"))]
@@ -668,7 +668,7 @@ where C::Target: chain::Filter,
}
}

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
@@ -694,7 +694,11 @@ where C::Target: chain::Filter,
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
}
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
pending_monitor_events.push((monitor_outpoint, monitor_events));
}
}
}
pending_monitor_events
2 changes: 1 addition & 1 deletion lightning/src/chain/mod.rs
@@ -291,7 +291,7 @@ pub trait Watch<ChannelSigner: Sign> {
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)>;
}

/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
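As a usage sketch (hypothetical helper, not from this commit; module paths assume the crate layout of this era), a caller of the updated trait method can drain the grouped events while keeping each one paired with its funding outpoint:

```rust
use lightning::chain::{self, channelmonitor::MonitorEvent, keysinterface::Sign, transaction::OutPoint};

// Hypothetical helper: flatten the grouped return value, preserving the
// association between each MonitorEvent and its monitor's funding outpoint.
fn drain_monitor_events<S: Sign, W: chain::Watch<S>>(
	watch: &W,
	mut handle: impl FnMut(OutPoint, MonitorEvent),
) {
	for (funding_outpoint, events) in watch.release_pending_monitor_events() {
		for event in events {
			handle(funding_outpoint, event);
		}
	}
}
```

This mirrors what `ChannelManager::process_pending_monitor_events` does with the new return type below.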
6 changes: 3 additions & 3 deletions lightning/src/ln/chanmon_update_fail_tests.rs
@@ -1102,7 +1102,7 @@ fn test_monitor_update_fail_reestablish() {
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@ -2087,7 +2087,7 @@ fn test_fail_htlc_on_broadcast_after_claim() {
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
check_added_monitors!(nodes[1], 1);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);

mine_transaction(&nodes[1], &bs_txn[0]);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
@@ -2468,7 +2468,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
}
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);

let mut bs_updates = None;
92 changes: 48 additions & 44 deletions lightning/src/ln/channelmanager.rs
@@ -3952,7 +3952,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
}
}

fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
mem::drop(channel_state_lock);
@@ -4043,12 +4043,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
} else { None };

let mut pending_events = self.pending_events.lock().unwrap();
let prev_channel_id = Some(prev_outpoint.to_channel_id());
let next_channel_id = Some(next_channel_id);

let source_channel_id = Some(prev_outpoint.to_channel_id());
pending_events.push(events::Event::PaymentForwarded {
source_channel_id,
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
prev_channel_id,
next_channel_id,
});
}
}
@@ -4501,7 +4503,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
}
};
self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false);
self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
Ok(())
}

@@ -4821,48 +4823,50 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
let mut failed_channels = Vec::new();
let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
let has_pending_monitor_events = !pending_monitor_events.is_empty();
for monitor_event in pending_monitor_events.drain(..) {
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true);
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let by_id = &mut channel_state.by_id;
let pending_msg_events = &mut channel_state.pending_msg_events;
if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
let mut chan = remove_channel!(self, channel_state, chan_entry);
failed_channels.push(chan.force_shutdown(false));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
for (funding_outpoint, mut monitor_events) in pending_monitor_events.drain(..) {
for monitor_event in monitor_events.drain(..) {
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let by_id = &mut channel_state.by_id;
let pending_msg_events = &mut channel_state.pending_msg_events;
if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
let mut chan = remove_channel!(self, channel_state, chan_entry);
failed_channels.push(chan.force_shutdown(false));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
} else {
ClosureReason::CommitmentTxConfirmed
};
self.issue_channel_close_events(&chan, reason);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
},
});
}
let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
} else {
ClosureReason::CommitmentTxConfirmed
};
self.issue_channel_close_events(&chan, reason);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
},
});
}
},
MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
self.channel_monitor_updated(&funding_txo, monitor_update_id);
},
},
MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
self.channel_monitor_updated(&funding_txo, monitor_update_id);
},
}
}
}

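Note how the `HTLCEvent` arm above passes `funding_outpoint.to_channel_id()` as the new `next_channel_id` argument: grouping monitor events by outpoint is exactly what lets on-chain claims name the downstream channel. For reference, a rough sketch of the BOLT 2 channel-id derivation that `OutPoint::to_channel_id()` performs (the byte-order details here are an assumption, not copied from LDK):

```rust
use bitcoin::Txid;
use bitcoin::hashes::Hash;

// Sketch of the BOLT 2 rule: the channel id is the funding txid with the
// funding output index XORed into its last two bytes.
fn channel_id_from_funding(txid: Txid, index: u16) -> [u8; 32] {
	let mut id = txid.into_inner();
	id[30] ^= ((index >> 8) & 0xff) as u8;
	id[31] ^= (index & 0xff) as u8;
	id
}
```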
15 changes: 10 additions & 5 deletions lightning/src/ln/functional_test_utils.rs
@@ -1327,15 +1327,20 @@ macro_rules! expect_payment_path_successful {
}

macro_rules! expect_payment_forwarded {
($node: expr, $source_node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
let events = $node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, $expected_fee);
if fee_earned_msat.is_some() {
// Is the event channel_id in one of the channels between the two nodes?
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $source_node.node.get_our_node_id() && x.channel_id == source_channel_id.unwrap()));
// Is the event prev_channel_id in one of the channels between the two nodes?
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $prev_node.node.get_our_node_id() && x.channel_id == prev_channel_id.unwrap()));
}
// We check for force closures since a force closed channel is removed from the
// node's channel list
if !$downstream_force_closed {
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $next_node.node.get_our_node_id() && x.channel_id == next_channel_id.unwrap()));
}
assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
},
@@ -1579,7 +1584,7 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>,
{
$node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat;
expect_payment_forwarded!($node, $next_node, Some(fee as u64), false);
expect_payment_forwarded!($node, $next_node, $prev_node, Some(fee as u64), false, false);
expected_total_fee_msat += fee as u64;
check_added_monitors!($node, 1);
let new_next_msgs = if $new_msgs {
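For reference, a representative invocation of the updated macro (values are illustrative; the argument order matches the call sites changed in this commit):

```rust
// expect_payment_forwarded!(forwarder, prev peer, next peer, fee, up closed, down closed)
//   - forwarder:   the node expected to emit PaymentForwarded
//   - prev peer:   counterparty on prev_channel_id (upstream)
//   - next peer:   counterparty on next_channel_id (the new downstream check)
//   - fee:         expected fee_earned_msat, e.g. Some(1000), or None
//   - up closed:   whether the claim came via an on-chain (force-close) tx
//   - down closed: whether the downstream channel was force-closed, which
//                  skips the next_channel_id list_channels assertion
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
```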
19 changes: 11 additions & 8 deletions lightning/src/ln/functional_tests.rs
@@ -2686,18 +2686,20 @@ fn test_htlc_on_chain_success() {
}
let chan_id = Some(chan_1.2);
match forwarded_events[1] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, chan_id);
assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
match forwarded_events[2] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, chan_id);
assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
@@ -5117,10 +5119,11 @@ fn test_onchain_to_onchain_claim() {
_ => panic!("Unexpected event"),
}
match events[1] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, Some(chan_1.2));
assert_eq!(prev_channel_id, Some(chan_1.2));
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!("Unexpected event"),
}
@@ -5287,7 +5290,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
// Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee
// and nodes[2] fee) is rounded down and then claimed in full.
mine_transaction(&nodes[1], &htlc_success_txn[0]);
expect_payment_forwarded!(nodes[1], nodes[0], Some(196*2), true);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(196*2), true, true);
let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
assert!(updates.update_add_htlcs.is_empty());
assert!(updates.update_fail_htlcs.is_empty());
@@ -8869,7 +8872,7 @@ fn do_test_onchain_htlc_settlement_after_close(broadcast_alice: bool, go_onchain
assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1);

nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[1], nodes[0], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false, false);
// If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage.
if !go_onchain_before_fulfill && broadcast_alice {
let events = nodes[1].node.get_and_clear_pending_msg_events();
2 changes: 1 addition & 1 deletion lightning/src/ln/payment_tests.rs
@@ -495,7 +495,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
let bs_htlc_claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(bs_htlc_claim_txn.len(), 1);
check_spends!(bs_htlc_claim_txn[0], as_commitment_tx);
expect_payment_forwarded!(nodes[1], nodes[0], None, false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, false, false);

if !confirm_before_reload {
mine_transaction(&nodes[0], &as_commitment_tx);
2 changes: 1 addition & 1 deletion lightning/src/ln/reorg_tests.rs
@@ -138,7 +138,7 @@ fn do_test_onchain_htlc_reorg(local_commitment: bool, claim: bool) {
// ChannelManager only polls chain::Watch::release_pending_monitor_events when we
// probe it for events, so we probe non-message events here (which should just be the
// PaymentForwarded event).
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), true);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), true, true);
} else {
// Confirm the timeout tx and check that we fail the HTLC backwards
let block = Block {
(3 more changed files not shown)
