Ensure transactions_confirmed is idempotent #1861

Merged
48 changes: 45 additions & 3 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
@@ -826,6 +826,13 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
/// spending CSV for revocable outputs).
htlcs_resolved_on_chain: Vec<IrrevocablyResolvedHTLC>,

/// The set of `SpendableOutput` events which we have already passed upstream to be claimed.
/// These are tracked explicitly to ensure that we don't generate the same events redundantly
/// if users duplicatively confirm old transactions. Specifically, for transactions claiming a
/// revoked remote outpoint, we otherwise have no tracking at all once they've reached
/// [`ANTI_REORG_DELAY`], so we have to track them here.
spendable_txids_confirmed: Vec<Txid>,

// We simply modify best_block in Channel's block_connected so that serialization is
// consistent but hopefully the users' copy handles block_connected in a consistent way.
// (we do *not*, however, update them in update_monitor to ensure any local user copies keep
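To make the new field's contract concrete, here is a minimal sketch of the dedup discipline it enables, assuming a simplified tracker type (`SpendableTracker` is hypothetical; the real logic lives inline in `ChannelMonitorImpl`):

use bitcoin::Txid;

// Minimal sketch, not the actual ChannelMonitorImpl internals: record each
// txid whose spendable outputs we have surfaced, and refuse replays.
struct SpendableTracker {
    spendable_txids_confirmed: Vec<Txid>,
}

impl SpendableTracker {
    // Returns true the first time a txid is seen; false on replays, in
    // which case no duplicate SpendableOutputs event should be generated.
    fn should_emit_event(&mut self, txid: Txid) -> bool {
        if self.spendable_txids_confirmed.contains(&txid) {
            return false;
        }
        self.spendable_txids_confirmed.push(txid);
        true
    }
}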
@@ -1071,6 +1078,7 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
(7, self.funding_spend_seen, required),
(9, self.counterparty_node_id, option),
(11, self.confirmed_commitment_tx_counterparty_output, option),
(13, self.spendable_txids_confirmed, vec_type),
});

Ok(())
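A serialization note on the `(13, ...)` entry above: TLV type 13 is odd, and under the crate's even/odd TLV convention readers skip unknown odd types, so monitors written by this version remain readable by older ones. The matching reader further down pre-seeds a default so pre-upgrade data also round-trips; roughly (a simplified sketch, the real work happens inside `read_tlv_fields!`):

use bitcoin::Txid;

// Simplified: an absent TLV field (data written before this change)
// degrades to an empty set, so replay protection starts fresh on upgrade.
fn spendable_txids_or_default(read: Option<Vec<Txid>>) -> Vec<Txid> {
    read.unwrap_or_default()
}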
@@ -1179,6 +1187,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
funding_spend_confirmed: None,
confirmed_commitment_tx_counterparty_output: None,
htlcs_resolved_on_chain: Vec::new(),
spendable_txids_confirmed: Vec::new(),

best_block,
counterparty_node_id: Some(counterparty_node_id),
@@ -2860,7 +2869,37 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {

let mut watch_outputs = Vec::new();
let mut claimable_outpoints = Vec::new();
for tx in &txn_matched {
'tx_iter: for tx in &txn_matched {
let txid = tx.txid();
// If a transaction has already been confirmed, ensure we don't bother processing it duplicatively.
if Some(txid) == self.funding_spend_confirmed {
log_debug!(logger, "Skipping redundant processing of funding-spend tx {} as it was previously confirmed", txid);
continue 'tx_iter;
}
for ev in self.onchain_events_awaiting_threshold_conf.iter() {
if ev.txid == txid {
if let Some(conf_hash) = ev.block_hash {
assert_eq!(header.block_hash(), conf_hash,
"Transaction {} was already confirmed and is being re-confirmed in a different block.\n\
This indicates a severe bug in the transaction connection logic - a reorg should have been processed first!", ev.txid);
}
log_debug!(logger, "Skipping redundant processing of confirming tx {} as it was previously confirmed", txid);
continue 'tx_iter;
}
}
for htlc in self.htlcs_resolved_on_chain.iter() {
if Some(txid) == htlc.resolving_txid {
log_debug!(logger, "Skipping redundant processing of HTLC resolution tx {} as it was previously confirmed", txid);
continue 'tx_iter;
}
}
for spendable_txid in self.spendable_txids_confirmed.iter() {
if txid == *spendable_txid {
log_debug!(logger, "Skipping redundant processing of spendable tx {} as it was previously confirmed", txid);
continue 'tx_iter;
}
}

if tx.input.len() == 1 {
// Assuming our keys were not leaked (in which case we're screwed no matter what),
// commitment transactions and HTLC transactions will all only ever have one input,
@@ -2870,7 +2909,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
if prevout.txid == self.funding_info.0.txid && prevout.vout == self.funding_info.0.index as u32 {
let mut balance_spendable_csv = None;
log_info!(logger, "Channel {} closed by funding output spend in txid {}.",
log_bytes!(self.funding_info.0.to_channel_id()), tx.txid());
log_bytes!(self.funding_info.0.to_channel_id()), txid);
self.funding_spend_seen = true;
let mut commitment_tx_to_counterparty_output = None;
if (tx.input[0].sequence.0 >> 8*3) as u8 == 0x80 && (tx.lock_time.0 >> 8*3) as u8 == 0x20 {
@@ -2893,7 +2932,6 @@
}
}
}
}
let txid = tx.txid();
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
txid,
transaction: Some((*tx).clone()),
@@ -3042,6 +3080,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
self.pending_events.push(Event::SpendableOutputs {
outputs: vec![descriptor]
});
self.spendable_txids_confirmed.push(entry.txid);
},
OnchainEvent::HTLCSpendConfirmation { commitment_tx_output_idx, preimage, .. } => {
self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC {
@@ -3763,13 +3802,15 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K>
let mut funding_spend_seen = Some(false);
let mut counterparty_node_id = None;
let mut confirmed_commitment_tx_counterparty_output = None;
let mut spendable_txids_confirmed = Some(Vec::new());
read_tlv_fields!(reader, {
(1, funding_spend_confirmed, option),
(3, htlcs_resolved_on_chain, vec_type),
(5, pending_monitor_events, vec_type),
(7, funding_spend_seen, option),
(9, counterparty_node_id, option),
(11, confirmed_commitment_tx_counterparty_output, option),
(13, spendable_txids_confirmed, vec_type),
});

let mut secp_ctx = Secp256k1::new();
@@ -3822,6 +3863,7 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K>
funding_spend_confirmed,
confirmed_commitment_tx_counterparty_output,
htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
spendable_txids_confirmed: spendable_txids_confirmed.unwrap(),

best_block,
counterparty_node_id,
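Taken together, the guards added to the transaction loop reduce to a single question: has a prior `transactions_confirmed` call already handled this txid? A hedged sketch of that check, collapsing the four fields the code above consults into one hypothetical struct:

use bitcoin::Txid;

// Hypothetical aggregate of the "already processed" state; in the real
// code these are four separate fields on ChannelMonitorImpl.
struct SeenState {
    funding_spend_confirmed: Option<Txid>,
    awaiting_threshold_conf_txids: Vec<Txid>,
    htlc_resolution_txids: Vec<Txid>,
    spendable_txids_confirmed: Vec<Txid>,
}

impl SeenState {
    // True if a prior call already processed `txid`, in which case the
    // current pass should skip it rather than re-generate claims/events.
    fn already_processed(&self, txid: &Txid) -> bool {
        self.funding_spend_confirmed.as_ref() == Some(txid)
            || self.awaiting_threshold_conf_txids.contains(txid)
            || self.htlc_resolution_txids.contains(txid)
            || self.spendable_txids_confirmed.contains(txid)
    }
}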
46 changes: 41 additions & 5 deletions lightning/src/ln/functional_test_utils.rs
@@ -107,6 +107,14 @@ pub enum ConnectStyle {
/// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
/// make a single `best_block_updated` call.
TransactionsFirstSkippingBlocks,
/// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
/// make a single `best_block_updated` call. Further, we call `transactions_confirmed` multiple
/// times to ensure it's idempotent.
TransactionsDuplicativelyFirstSkippingBlocks,
/// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
/// make a single `best_block_updated` call. Further, we re-confirm every transaction we've
/// ever seen on each connection, ensuring idempotency even when the entire chain is replayed.
HighlyRedundantTransactionsFirstSkippingBlocks,
/// The same as `TransactionsFirst` when connecting blocks. During disconnection only
/// `transaction_unconfirmed` is called.
TransactionsFirstReorgsOnlyTip,
@@ -121,14 +129,16 @@ impl ConnectStyle {
use core::hash::{BuildHasher, Hasher};
// Get a random value using the only std API to do so - the DefaultHasher
let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish();
let res = match rand_val % 7 {
let res = match rand_val % 9 {
0 => ConnectStyle::BestBlockFirst,
1 => ConnectStyle::BestBlockFirstSkippingBlocks,
2 => ConnectStyle::BestBlockFirstReorgsOnlyTip,
3 => ConnectStyle::TransactionsFirst,
4 => ConnectStyle::TransactionsFirstSkippingBlocks,
5 => ConnectStyle::TransactionsFirstReorgsOnlyTip,
6 => ConnectStyle::FullBlockViaListen,
5 => ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks,
6 => ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks,
7 => ConnectStyle::TransactionsFirstReorgsOnlyTip,
8 => ConnectStyle::FullBlockViaListen,
_ => unreachable!(),
};
eprintln!("Using Block Connection Style: {:?}", res);
@@ -143,6 +153,7 @@ impl ConnectStyle {
pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32) -> BlockHash {
let skip_intermediaries = match *node.connect_style.borrow() {
ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks|
ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks|ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|
ConnectStyle::BestBlockFirstReorgsOnlyTip|ConnectStyle::TransactionsFirstReorgsOnlyTip => true,
_ => false,
};
@@ -193,8 +204,32 @@ fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: Block, sk
node.node.best_block_updated(&block.header, height);
node.node.transactions_confirmed(&block.header, &txdata, height);
},
ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks|ConnectStyle::TransactionsFirstReorgsOnlyTip => {
ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks|
ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks|ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|
ConnectStyle::TransactionsFirstReorgsOnlyTip => {
if *node.connect_style.borrow() == ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks {
let mut connections = Vec::new();
for (block, height) in node.blocks.lock().unwrap().iter() {
if !block.txdata.is_empty() {
// Reconnect all transactions we've ever seen to ensure transaction connection
// is *really* idempotent. This mirrors a plausible deployment for some
// esplora-based implementations of chain sync, which try to reduce state
// and complexity as much as possible.
//
// Sadly we have to clone the block here to maintain lockorder. In the
// future we should consider Arc'ing the blocks to avoid this.
connections.push((block.clone(), *height));
}
}
for (old_block, height) in connections {
node.chain_monitor.chain_monitor.transactions_confirmed(&old_block.header,
&old_block.txdata.iter().enumerate().collect::<Vec<_>>(), height);
}
}
node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
if *node.connect_style.borrow() == ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks {
node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
}
call_claimable_balances(node);
node.chain_monitor.chain_monitor.best_block_updated(&block.header, height);
node.node.transactions_confirmed(&block.header, &txdata, height);
@@ -226,7 +261,8 @@ pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32)
node.chain_monitor.chain_monitor.block_disconnected(&orig.0.header, orig.1);
Listen::block_disconnected(node.node, &orig.0.header, orig.1);
},
ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => {
ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks|
ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks => {
if i == count - 1 {
node.chain_monitor.chain_monitor.best_block_updated(&prev.0.header, prev.1);
node.node.best_block_updated(&prev.0.header, prev.1);
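For context on why the harness replays history at all: a `chain::Confirm`-driven sync that favors statelessness may legitimately re-feed everything it knows on every poll. A hedged sketch of such a client pass, assuming a caller-supplied list of known confirmations (`stateless_sync_pass` is illustrative, not an actual esplora binding):

use bitcoin::{BlockHeader, Transaction};
use lightning::chain::Confirm;

// Illustrative only: re-confirm every relevant transaction we know of on
// each poll. Correct only because `transactions_confirmed` is idempotent,
// which is exactly the property this PR enforces and tests.
fn stateless_sync_pass<C: Confirm>(
    listener: &C,
    known_confirmations: &[(BlockHeader, Vec<(usize, Transaction)>, u32)],
) {
    for (header, txdata, height) in known_confirmations {
        let txdata_ref: Vec<(usize, &Transaction)> =
            txdata.iter().map(|(i, tx)| (*i, tx)).collect();
        listener.transactions_confirmed(header, &txdata_ref, *height);
    }
}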
29 changes: 20 additions & 9 deletions lightning/src/ln/functional_tests.rs
@@ -2814,12 +2814,17 @@ fn test_htlc_on_chain_success() {
check_added_monitors!(nodes[1], 1);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clone();
assert_eq!(node_txn.len(), 6); // ChannelManager : 3 (commitment tx + HTLC-Sucess * 2), ChannelMonitor : 3 (HTLC-Success, 2* RBF bumps of above HTLC txn)
assert!(node_txn.len() == 4 || node_txn.len() == 6); // ChannelManager : 3 (commitment tx + HTLC-Success * 2), ChannelMonitor : 3 (HTLC-Success, 2* RBF bumps of above HTLC txn)
let commitment_spend =
if node_txn[0].input[0].previous_output.txid == node_a_commitment_tx[0].txid() {
check_spends!(node_txn[1], commitment_tx[0]);
check_spends!(node_txn[2], commitment_tx[0]);
assert_ne!(node_txn[1].input[0].previous_output.vout, node_txn[2].input[0].previous_output.vout);
if node_txn.len() == 6 {
// In some block `ConnectStyle`s we may avoid broadcasting the double-spending
// transactions spending the HTLC outputs of C's commitment transaction. Otherwise,
// check that the extra broadcasts (double-)spend those here.
check_spends!(node_txn[1], commitment_tx[0]);
check_spends!(node_txn[2], commitment_tx[0]);
assert_ne!(node_txn[1].input[0].previous_output.vout, node_txn[2].input[0].previous_output.vout);
}
&node_txn[0]
} else {
check_spends!(node_txn[0], commitment_tx[0]);
@@ -2834,10 +2839,11 @@ fn test_htlc_on_chain_success() {
assert_eq!(commitment_spend.input[1].witness.last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
assert_eq!(commitment_spend.lock_time.0, 0);
assert!(commitment_spend.output[0].script_pubkey.is_v0_p2wpkh()); // direct payment
check_spends!(node_txn[3], chan_1.3);
assert_eq!(node_txn[3].input[0].witness.clone().last().unwrap().len(), 71);
check_spends!(node_txn[4], node_txn[3]);
check_spends!(node_txn[5], node_txn[3]);
let funding_spend_offset = if node_txn.len() == 6 { 3 } else { 1 };
check_spends!(node_txn[funding_spend_offset], chan_1.3);
assert_eq!(node_txn[funding_spend_offset].input[0].witness.clone().last().unwrap().len(), 71);
check_spends!(node_txn[funding_spend_offset + 1], node_txn[funding_spend_offset]);
check_spends!(node_txn[funding_spend_offset + 2], node_txn[funding_spend_offset]);
// We don't bother to check that B can claim the HTLC output on its commitment tx here as
// we already checked the same situation with A.

@@ -3370,6 +3376,12 @@ fn test_htlc_ignore_latest_remote_commitment() {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
if *nodes[1].connect_style.borrow() == ConnectStyle::FullBlockViaListen {
// We rely on the ability to connect a block redundantly, which isn't allowed via
// `chain::Listen`, so we never run the test if we randomly get assigned that
// connect_style.
return;
}
create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());

route_payment(&nodes[0], &[&nodes[1]], 10000000);
@@ -3391,7 +3403,6 @@
}

// Duplicate the connect_block call since this may happen due to other listeners
// registering new transactions
header.prev_blockhash = header.block_hash();
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[2].clone()]});
}

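The early return added to `test_htlc_ignore_latest_remote_commitment` reflects a real API contract: `chain::Confirm` tolerates redundant deliveries (especially after this PR), while `chain::Listen` expects each block exactly once, in height order. A small illustration of the two feeding disciplines (helper names are hypothetical; signatures follow the released `Confirm`/`Listen` traits):

use bitcoin::{Block, BlockHeader, Transaction};
use lightning::chain::{Confirm, Listen};

// A `Confirm` implementation may be handed the same txdata repeatedly;
// this PR makes such replays no-ops.
fn feed_redundantly<C: Confirm>(
    c: &C, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32,
) {
    c.transactions_confirmed(header, txdata, height); // first delivery
    c.transactions_confirmed(header, txdata, height); // harmless replay
}

// A `Listen` implementation must see each block exactly once, in order,
// which is why the test bails out under `FullBlockViaListen`.
fn feed_once<L: Listen>(l: &L, block: &Block, height: u32) {
    l.block_connected(block, height);
}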
50 changes: 50 additions & 0 deletions lightning/src/ln/monitor_tests.rs
@@ -567,6 +567,16 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
assert_eq!(Vec::<Balance>::new(),
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
for node in nodes.iter() {
connect_blocks(node, 6);
connect_blocks(node, 6);
assert!(node.chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(node.chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}
}

#[test]
@@ -750,6 +760,14 @@ fn test_balances_on_local_commitment_htlcs() {
connect_blocks(&nodes[0], node_a_htlc_claimable - nodes[0].best_block_info().1);
assert!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
test_spendable_output(&nodes[0], &as_txn[1]);

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
connect_blocks(&nodes[0], 6);
connect_blocks(&nodes[0], 6);
assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}

#[test]
@@ -982,6 +1000,14 @@ fn test_no_preimage_inbound_htlc_balances() {

connect_blocks(&nodes[1], 1);
assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
connect_blocks(&nodes[1], 6);
connect_blocks(&nodes[1], 6);
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}

fn sorted_vec_with_additions<T: Ord + Clone>(v_orig: &Vec<T>, extra_ts: &[&T]) -> Vec<T> {
@@ -1231,6 +1257,14 @@ fn do_test_revoked_counterparty_commitment_balances(confirm_htlc_spend_first: bo
test_spendable_output(&nodes[1], &claim_txn[1]);
expect_payment_failed!(nodes[1], timeout_payment_hash, false);
assert_eq!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances(), Vec::new());

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
connect_blocks(&nodes[1], 6);
connect_blocks(&nodes[1], 6);
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}

#[test]
@@ -1437,6 +1471,14 @@ fn test_revoked_counterparty_htlc_tx_balances() {
test_spendable_output(&nodes[0], &as_second_htlc_claim_tx[1]);

assert_eq!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances(), Vec::new());

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
connect_blocks(&nodes[0], 6);
connect_blocks(&nodes[0], 6);
assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}

#[test]
@@ -1628,4 +1670,12 @@ fn test_revoked_counterparty_aggregated_claims() {
expect_payment_failed!(nodes[1], revoked_payment_hash, false);
test_spendable_output(&nodes[1], &claim_txn_2[0]);
assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());

// Ensure that even if we connect more blocks, potentially replaying the entire chain if we're
// using `ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks`, we don't get new
// monitor events or claimable balances.
connect_blocks(&nodes[1], 6);
connect_blocks(&nodes[1], 6);
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
}
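The same replay-then-assert coda now closes five balance tests in this file; a possible follow-up could consolidate it into a helper along these lines (`assert_replay_is_quiet` is hypothetical and not part of this PR, but it uses only the existing test-utils API):

// Hypothetical helper capturing the repeated pattern above.
fn assert_replay_is_quiet<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, funding_outpoint: OutPoint) {
    // Two batches of six blocks: under
    // ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks this
    // replays the full chain history through `transactions_confirmed`.
    connect_blocks(node, 6);
    connect_blocks(node, 6);
    assert!(node.chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
    assert!(node.chain_monitor.chain_monitor.get_monitor(funding_outpoint)
        .unwrap().get_claimable_balances().is_empty());
}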
5 changes: 3 additions & 2 deletions lightning/src/ln/payment_tests.rs
@@ -785,8 +785,9 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
let funding_txo = OutPoint { txid: funding_tx.txid(), index: 0 };
let mon_updates: Vec<_> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap()
.get_mut(&funding_txo).unwrap().drain().collect();
// If we are using chain::Confirm instead of chain::Listen, we will get the same update twice
assert!(mon_updates.len() == 1 || mon_updates.len() == 2);
// If we are using chain::Confirm instead of chain::Listen, we will get the same update twice.
// If we're testing connection idempotency we may get substantially more.
assert!(mon_updates.len() >= 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
