Skip to content

Commit

Permalink
f Go async
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Sep 22, 2022
1 parent 0475505 commit 35bad47
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
lightning-rapid-gossip-sync = { version = "0.0.110" }

#bdk = "0.20.0"
bdk = { git = "https://github.com/tnull/bdk", branch="feat/use-external-esplora-client", features = ["use-esplora-ureq", "key-value-db"]}
bdk = { git = "https://github.com/tnull/bdk", branch="feat/use-external-esplora-client", features = ["use-esplora-reqwest", "key-value-db"]}
bitcoin = "0.28.1"

rand = "0.8.5"
Expand Down
126 changes: 62 additions & 64 deletions src/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use bdk::{SignOptions, SyncOptions};
use bitcoin::{BlockHash, Script, Transaction, Txid};

use std::collections::HashSet;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};

/// The minimum feerate we are allowed to send, as specify by LDK.
const MIN_FEERATE: u32 = 253;
Expand All @@ -32,7 +32,7 @@ where
watched_transactions: Mutex<Vec<Txid>>,
queued_outputs: Mutex<Vec<WatchedOutput>>,
watched_outputs: Mutex<Vec<WatchedOutput>>,
last_sync_height: Mutex<Option<u32>>,
last_sync_height: tokio::sync::Mutex<Option<u32>>,
logger: Arc<FilesystemLogger>,
}

Expand All @@ -48,7 +48,7 @@ where
let queued_transactions = Mutex::new(Vec::new());
let watched_outputs = Mutex::new(Vec::new());
let queued_outputs = Mutex::new(Vec::new());
let last_sync_height = Mutex::new(None);
let last_sync_height = tokio::sync::Mutex::new(None);
Self {
blockchain,
wallet,
Expand All @@ -61,7 +61,7 @@ where
}
}

pub(crate) fn sync_wallet(&self) -> Result<(), Error> {
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
let sync_options = SyncOptions { progress: None };

self.wallet
Expand All @@ -73,73 +73,70 @@ where
Ok(())
}

pub(crate) fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
let client = &*self.blockchain;

let cur_height = client.get_height()?;
let cur_height = client.get_height().await?;

let mut locked_last_sync_height = self.last_sync_height.lock().unwrap();
let mut locked_last_sync_height = self.last_sync_height.lock().await;
if cur_height >= locked_last_sync_height.unwrap_or(0) {
self.sync_best_block_updated(
confirmables.clone(),
cur_height,
&mut locked_last_sync_height,
)?;
self.sync_transactions_confirmed(confirmables.clone())?;
self.sync_transaction_unconfirmed(confirmables.clone())?;
self.sync_best_block_updated(&confirmables, cur_height, &mut locked_last_sync_height)
.await?;
self.sync_transactions_confirmed(&confirmables).await?;
self.sync_transaction_unconfirmed(&confirmables).await?;
}
// TODO: check whether new outputs have been registered by now and process them
Ok(())
}

