Skip to content

Commit

Permalink
DRY and use get_merkle_block
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Nov 4, 2022
1 parent 5f6676f commit 18e7fe6
Showing 1 changed file with 120 additions and 97 deletions.
217 changes: 120 additions & 97 deletions src/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bdk::database::BatchDatabase;
use bdk::wallet::AddressIndex;
use bdk::{SignOptions, SyncOptions};

use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
Expand All @@ -28,15 +28,15 @@ where
// A BDK on-chain wallet.
wallet: Mutex<bdk::Wallet<D>>,
// Transactions that were registered via the `Filter` interface and have to be processed.
queued_transactions: Mutex<Vec<Txid>>,
queued_transactions: Mutex<HashSet<Txid>>,
// Transactions that were previously processed, but must not be forgotten yet.
watched_transactions: Mutex<Vec<Txid>>,
watched_transactions: Mutex<HashSet<Txid>>,
// Outputs that were registered via the `Filter` interface and have to be processed.
queued_outputs: Mutex<Vec<WatchedOutput>>,
queued_outputs: Mutex<HashSet<WatchedOutput>>,
// Outputs that were previously processed, but must not be forgotten yet.
watched_outputs: Mutex<Vec<WatchedOutput>>,
// The tip height observed during our last sync.
last_sync_height: tokio::sync::Mutex<Option<u32>>,
watched_outputs: Mutex<HashSet<WatchedOutput>>,
// The tip hash observed during our last sync.
last_sync_hash: tokio::sync::Mutex<Option<BlockHash>>,
logger: Arc<FilesystemLogger>,
}

Expand All @@ -48,19 +48,19 @@ where
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
) -> Self {
let wallet = Mutex::new(wallet);
let watched_transactions = Mutex::new(Vec::new());
let queued_transactions = Mutex::new(Vec::new());
let watched_outputs = Mutex::new(Vec::new());
let queued_outputs = Mutex::new(Vec::new());
let last_sync_height = tokio::sync::Mutex::new(None);
let watched_transactions = Mutex::new(HashSet::new());
let queued_transactions = Mutex::new(HashSet::new());
let watched_outputs = Mutex::new(HashSet::new());
let queued_outputs = Mutex::new(HashSet::new());
let last_sync_hash = tokio::sync::Mutex::new(None);
Self {
blockchain,
wallet,
queued_transactions,
watched_transactions,
queued_outputs,
watched_outputs,
last_sync_height,
last_sync_hash,
logger,
}
}
Expand All @@ -76,33 +76,71 @@ where
}

pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
let client = &*self.blockchain;
// This lock makes sure we're syncing once at a time.
let mut locked_last_sync_hash = self.last_sync_hash.lock().await;

let tip_hash= client.get_tip_hash().await?;
let tip_block_status = client.get_block_status(&tip_hash).await?;
let tip_height = tip_block_status.height.unwrap_or(0);
let client = &*self.blockchain;

let mut locked_last_sync_height = self.last_sync_height.lock().await;
if tip_block_status.in_best_chain && (tip_height >= locked_last_sync_height.unwrap_or(0)) {
self.sync_best_block_updated(&confirmables, &tip_hash, tip_height).await?;
*locked_last_sync_height = Some(tip_height);
loop {
let pending_registrations = self.process_queues();
let tip_hash = client.get_tip_hash().await?;
let new_tip = Some(tip_hash) != *locked_last_sync_hash;

self.sync_transactions_confirmed(&confirmables).await?;
self.sync_transaction_unconfirmed(&confirmables).await?;
if pending_registrations || new_tip {
if new_tip {
self.sync_best_block_updated(&confirmables, &tip_hash).await?;
*locked_last_sync_hash = Some(tip_hash);
}
self.sync_transactions_confirmed(&confirmables).await?;
self.sync_transaction_unconfirmed(&confirmables).await?;
} else {
break;
}
}
// TODO: check whether new outputs have been registered by now and process them
Ok(())
}

