Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generic sensei persistence layer, chain manager uses trait objs instead of concrete BitcoindClient type #37

Merged
merged 10 commits into from
Apr 21, 2022
20 changes: 7 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ name = "senseid"
path = "src/main.rs"

[dependencies]
lightning = { version = "0.0.106", features = ["max_level_trace"] }
lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ] }
lightning-invoice = { version = "0.14.0" }
lightning-net-tokio = { version = "0.0.106" }
lightning-persister = { version = "0.0.106" }
lightning-background-processor = { version = "0.0.106" }
lightning = { version = "0.0.106", features = ["max_level_trace"], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
lightning-invoice = { version = "0.14.0", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
lightning-net-tokio = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
lightning-persister = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
lightning-background-processor = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }

base64 = "0.13.0"
bitcoin = "0.27"
Expand Down
61 changes: 12 additions & 49 deletions src/chain/bitcoind_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ impl TryInto<BlockchainInfo> for JsonResponse {
}
pub struct BitcoindClient {
bitcoind_rpc_client: Arc<Mutex<RpcClient>>,
host: String,
port: u16,
rpc_user: String,
rpc_password: String,
fees: Arc<HashMap<Target, AtomicU32>>,
handle: tokio::runtime::Handle,
}
Expand All @@ -72,31 +68,28 @@ pub enum Target {
HighPriority,
}

impl BlockSource for &BitcoindClient {
impl BlockSource for BitcoindClient {
fn get_header<'a>(
&'a mut self,
&'a self,
header_hash: &'a BlockHash,
height_hint: Option<u32>,
) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let mut rpc = self.bitcoind_rpc_client.lock().await;
let rpc = self.bitcoind_rpc_client.lock().await;
rpc.get_header(header_hash, height_hint).await
})
}

fn get_block<'a>(
&'a mut self,
header_hash: &'a BlockHash,
) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let mut rpc = self.bitcoind_rpc_client.lock().await;
let rpc = self.bitcoind_rpc_client.lock().await;
rpc.get_block(header_hash).await
})
}