fn sync_best_block_updated(
&self, confirmables: Vec<&(dyn Confirm + Sync)>, cur_height: u32,
locked_last_sync_height: &mut MutexGuard<Option<u32>>,
async fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// Inform the interface of the new block.
let cur_block_header = client.get_header(cur_height)?;
for c in &confirmables {
let cur_block_header = client.get_header(cur_height).await?;
for c in confirmables {
c.best_block_updated(&cur_block_header, cur_height);
}

**locked_last_sync_height = Some(cur_height);
Ok(())
}

fn sync_transactions_confirmed(
&self, confirmables: Vec<&(dyn Confirm + Sync)>,
async fn sync_transactions_confirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();

let mut confirmed_txs = Vec::new();

// Check in the current queue, as well as in registered transactions leftover from
// previous iterations.
let registered_txs: HashSet<Txid> = locked_watched_transactions
.iter()
.chain(locked_queued_transactions.iter())
.cloned()
.collect();
let registered_txs: HashSet<Txid> = {
let locked_queued_transactions = self.queued_transactions.lock().unwrap();
let locked_watched_transactions = self.watched_transactions.lock().unwrap();
locked_watched_transactions
.iter()
.chain(locked_queued_transactions.iter())
.cloned()
.collect()
};

// Remember all registered but unconfirmed transactions for future processing.
let mut unconfirmed_registered_txs = Vec::new();

for txid in registered_txs {
if let Some(tx_status) = client.get_tx_status(&txid)? {
if let Some(tx_status) = client.get_tx_status(&txid).await? {
if tx_status.confirmed {
if let Some(tx) = client.get_tx(&txid)? {
if let Some(tx) = client.get_tx(&txid).await? {
if let Some(block_height) = tx_status.block_height {
let block_header = client.get_header(block_height)?;
if let Some(merkle_proof) = client.get_merkle_proof(&txid)? {
let block_header = client.get_header(block_height).await?;
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
confirmed_txs.push((
tx,
block_height,
Expand All @@ -156,25 +153,29 @@ where
}

// Check all registered outputs for dependent spending transactions.
let registered_outputs: Vec<WatchedOutput> =
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect();
let registered_outputs: Vec<WatchedOutput> = {
let locked_queued_outputs = self.queued_outputs.lock().unwrap();
let locked_watched_outputs = self.watched_outputs.lock().unwrap();
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect()
};

// Remember all registered outputs that haven't been spent for future processing.
let mut unspent_registered_outputs = Vec::new();

for output in registered_outputs {
if let Some(output_status) =
client.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)?
if let Some(output_status) = client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
.await?
{
if output_status.spent {
if let Some(spending_tx_status) = output_status.status {
if spending_tx_status.confirmed {
let spending_txid = output_status.txid.unwrap();
if let Some(spending_tx) = client.get_tx(&spending_txid)? {
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
let block_height = spending_tx_status.block_height.unwrap();
let block_header = client.get_header(block_height)?;
let block_header = client.get_header(block_height).await?;
if let Some(merkle_proof) =
client.get_merkle_proof(&spending_txid)?
client.get_merkle_proof(&spending_txid).await?
{
confirmed_txs.push((
spending_tx,
Expand All @@ -200,41 +201,38 @@ where
},
);
for (tx, block_height, block_header, pos) in confirmed_txs {
for c in &confirmables {
for c in confirmables {
c.transactions_confirmed(&block_header, &[(pos, &tx)], block_height);
}
}

*locked_watched_transactions = unconfirmed_registered_txs;
*locked_queued_transactions = Vec::new();
*locked_watched_outputs = unspent_registered_outputs;
*locked_queued_outputs = Vec::new();
*self.queued_transactions.lock().unwrap() = Vec::new();
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
*self.queued_outputs.lock().unwrap() = Vec::new();
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;

Ok(())
}

fn sync_transaction_unconfirmed(
&self, confirmables: Vec<&(dyn Confirm + Sync)>,
async fn sync_transaction_unconfirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;
// Query the interface for relevant txids and check whether they have been
// reorged-out of the chain.
let unconfirmed_txids = confirmables
.iter()
.flat_map(|c| c.get_relevant_txids())
.filter(|txid| {
client
.get_tx_status(txid)
.ok()
.unwrap_or(None)
.map_or(true, |status| !status.confirmed)
})
.collect::<Vec<Txid>>();

// Mark all relevant unconfirmed transactions as unconfirmed.
for txid in &unconfirmed_txids {
for c in &confirmables {
c.transaction_unconfirmed(txid);
let relevant_txids =
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
for txid in relevant_txids {
let tx_unconfirmed = client
.get_tx_status(&txid)
.await
.ok()
.unwrap_or(None)
.map_or(true, |status| !status.confirmed);
if tx_unconfirmed {
for c in confirmables {
c.transaction_unconfirmed(&txid);
}
}
}

Expand Down

0 comments on commit 35bad47

Please sign in to comment.