// Processes the transaction and output queues, returns `true` if new items had been
// registered.
fn process_queues(&self) -> bool {
let mut pending_registrations = false;
{
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
if !locked_queued_transactions.is_empty() {
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
pending_registrations = true;

locked_watched_transactions.extend(locked_queued_transactions.iter());
*locked_queued_transactions = HashSet::new();
}
}
{
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
if !locked_queued_outputs.is_empty() {
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();
pending_registrations = true;

locked_watched_outputs.extend(locked_queued_outputs.iter().cloned());
*locked_queued_outputs = HashSet::new();
}
}
pending_registrations
}

async fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, tip_hash: &BlockHash, tip_height: u32
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, tip_hash: &BlockHash,
) -> Result<(), Error> {
let client = &*self.blockchain;

// Inform the interface of the new block.
let tip_block_header = client.get_header_by_hash(tip_hash).await?;
for c in confirmables {
c.best_block_updated(&tip_block_header, tip_height);
let tip_header = client.get_header_by_hash(tip_hash).await?;
let tip_status = client.get_block_status(&tip_hash).await?;
if tip_status.in_best_chain {
if let Some(tip_height) = tip_status.height {
for c in confirmables {
c.best_block_updated(&tip_header, tip_height);
}
}
}
Ok(())
}
Expand All @@ -119,104 +157,82 @@ where

// Check in the current queue, as well as in registered transactions leftover from
// previous iterations.
let registered_txs: HashSet<Txid> = {
let locked_queued_transactions = self.queued_transactions.lock().unwrap();
let locked_watched_transactions = self.watched_transactions.lock().unwrap();
locked_watched_transactions
.iter()
.chain(locked_queued_transactions.iter())
.cloned()
.collect()
};
let registered_txs = self.watched_transactions.lock().unwrap().clone();

// Remember all registered but unconfirmed transactions for future processing.
let mut unconfirmed_registered_txs = Vec::new();
let mut unconfirmed_registered_txs = HashSet::new();

for txid in registered_txs {
if let Some(tx_status) = client.get_tx_status(&txid).await? {
if tx_status.confirmed {
if let Some(block_hash) = tx_status.block_hash {
if let Some(tx) = client.get_tx(&txid).await? {
let block_header = client.get_header_by_hash(&block_hash).await?;
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
confirmed_txs.push((
tx,
merkle_proof.block_height,
block_header,
merkle_proof.pos,
));
continue;
}
}
}
}
if let Some(confirmed_tx) = self.get_confirmed_tx(&txid).await? {
confirmed_txs.push(confirmed_tx);
} else {
unconfirmed_registered_txs.insert(txid);
}
unconfirmed_registered_txs.push(txid);
}

// Check all registered outputs for dependent spending transactions.
let registered_outputs: Vec<WatchedOutput> = {
let locked_queued_outputs = self.queued_outputs.lock().unwrap();
let locked_watched_outputs = self.watched_outputs.lock().unwrap();
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect()
};
let registered_outputs = self.watched_outputs.lock().unwrap().clone();

// Remember all registered outputs that haven't been spent for future processing.
let mut unspent_registered_outputs = Vec::new();
let mut unspent_registered_outputs = HashSet::new();

for output in registered_outputs {
if let Some(output_status) = client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
.await?
{
if output_status.spent {
if let Some(spending_tx_status) = output_status.status {
if spending_tx_status.confirmed {
let spending_txid = output_status.txid.unwrap();
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
if let Some(block_hash) = spending_tx_status.block_hash {
let block_header = client.get_header_by_hash(&block_hash).await?;
if let Some(merkle_proof) =
client.get_merkle_proof(&spending_txid).await?
{
confirmed_txs.push((
spending_tx,
merkle_proof.block_height,
block_header,
merkle_proof.pos,
));
continue;
}
}
}
}
if let Some(spending_txid) = output_status.txid {
if let Some(confirmed_tx_tuple) = self.get_confirmed_tx(&spending_txid).await? {
confirmed_txs.push(confirmed_tx_tuple);
continue;
}
}
}
unspent_registered_outputs.push(output);
unspent_registered_outputs.insert(output);
}

// Sort all confirmed transactions first by block height, then by in-block
// position, and finally feed them to the interface in order.
confirmed_txs.sort_unstable_by(
|(_, block_height1, _, pos1), (_, block_height2, _, pos2)| {
block_height1.cmp(&block_height2).then_with(|| pos1.cmp(&pos2))
},
);
for (tx, block_height, block_header, pos) in confirmed_txs {
confirmed_txs.sort_unstable_by(|tx1, tx2| {
tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
});
for ctx in confirmed_txs {
for c in confirmables {
c.transactions_confirmed(&block_header, &[(pos, &tx)], block_height);
c.transactions_confirmed(
&ctx.block_header,
&[(ctx.pos, &ctx.tx)],
ctx.block_height,
);
}
}

*self.queued_transactions.lock().unwrap() = Vec::new();
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
*self.queued_outputs.lock().unwrap() = Vec::new();
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;

Ok(())
}

// Fetches the merkle proof for `txid` and, if it checks out, returns the
// transaction together with its confirming block header, height, and in-block
// position. Returns `Ok(None)` if the transaction is unconfirmed or the proof
// can't be validated.
async fn get_confirmed_tx(&self, txid: &Txid) -> Result<Option<ConfirmedTx>, Error> {
	let client = &*self.blockchain;
	if let Some(merkle_block) = client.get_merkle_block(&txid).await? {
		// Extract the position of our transaction from the partial merkle tree.
		let mut matches = Vec::new();
		let mut indexes = Vec::new();
		// The response comes from an untrusted server: if the proof is malformed
		// or doesn't cover exactly our transaction, treat the transaction as
		// unconfirmed rather than panicking on a bad peer.
		if merkle_block.txn.extract_matches(&mut matches, &mut indexes).is_err()
			|| indexes.len() != 1
			|| matches.get(0) != Some(txid)
		{
			return Ok(None);
		}
		let pos = indexes[0] as usize;

		if let Some(tx) = client.get_tx(&txid).await? {
			let block_header = merkle_block.header;
			let block_hash = block_header.block_hash();
			let block_status = client.get_block_status(&block_hash).await?;
			if let Some(block_height) = block_status.height {
				return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
			}
		}
	}
	Ok(None)
}

async fn sync_transaction_unconfirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
) -> Result<(), Error> {
Expand All @@ -232,7 +248,7 @@ where
if let Some(block_hash) = block_hash_opt {
let block_status = client.get_block_status(&block_hash).await?;
if block_status.in_best_chain {
// Skip if the block in queestion is still confirmed.
// Skip if the block in question is still confirmed.
continue;
}
}
Expand Down Expand Up @@ -281,7 +297,14 @@ where
}
}

impl<D> FeeEstimator for LdkLiteChainAccess<D>
// A confirmed transaction together with everything needed to replay its
// confirmation to the `Confirm` listeners in the right order.
struct ConfirmedTx {
	// The confirmed transaction itself.
	tx: Transaction,
	// Header of the block the transaction was confirmed in.
	block_header: BlockHeader,
	// Height of the confirming block.
	block_height: u32,
	// Position of the transaction within the confirming block.
	pos: usize,
}

impl<D> FeeEstimator for ChainAccess<D>
where
D: BatchDatabase,
{
Expand Down Expand Up @@ -314,11 +337,11 @@ where
D: BatchDatabase,
{
// Queues a transaction for monitoring; it is moved into the watched set on the
// next sync pass. Using a `HashSet` makes re-registering the same txid a no-op.
fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
	self.queued_transactions.lock().unwrap().insert(*txid);
}

// Queues an output for spend-monitoring; it is moved into the watched set on
// the next sync pass. Duplicate registrations are deduplicated by the set.
fn register_output(&self, output: WatchedOutput) {
	self.queued_outputs.lock().unwrap().insert(output);
}
}

Expand Down

0 comments on commit 18e7fe6

Please sign in to comment.