Create SpendableOutputs events no matter the chain::Confirm order
A user pointed out that we weren't creating `SpendableOutputs`
events when we should have been: they called
`ChannelMonitor::best_block_updated` with a block well past a CSV
locktime and then called `ChannelMonitor::transactions_confirmed`
with the transaction we should have been spending (at a block
height/hash a ways in the past).

This was due to `ChannelMonitor::transactions_confirmed` only
calling `ChannelMonitor::block_confirmed` with the height at which
the transactions were confirmed, resulting in all checks being done
against that height rather than the current chain tip.

Further, in the same scenario, we also would not fail back an HTLC
whose HTLC-Timeout transaction was confirmed more than
ANTI_REORG_DELAY blocks ago.

To address this, we use the best block height for confirmation
threshold checks in `ChannelMonitor::block_confirmed` and pass both
the confirmation and current heights through to
`OnchainTx::update_claims_view`, using each as appropriate.

Fixes lightningdevkit#962.
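
The fix in one sentence: record when things confirmed, but decide maturity against the best-known tip. Below is a minimal, self-contained sketch of that distinction; the names (`TrackedEvent`, `matured_buggy`, `matured_fixed`) are invented for illustration and are not LDK's API. Only `ANTI_REORG_DELAY = 6` mirrors the real constant.

```rust
/// Illustration of the bug class fixed by this commit, not LDK's API.
const ANTI_REORG_DELAY: u32 = 6;

struct TrackedEvent {
    /// Height the relevant transaction confirmed at (possibly well in the past).
    conf_height: u32,
}

impl TrackedEvent {
    /// Buggy check: compares against the height the newly-reported
    /// transactions confirmed at, so an old confirmation never matures
    /// until yet another block is connected.
    fn matured_buggy(&self, newly_confirmed_at: u32) -> bool {
        self.conf_height + ANTI_REORG_DELAY - 1 <= newly_confirmed_at
    }

    /// Fixed check: compares against the best-known chain tip, so the
    /// order of best_block_updated / transactions_confirmed is irrelevant.
    fn matured_fixed(&self, best_block_height: u32) -> bool {
        self.conf_height + ANTI_REORG_DELAY - 1 <= best_block_height
    }
}

fn main() {
    let event = TrackedEvent { conf_height: 100 };
    // The tip is already at 120, but the block we were just told about is 100:
    assert!(!event.matured_buggy(100)); // stuck until another block arrives
    assert!(event.matured_fixed(120)); // matures immediately, as it should
}
```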
TheBlueMatt committed Jun 30, 2021
1 parent a11b6aa commit 2cd48c3
Showing 4 changed files with 125 additions and 33 deletions.
33 changes: 19 additions & 14 deletions lightning/src/chain/channelmonitor.rs
@@ -1331,7 +1331,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
macro_rules! claim_htlcs {
($commitment_number: expr, $txid: expr) => {
let htlc_claim_reqs = self.get_counterparty_htlc_output_claim_reqs($commitment_number, $txid, None);
self.onchain_tx_handler.update_claims_view(&Vec::new(), htlc_claim_reqs, self.best_block.height(), broadcaster, fee_estimator, logger);
self.onchain_tx_handler.update_claims_view(&Vec::new(), htlc_claim_reqs, self.best_block.height(), self.best_block.height(), broadcaster, fee_estimator, logger);
}
}
if let Some(txid) = self.current_counterparty_commitment_txid {
@@ -1354,10 +1354,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
// holder commitment transactions.
if self.broadcasted_holder_revokable_script.is_some() {
let (claim_reqs, _) = self.get_broadcasted_holder_claims(&self.current_holder_commitment_tx, 0);
self.onchain_tx_handler.update_claims_view(&Vec::new(), claim_reqs, self.best_block.height(), broadcaster, fee_estimator, logger);
self.onchain_tx_handler.update_claims_view(&Vec::new(), claim_reqs, self.best_block.height(), self.best_block.height(), broadcaster, fee_estimator, logger);
if let Some(ref tx) = self.prev_holder_signed_commitment_tx {
let (claim_reqs, _) = self.get_broadcasted_holder_claims(&tx, 0);
self.onchain_tx_handler.update_claims_view(&Vec::new(), claim_reqs, self.best_block.height(), broadcaster, fee_estimator, logger);
self.onchain_tx_handler.update_claims_view(&Vec::new(), claim_reqs, self.best_block.height(), self.best_block.height(), broadcaster, fee_estimator, logger);
}
}
}
@@ -1926,7 +1926,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {

if height > self.best_block.height() {
self.best_block = BestBlock::new(block_hash, height);
self.block_confirmed(height, vec![], vec![], vec![], broadcaster, fee_estimator, logger)
self.block_confirmed(height, vec![], vec![], vec![], &broadcaster, &fee_estimator, &logger)
} else if block_hash != self.best_block.block_hash() {
self.best_block = BestBlock::new(block_hash, height);
self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.height <= height);
@@ -2008,33 +2008,37 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
self.best_block = BestBlock::new(block_hash, height);
}

self.block_confirmed(height, txn_matched, watch_outputs, claimable_outpoints, broadcaster, fee_estimator, logger)
self.block_confirmed(height, txn_matched, watch_outputs, claimable_outpoints, &broadcaster, &fee_estimator, &logger)
}

/// Update state for new block(s)/transaction(s) confirmed. Note that the caller *must* update
/// self.best_block before calling, and set `conf_height` to the height at which any new
/// transaction(s)/block(s) were confirmed, even if it is not the current best height
/// (assuming we are being called with new transaction(s) confirmed).
fn block_confirmed<B: Deref, F: Deref, L: Deref>(
&mut self,
height: u32,
conf_height: u32,
txn_matched: Vec<&Transaction>,
mut watch_outputs: Vec<TransactionOutputs>,
mut claimable_outpoints: Vec<PackageTemplate>,
broadcaster: B,
fee_estimator: F,
logger: L,
broadcaster: &B,
fee_estimator: &F,
logger: &L,
) -> Vec<TransactionOutputs>
where
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
let should_broadcast = self.would_broadcast_at_height(height, &logger);
let should_broadcast = self.would_broadcast_at_height(self.best_block.height(), logger);
if should_broadcast {
let funding_outp = HolderFundingOutput::build(self.funding_redeemscript.clone());
let commitment_package = PackageTemplate::build_package(self.funding_info.0.txid.clone(), self.funding_info.0.index as u32, PackageSolvingData::HolderFundingOutput(funding_outp), height, false, height);
let commitment_package = PackageTemplate::build_package(self.funding_info.0.txid.clone(), self.funding_info.0.index as u32, PackageSolvingData::HolderFundingOutput(funding_outp), self.best_block.height(), false, self.best_block.height());
claimable_outpoints.push(commitment_package);
self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
let commitment_tx = self.onchain_tx_handler.get_fully_signed_holder_tx(&self.funding_redeemscript);
self.holder_tx_signed = true;
let (mut new_outpoints, _) = self.get_broadcasted_holder_claims(&self.current_holder_commitment_tx, height);
let (mut new_outpoints, _) = self.get_broadcasted_holder_claims(&self.current_holder_commitment_tx, self.best_block.height());
let new_outputs = self.get_broadcasted_holder_watch_outputs(&self.current_holder_commitment_tx, &commitment_tx);
if !new_outputs.is_empty() {
watch_outputs.push((self.current_holder_commitment_tx.txid.clone(), new_outputs));
@@ -2047,7 +2051,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
let mut onchain_events_reaching_threshold_conf = Vec::new();
for entry in onchain_events_awaiting_threshold_conf {
if entry.has_reached_confirmation_threshold(height) {
if entry.has_reached_confirmation_threshold(self.best_block.height()) {
onchain_events_reaching_threshold_conf.push(entry);
} else {
self.onchain_events_awaiting_threshold_conf.push(entry);
@@ -2102,7 +2106,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
}
}

self.onchain_tx_handler.update_claims_view(&txn_matched, claimable_outpoints, height, &&*broadcaster, &&*fee_estimator, &&*logger);
self.onchain_tx_handler.update_claims_view(&txn_matched, claimable_outpoints, conf_height,
cmp::max(conf_height, self.best_block.height()), broadcaster, fee_estimator, logger);

// Determine new outputs to watch by comparing against previously known outputs to watch,
// updating the latter in the process.
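
The net effect of the hunks above: `block_confirmed` now takes the confirmation height separately, and every call into the claim pipeline carries two heights, with `cmp::max` guarding the `transactions_confirmed` path where the confirmation height can trail the tip. A hedged sketch of the calling convention follows; the function names are simplified stand-ins, as the real methods live on `ChannelMonitorImpl` and `OnchainTxHandler` and carry far more state.

```rust
use std::cmp;

/// Simplified stand-in for OnchainTxHandler::update_claims_view.
fn update_claims_view(conf_height: u32, cur_height: u32) {
    debug_assert!(conf_height <= cur_height);
    // conf_height: stamped onto onchain event entries, so ANTI_REORG_DELAY
    //              is measured from the true confirmation point.
    // cur_height:  drives decisions: timelock checks, confirmation
    //              thresholds, and fee-bump timer expiry.
}

/// A newly connected block: confirmation height and tip coincide.
fn on_block_connected(tip: u32) {
    update_claims_view(tip, tip);
}

/// chain::Confirm::transactions_confirmed may report a block below the
/// tip; never let the "current" height run backwards because of that.
fn on_transactions_confirmed(conf_height: u32, best_block_height: u32) {
    update_claims_view(conf_height, cmp::max(conf_height, best_block_height));
}

fn main() {
    on_block_connected(120);
    on_transactions_confirmed(100, 120); // conf in the past, decide at the tip
}
```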
37 changes: 20 additions & 17 deletions lightning/src/chain/onchaintx.rs
@@ -343,15 +343,15 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
/// (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck in the mempool, so you need to bump its feerate quickly using Replace-By-Fee or Child-Pay-For-Parent.
/// Panics if there are signing errors, because signing operations in reaction to on-chain events
/// are not expected to fail, and if they do, we may lose funds.
fn generate_claim_tx<F: Deref, L: Deref>(&mut self, height: u32, cached_request: &PackageTemplate, fee_estimator: &F, logger: &L) -> Option<(Option<u32>, u64, Transaction)>
fn generate_claim_tx<F: Deref, L: Deref>(&mut self, cur_height: u32, cached_request: &PackageTemplate, fee_estimator: &F, logger: &L) -> Option<(Option<u32>, u64, Transaction)>
where F::Target: FeeEstimator,
L::Target: Logger,
{
if cached_request.outpoints().len() == 0 { return None } // But don't prune pending claiming request yet, we may have to resurrect HTLCs

// Compute new height timer to decide when we need to regenerate a new bumped version of the claim tx (if we
// didn't receive confirmation of it before, or not enough reorg-safe depth on top of it).
let new_timer = Some(cached_request.get_height_timer(height));
let new_timer = Some(cached_request.get_height_timer(cur_height));
if cached_request.is_malleable() {
let predicted_weight = cached_request.package_weight(&self.destination_script);
if let Some((output_value, new_feerate)) = cached_request.compute_package_output(predicted_weight, fee_estimator, logger) {
@@ -377,12 +377,15 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
/// for this channel, provide new relevant on-chain transactions and/or new claim requests.
/// Formerly this was named `block_connected`, but it is now also used for claiming an HTLC output
/// if we receive a preimage after force-close.
pub(crate) fn update_claims_view<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[&Transaction], requests: Vec<PackageTemplate>, height: u32, broadcaster: &B, fee_estimator: &F, logger: &L)
/// `conf_height` represents the height at which the transactions in `txn_matched` were
/// confirmed; this does not need to equal the current blockchain tip height, which should be
/// provided via `cur_height`.
pub(crate) fn update_claims_view<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[&Transaction], requests: Vec<PackageTemplate>, conf_height: u32, cur_height: u32, broadcaster: &B, fee_estimator: &F, logger: &L)
where B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
log_debug!(logger, "Updating claims view at height {} with {} matched transactions and {} claim requests", height, txn_matched.len(), requests.len());
log_debug!(logger, "Updating claims view at height {} with {} matched transactions in block {} and {} claim requests", cur_height, txn_matched.len(), conf_height, requests.len());
let mut preprocessed_requests = Vec::with_capacity(requests.len());
let mut aggregated_request = None;

@@ -401,17 +404,17 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
continue;
}

if req.package_timelock() > height + 1 {
log_info!(logger, "Delaying claim of package until its timelock at {} (current height {}), the following outpoints are spent:", req.package_timelock(), height);
if req.package_timelock() > cur_height + 1 {
log_info!(logger, "Delaying claim of package until its timelock at {} (current height {}), the following outpoints are spent:", req.package_timelock(), cur_height);
for outpoint in req.outpoints() {
log_info!(logger, " Outpoint {}", outpoint);
}
self.locktimed_packages.entry(req.package_timelock()).or_insert(Vec::new()).push(req);
continue;
}

log_trace!(logger, "Test if outpoint can be aggregated with expiration {} against {}", req.timelock(), height + CLTV_SHARED_CLAIM_BUFFER);
if req.timelock() <= height + CLTV_SHARED_CLAIM_BUFFER || !req.aggregable() {
log_trace!(logger, "Test if outpoint can be aggregated with expiration {} against {}", req.timelock(), cur_height + CLTV_SHARED_CLAIM_BUFFER);
if req.timelock() <= cur_height + CLTV_SHARED_CLAIM_BUFFER || !req.aggregable() {
// Don't aggregate if outpoint package timelock is soon or marked as non-aggregable
preprocessed_requests.push(req);
} else if aggregated_request.is_none() {
@@ -425,8 +428,8 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
preprocessed_requests.push(req);
}

// Claim everything up to and including height + 1
let remaining_locked_packages = self.locktimed_packages.split_off(&(height + 2));
// Claim everything up to and including cur_height + 1
let remaining_locked_packages = self.locktimed_packages.split_off(&(cur_height + 2));
for (pop_height, mut entry) in self.locktimed_packages.iter_mut() {
log_trace!(logger, "Restoring delayed claim of package(s) at their timelock at {}.", pop_height);
preprocessed_requests.append(&mut entry);
@@ -436,13 +439,13 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
// Generate claim transactions and track them to bump if necessary at
// height timer expiration (i.e in how many blocks we're going to take action).
for mut req in preprocessed_requests {
if let Some((new_timer, new_feerate, tx)) = self.generate_claim_tx(height, &req, &*fee_estimator, &*logger) {
if let Some((new_timer, new_feerate, tx)) = self.generate_claim_tx(cur_height, &req, &*fee_estimator, &*logger) {
req.set_timer(new_timer);
req.set_feerate(new_feerate);
let txid = tx.txid();
for k in req.outpoints() {
log_info!(logger, "Registering claiming request for {}:{}", k.txid, k.vout);
self.claimable_outpoints.insert(k.clone(), (txid, height));
self.claimable_outpoints.insert(k.clone(), (txid, conf_height));
}
self.pending_claim_requests.insert(txid, req);
log_info!(logger, "Broadcasting onchain {}", log_tx!(tx));
@@ -476,7 +479,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
() => {
let entry = OnchainEventEntry {
txid: tx.txid(),
height,
height: conf_height,
event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }
};
if !self.onchain_events_awaiting_threshold_conf.contains(&entry) {
@@ -516,7 +519,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
for package in claimed_outputs_material.drain(..) {
let entry = OnchainEventEntry {
txid: tx.txid(),
height,
height: conf_height,
event: OnchainEvent::ContentiousOutpoint { package },
};
if !self.onchain_events_awaiting_threshold_conf.contains(&entry) {
@@ -529,7 +532,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
let onchain_events_awaiting_threshold_conf =
self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
for entry in onchain_events_awaiting_threshold_conf {
if entry.has_reached_confirmation_threshold(height) {
if entry.has_reached_confirmation_threshold(cur_height) {
match entry.event {
OnchainEvent::Claim { claim_request } => {
// We may remove a whole set of claim outpoints here, as these one may have
@@ -555,7 +558,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
// Check if any pending claim request must be rescheduled
for (first_claim_txid, ref request) in self.pending_claim_requests.iter() {
if let Some(h) = request.timer() {
if height >= h {
if cur_height >= h {
bump_candidates.insert(*first_claim_txid, (*request).clone());
}
}
@@ -564,7 +567,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
// Build, bump and rebroadcast tx accordingly
log_trace!(logger, "Bumping {} candidates", bump_candidates.len());
for (first_claim_txid, request) in bump_candidates.iter() {
if let Some((new_timer, new_feerate, bump_tx)) = self.generate_claim_tx(height, &request, &*fee_estimator, &*logger) {
if let Some((new_timer, new_feerate, bump_tx)) = self.generate_claim_tx(cur_height, &request, &*fee_estimator, &*logger) {
log_info!(logger, "Broadcasting RBF-bumped onchain {}", log_tx!(bump_tx));
broadcaster.broadcast_transaction(&bump_tx);
if let Some(request) = self.pending_claim_requests.get_mut(first_claim_txid) {
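
The `locktimed_packages` logic above relies on `BTreeMap::split_off`, which moves every entry at or above the given key into the returned map. Here is a self-contained sketch of the same idiom under the same rule (a package with timelock T is claimable once `T <= cur_height + 1`, i.e. spendable in the next block); the package type is reduced to a plain label for illustration.

```rust
use std::collections::BTreeMap;

/// Pop every package whose timelock allows spending in the next block,
/// mirroring `split_off(&(cur_height + 2))` in update_claims_view.
fn pop_claimable(
    locktimed: &mut BTreeMap<u32, Vec<&'static str>>,
    cur_height: u32,
) -> Vec<&'static str> {
    // Keys >= cur_height + 2 stay locked; everything below is claimable.
    let still_locked = locktimed.split_off(&(cur_height + 2));
    std::mem::replace(locktimed, still_locked)
        .into_values()
        .flatten()
        .collect()
}

fn main() {
    let mut locktimed = BTreeMap::new();
    locktimed.entry(100).or_insert_with(Vec::new).push("htlc-claim-a");
    locktimed.entry(105).or_insert_with(Vec::new).push("htlc-claim-b");
    // At height 99 the next block is 100, so only the first package pops.
    assert_eq!(pop_claimable(&mut locktimed, 99), vec!["htlc-claim-a"]);
    assert_eq!(locktimed.len(), 1); // the height-105 package remains locked
}
```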
3 changes: 3 additions & 0 deletions lightning/src/ln/functional_test_utils.rs
@@ -221,6 +221,9 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
pub fn best_block_info(&self) -> (BlockHash, u32) {
self.blocks.lock().unwrap().last().map(|(a, b)| (a.block_hash(), *b)).unwrap()
}
pub fn get_block_header(&self, height: u32) -> BlockHeader {
self.blocks.lock().unwrap()[height as usize].0
}
}

impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
85 changes: 83 additions & 2 deletions lightning/src/ln/functional_tests.rs
@@ -12,8 +12,7 @@
//! claim outputs on-chain.
use chain;
use chain::Listen;
use chain::Watch;
use chain::{Confirm, Listen, Watch};
use chain::channelmonitor;
use chain::channelmonitor::{ChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
use chain::transaction::OutPoint;
@@ -9285,3 +9284,85 @@ fn test_invalid_funding_tx() {
} else { panic!(); }
assert_eq!(nodes[1].node.list_channels().len(), 0);
}

fn do_test_tx_confirmed_skipping_blocks_immediate_broadcast(check_no_broadcast_too_early: bool) {
// In the first version of the chain::Confirm interface, after a refactor was made to not
// broadcast CSV-locked transactions until their CSV lock is up, we wouldn't reliably broadcast
// transactions after a `transactions_confirmed` call. Specifically, if the chain, provided via
// `best_block_updated`, is at height N, and a transaction output which we wish to spend at
// height N-1 (due to a CSV to height N-1) is provided at height N, we will not broadcast the
// spending transaction until height N+1 (or greater). This was due to the way
// `ChannelMonitor::transactions_confirmed` worked, only checking if we should broadcast a
// spending transaction at the height the input transaction was confirmed at, not whether we
// should broadcast a spending transaction at the current height.
// A second, similar, issue involved failing HTLCs backwards - because we only provided the
// height at which transactions were confirmed to `OnchainTx::update_claims_view`, it wasn't
// aware that the anti-reorg-delay had, in fact, already expired, waiting to fail-backwards
// until we learned about an additional block.
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
*nodes[0].connect_style.borrow_mut() = ConnectStyle::BestBlockFirstSkippingBlocks;

create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
let (chan_announce, _, channel_id, _) = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
let (_, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000);
nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id(), false);
nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);

nodes[1].node.force_close_channel(&channel_id).unwrap();
check_closed_broadcast!(nodes[1], true);
check_added_monitors!(nodes[1], 1);
let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_txn.len(), 1);

let conf_height = nodes[1].best_block_info().1;
if !check_no_broadcast_too_early {
connect_blocks(&nodes[1], 24 * 6);
} else {
// As an additional check, test that we aren't broadcasting transactions too early, either
}
nodes[1].chain_monitor.chain_monitor.transactions_confirmed(
&nodes[1].get_block_header(conf_height), &[(0, &node_txn[0])], conf_height);
if check_no_broadcast_too_early {
// If we confirmed the close transaction, but timelocks have not yet expired, we should not
// generate any events or broadcast any transactions
assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
} else {
// We should broadcast an HTLC transaction spending our funding transaction first
let spending_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(spending_txn.len(), 2);
assert_eq!(spending_txn[0], node_txn[0]);
check_spends!(spending_txn[1], node_txn[0]);
// We should also generate a SpendableOutputs event with the to_self output (as its
// timelock is up).
let descriptor_spend_txn = check_spendable_outputs!(nodes[1], node_cfgs[1].keys_manager);
assert_eq!(descriptor_spend_txn.len(), 1);

// If we also discover that the HTLC-Timeout transaction was confirmed some time ago, we
// should immediately fail-backwards the HTLC to the previous hop, without waiting for an
// additional block built on top of the current chain.
nodes[1].chain_monitor.chain_monitor.transactions_confirmed(
&nodes[1].get_block_header(conf_height + 1), &[(0, &spending_txn[1])], conf_height + 1);
expect_pending_htlcs_forwardable!(nodes[1]);
check_added_monitors!(nodes[1], 1);

let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
assert!(updates.update_add_htlcs.is_empty());
assert!(updates.update_fulfill_htlcs.is_empty());
assert_eq!(updates.update_fail_htlcs.len(), 1);
assert!(updates.update_fail_malformed_htlcs.is_empty());
assert!(updates.update_fee.is_none());
nodes[0].node.handle_update_fail_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fail_htlcs[0]);
commitment_signed_dance!(nodes[0], nodes[1], updates.commitment_signed, true, true);
expect_payment_failed!(nodes[0], payment_hash, false);
expect_payment_failure_chan_update!(nodes[0], chan_announce.contents.short_channel_id, true);
}
}
#[test]
fn test_tx_confirmed_skipping_blocks_immediate_broadcast() {
do_test_tx_confirmed_skipping_blocks_immediate_broadcast(false);
do_test_tx_confirmed_skipping_blocks_immediate_broadcast(true);
}