fn get_best_block(&mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)> {
fn get_best_block(&self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)> {
Box::pin(async move {
let mut rpc = self.bitcoind_rpc_client.lock().await;
let rpc = self.bitcoind_rpc_client.lock().await;
rpc.get_best_block().await
})
}
Expand All @@ -116,7 +109,7 @@ impl BitcoindClient {
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
let rpc_credentials =
base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone()));
let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
let _dummy = bitcoind_rpc_client
.call_method::<BlockchainInfo>("getblockchaininfo", &[])
.await
Expand All @@ -130,10 +123,6 @@ impl BitcoindClient {
fees.insert(Target::HighPriority, AtomicU32::new(5000));
let client = Self {
bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)),
host,
port,
rpc_user,
rpc_password,
fees: Arc::new(fees),
handle: handle.clone(),
};
Expand All @@ -153,7 +142,7 @@ impl BitcoindClient {
handle.spawn(async move {
loop {
let background_estimate = {
let mut rpc = rpc_client.lock().await;
let rpc = rpc_client.lock().await;
let background_conf_target = serde_json::json!(144);
let background_estimate_mode = serde_json::json!("ECONOMICAL");
let resp = rpc
Expand All @@ -170,7 +159,7 @@ impl BitcoindClient {
};

let normal_estimate = {
let mut rpc = rpc_client.lock().await;
let rpc = rpc_client.lock().await;
let normal_conf_target = serde_json::json!(18);
let normal_estimate_mode = serde_json::json!("ECONOMICAL");
let resp = rpc
Expand All @@ -187,7 +176,7 @@ impl BitcoindClient {
};

let high_prio_estimate = {
let mut rpc = rpc_client.lock().await;
let rpc = rpc_client.lock().await;
let high_prio_conf_target = serde_json::json!(6);
let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE");
let resp = rpc
Expand Down Expand Up @@ -217,32 +206,6 @@ impl BitcoindClient {
}
});
}

pub fn get_new_rpc_client(&self) -> std::io::Result<RpcClient> {
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
let rpc_credentials = base64::encode(format!(
"{}:{}",
self.rpc_user.clone(),
self.rpc_password.clone()
));
RpcClient::new(&rpc_credentials, http_endpoint)
}

pub async fn send_raw_transaction(&self, raw_tx: String) {
let mut rpc = self.bitcoind_rpc_client.lock().await;

let raw_tx_json = serde_json::json!(raw_tx);
rpc.call_method::<Txid>("sendrawtransaction", &[raw_tx_json])
.await
.unwrap();
}

pub async fn get_blockchain_info(&self) -> BlockchainInfo {
let mut rpc = self.bitcoind_rpc_client.lock().await;
rpc.call_method::<BlockchainInfo>("getblockchaininfo", &[])
.await
.unwrap()
}
}

impl FeeEstimator for BitcoindClient {
Expand Down Expand Up @@ -272,7 +235,7 @@ impl BroadcasterInterface for BitcoindClient {
let bitcoind_rpc_client = self.bitcoind_rpc_client.clone();
let tx_serialized = serde_json::json!(encode::serialize_hex(tx));
self.handle.spawn(async move {
let mut rpc = bitcoind_rpc_client.lock().await;
let rpc = bitcoind_rpc_client.lock().await;
// This may error due to RL calling `broadcast_transaction` with the same transaction
// multiple times, but the error is safe to ignore.
match rpc
Expand Down
18 changes: 11 additions & 7 deletions src/chain/listener_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,17 @@ impl ListenerDatabase {
.get_path_from_script_pubkey(&output.script_pubkey)
.unwrap()
{
database
.set_utxo(&LocalUtxo {
outpoint: OutPoint::new(tx.txid(), i as u32),
txout: output.clone(),
keychain,
})
.unwrap();
let outpoint = OutPoint::new(tx.txid(), i as u32);
let existing_utxo = database.get_utxo(&outpoint).unwrap();
if existing_utxo.is_none() {
database
.set_utxo(&LocalUtxo {
outpoint: OutPoint::new(tx.txid(), i as u32),
txout: output.clone(),
keychain,
})
.unwrap();
}
incoming += output.value;

// TODO: implement this
Expand Down
55 changes: 25 additions & 30 deletions src/chain/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,44 @@ use crate::{
node::{ChainMonitor, ChannelManager},
};
use bitcoin::BlockHash;
use lightning::chain::{BestBlock, Listen};
use lightning::chain::{
chaininterface::{BroadcasterInterface, FeeEstimator},
BestBlock, Listen,
};
use lightning_block_sync::SpvClient;
use lightning_block_sync::{init, poll, UnboundedCache};
use lightning_block_sync::{poll::ValidatedBlockHeader, BlockSource};
use std::ops::Deref;

use super::{
bitcoind_client::BitcoindClient, listener::SenseiChainListener,
listener_database::ListenerDatabase,
};
use super::{listener::SenseiChainListener, listener_database::ListenerDatabase};

pub struct SenseiChainManager {
config: SenseiConfig,
pub listener: Arc<SenseiChainListener>,
pub bitcoind_client: Arc<BitcoindClient>,
pub block_source: Arc<dyn BlockSource + Send + Sync>,
pub fee_estimator: Arc<dyn FeeEstimator + Send + Sync>,
pub broadcaster: Arc<dyn BroadcasterInterface + Send + Sync>,
poller_paused: Arc<AtomicBool>,
}

impl SenseiChainManager {
pub async fn new(config: SenseiConfig) -> Result<Self, crate::error::Error> {
pub async fn new(
config: SenseiConfig,
block_source: Arc<dyn BlockSource + Send + Sync>,
fee_estimator: Arc<dyn FeeEstimator + Send + Sync>,
broadcaster: Arc<dyn BroadcasterInterface + Send + Sync>,
) -> Result<Self, crate::error::Error> {
let listener = Arc::new(SenseiChainListener::new());

let bitcoind_client = Arc::new(
BitcoindClient::new(
config.bitcoind_rpc_host.clone(),
config.bitcoind_rpc_port,
config.bitcoind_rpc_username.clone(),
config.bitcoind_rpc_password.clone(),
tokio::runtime::Handle::current(),
)
.await
.expect("invalid bitcoind rpc config"),
);

let poller_paused = Arc::new(AtomicBool::new(false));

let block_source_poller = bitcoind_client.clone();
let block_source_poller = block_source.clone();
let listener_poller = listener.clone();
let poller_paused = Arc::new(AtomicBool::new(false));
let poller_paused_poller = poller_paused.clone();
tokio::spawn(async move {
let derefed = &mut block_source_poller.deref();
let mut cache = UnboundedCache::new();
let chain_tip = init::validate_best_block_header(derefed).await.unwrap();
let chain_poller = poll::ChainPoller::new(derefed, config.network);
let chain_tip = init::validate_best_block_header(block_source_poller.clone())
.await
.unwrap();
let chain_poller = poll::ChainPoller::new(block_source_poller, config.network);
let mut spv_client =
SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller);
loop {
Expand All @@ -68,8 +62,10 @@ impl SenseiChainManager {
Ok(Self {
config,
listener,
bitcoind_client,
poller_paused,
block_source,
fee_estimator,
broadcaster,
})
}

Expand All @@ -78,7 +74,7 @@ impl SenseiChainManager {
chain_listeners: Vec<(BlockHash, &(dyn Listen + Send + Sync))>,
) -> Result<ValidatedBlockHeader, crate::error::Error> {
let chain_tip = init::synchronize_listeners(
&mut self.bitcoind_client.deref(),
self.block_source.clone(),
self.config.network,
&mut UnboundedCache::new(),
chain_listeners,
Expand Down Expand Up @@ -121,8 +117,7 @@ impl SenseiChainManager {
}

pub async fn get_best_block(&self) -> Result<BestBlock, crate::error::Error> {
let mut block_source = self.bitcoind_client.deref();
let (latest_blockhash, latest_height) = block_source.get_best_block().await.unwrap();
let (latest_blockhash, latest_height) = self.block_source.get_best_block().await.unwrap();
Ok(BestBlock::new(latest_blockhash, latest_height.unwrap()))
}
}
Loading