diff --git a/Cargo.lock b/Cargo.lock
index fcecb56..cf43677 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1075,8 +1075,7 @@ dependencies = [
 [[package]]
 name = "lightning"
 version = "0.0.106"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "580647f97f8e6d138ad724027c8ca9b890b1001b05374c270bbee4c10309b641"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bitcoin",
  "secp256k1",
@@ -1085,22 +1084,20 @@ dependencies = [
 [[package]]
 name = "lightning-background-processor"
 version = "0.0.106"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a9c8c4c6e4b9652287d8ce2fc3a168861fdb40eb68793567be5ed77ecce6eb"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bitcoin",
  "lightning",
- "lightning-persister",
 ]
 
 [[package]]
 name = "lightning-block-sync"
 version = "0.0.106"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8731c4f20bd4e0d588db6e849ac2114674a112cdfda65354cf9b4cf6018878a"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bitcoin",
  "chunked_transfer",
+ "futures",
  "lightning",
  "serde",
  "serde_json",
@@ -1109,8 +1106,7 @@ dependencies = [
 [[package]]
 name = "lightning-invoice"
 version = "0.14.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f069b6eb46d7639d07977d14dc7d9a40d9d8bc5ac2b47f1924318fec13edd5c0"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bech32",
  "bitcoin_hashes",
@@ -1122,8 +1118,7 @@ dependencies = [
 [[package]]
 name = "lightning-net-tokio"
 version = "0.0.106"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85119f898ac097d46c17a0ad7dda0f6ef6b923e5bcb4d1a5e39d33c3c68aa7bc"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bitcoin",
  "lightning",
@@ -1133,8 +1128,7 @@ dependencies = [
 [[package]]
 name = "lightning-persister"
 version = "0.0.106"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f8aac01a61b302f3928adf235660c38aa5c246113fc7d19cc4cb60b5f53b7ae"
+source = "git+https://github.com/lightningdevkit/rust-lightning?rev=d0f69f77bd6ed40bff7ef1026f23e4444a5a884a#d0f69f77bd6ed40bff7ef1026f23e4444a5a884a"
 dependencies = [
  "bitcoin",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index b40a2e6..4b75f89 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,12 +12,12 @@ name = "senseid"
 path = "src/main.rs"
 
 [dependencies]
-lightning = { version = "0.0.106", features = ["max_level_trace"] }
-lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ] }
-lightning-invoice = { version = "0.14.0" }
-lightning-net-tokio = { version = "0.0.106" }
-lightning-persister = { version = "0.0.106" }
-lightning-background-processor = { version = "0.0.106" }
+lightning = { version = "0.0.106", features = ["max_level_trace"], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
+lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ], git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
+lightning-invoice = { version = "0.14.0", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
+lightning-net-tokio = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
+lightning-persister = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
+lightning-background-processor = { version = "0.0.106", git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0f69f77bd6ed40bff7ef1026f23e4444a5a884a" }
 
 base64 = "0.13.0"
 bitcoin = "0.27"
diff --git a/src/chain/bitcoind_client.rs b/src/chain/bitcoind_client.rs
index 351d8e4..d831231 100644
--- a/src/chain/bitcoind_client.rs
+++ b/src/chain/bitcoind_client.rs
@@ -57,10 +57,6 @@ impl TryInto<BlockchainInfo> for JsonResponse {
 }
 
 pub struct BitcoindClient {
     bitcoind_rpc_client: Arc<Mutex<RpcClient>>,
-    host: String,
-    port: u16,
-    rpc_user: String,
-    rpc_password: String,
     fees: Arc<HashMap<Target, AtomicU32>>,
     handle: tokio::runtime::Handle,
 }
@@ -72,31 +68,28 @@ pub enum Target {
     Background,
     Normal,
     HighPriority,
 }
 
-impl BlockSource for &BitcoindClient {
+impl BlockSource for BitcoindClient {
     fn get_header<'a>(
-        &'a mut self,
+        &'a self,
         header_hash: &'a BlockHash,
         height_hint: Option<u32>,
     ) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
         Box::pin(async move {
-            let mut rpc = self.bitcoind_rpc_client.lock().await;
+            let rpc = self.bitcoind_rpc_client.lock().await;
             rpc.get_header(header_hash, height_hint).await
         })
     }
 
-    fn get_block<'a>(
-        &'a mut self,
-        header_hash: &'a BlockHash,
-    ) -> AsyncBlockSourceResult<'a, Block> {
+    fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
         Box::pin(async move {
-            let mut rpc = self.bitcoind_rpc_client.lock().await;
+            let rpc = self.bitcoind_rpc_client.lock().await;
             rpc.get_block(header_hash).await
         })
     }
 
-    fn get_best_block(&mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)> {
+    fn get_best_block(&self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)> {
         Box::pin(async move {
-            let mut rpc = self.bitcoind_rpc_client.lock().await;
+            let rpc = self.bitcoind_rpc_client.lock().await;
             rpc.get_best_block().await
         })
     }
@@ -116,7 +109,7 @@ impl BitcoindClient {
         let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
         let rpc_credentials =
             base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone()));
-        let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
+        let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
         let _dummy = bitcoind_rpc_client
             .call_method::<BlockchainInfo>("getblockchaininfo", &[])
             .await
@@ -130,10 +123,6 @@ impl BitcoindClient {
         fees.insert(Target::HighPriority, AtomicU32::new(5000));
         let client = Self {
             bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)),
-            host,
-            port,
-            rpc_user,
-            rpc_password,
             fees: Arc::new(fees),
             handle: handle.clone(),
         };
@@ -153,7 +142,7 @@ impl BitcoindClient {
         handle.spawn(async move {
             loop {
                 let background_estimate = {
-                    let mut rpc = rpc_client.lock().await;
+                    let rpc = rpc_client.lock().await;
                     let background_conf_target = serde_json::json!(144);
                     let background_estimate_mode = serde_json::json!("ECONOMICAL");
                     let resp = rpc
@@ -170,7 +159,7 @@ impl BitcoindClient {
                 };
 
                 let normal_estimate = {
-                    let mut rpc = rpc_client.lock().await;
+                    let rpc = rpc_client.lock().await;
                     let normal_conf_target = serde_json::json!(18);
                    let normal_estimate_mode = serde_json::json!("ECONOMICAL");
                    let resp = rpc
@@ -187,7 +176,7 @@ impl BitcoindClient {
                 };
 
                 let high_prio_estimate = {
-                    let mut rpc = rpc_client.lock().await;
+                    let rpc = rpc_client.lock().await;
                     let high_prio_conf_target = serde_json::json!(6);
                     let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE");
                     let resp = rpc
@@ -217,32 +206,6 @@ impl BitcoindClient {
             }
         });
     }
-
-    pub fn get_new_rpc_client(&self) -> std::io::Result<RpcClient> {
-        let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
-        let rpc_credentials = base64::encode(format!(
-            "{}:{}",
-            self.rpc_user.clone(),
-            self.rpc_password.clone()
-        ));
-        RpcClient::new(&rpc_credentials, http_endpoint)
-    }
-
-    pub async fn send_raw_transaction(&self, raw_tx: String) {
-        let mut rpc = self.bitcoind_rpc_client.lock().await;
-
-        let raw_tx_json = serde_json::json!(raw_tx);
-        rpc.call_method::<Txid>("sendrawtransaction", &[raw_tx_json])
-            .await
-            .unwrap();
-    }
-
-    pub async fn get_blockchain_info(&self) -> BlockchainInfo {
-        let mut rpc = self.bitcoind_rpc_client.lock().await;
-        rpc.call_method::<BlockchainInfo>("getblockchaininfo", &[])
-            .await
-            .unwrap()
-    }
 }
 
 impl FeeEstimator for BitcoindClient {
@@ -272,7 +235,7 @@ impl BroadcasterInterface for BitcoindClient {
         let bitcoind_rpc_client = self.bitcoind_rpc_client.clone();
         let tx_serialized = serde_json::json!(encode::serialize_hex(tx));
         self.handle.spawn(async move {
-            let mut rpc = bitcoind_rpc_client.lock().await;
+            let rpc = bitcoind_rpc_client.lock().await;
             // This may error due to RL calling `broadcast_transaction` with the same transaction
             // multiple times, but the error is safe to ignore.
             match rpc
diff --git a/src/chain/listener_database.rs b/src/chain/listener_database.rs
index 376ee50..07459c9 100644
--- a/src/chain/listener_database.rs
+++ b/src/chain/listener_database.rs
@@ -98,13 +98,17 @@ impl ListenerDatabase {
                 .get_path_from_script_pubkey(&output.script_pubkey)
                 .unwrap()
             {
-                database
-                    .set_utxo(&LocalUtxo {
-                        outpoint: OutPoint::new(tx.txid(), i as u32),
-                        txout: output.clone(),
-                        keychain,
-                    })
-                    .unwrap();
+                let outpoint = OutPoint::new(tx.txid(), i as u32);
+                let existing_utxo = database.get_utxo(&outpoint).unwrap();
+                if existing_utxo.is_none() {
+                    database
+                        .set_utxo(&LocalUtxo {
+                            outpoint: OutPoint::new(tx.txid(), i as u32),
+                            txout: output.clone(),
+                            keychain,
+                        })
+                        .unwrap();
+                }
                 incoming += output.value;
 
                 // TODO: implement this
diff --git a/src/chain/manager.rs b/src/chain/manager.rs
index 3e25b1b..f9b8c73 100644
--- a/src/chain/manager.rs
+++ b/src/chain/manager.rs
@@ -11,50 +11,44 @@ use crate::{
     node::{ChainMonitor, ChannelManager},
 };
 use bitcoin::BlockHash;
-use lightning::chain::{BestBlock, Listen};
+use lightning::chain::{
+    chaininterface::{BroadcasterInterface, FeeEstimator},
+    BestBlock, Listen,
+};
 use lightning_block_sync::SpvClient;
 use lightning_block_sync::{init, poll, UnboundedCache};
 use lightning_block_sync::{poll::ValidatedBlockHeader, BlockSource};
 use std::ops::Deref;
 
-use super::{
-    bitcoind_client::BitcoindClient, listener::SenseiChainListener,
-    listener_database::ListenerDatabase,
-};
+use super::{listener::SenseiChainListener, listener_database::ListenerDatabase};
 
 pub struct SenseiChainManager {
     config: SenseiConfig,
     pub listener: Arc<SenseiChainListener>,
-    pub bitcoind_client: Arc<BitcoindClient>,
+    pub block_source: Arc<dyn BlockSource + Send + Sync>,
+    pub fee_estimator: Arc<dyn FeeEstimator + Send + Sync>,
+    pub broadcaster: Arc<dyn BroadcasterInterface + Send + Sync>,
     poller_paused: Arc<AtomicBool>,
 }
 
 impl SenseiChainManager {
-    pub async fn new(config: SenseiConfig) -> Result<Self, crate::error::Error> {
+    pub async fn new(
+        config: SenseiConfig,
+        block_source: Arc<dyn BlockSource + Send + Sync>,
+        fee_estimator: Arc<dyn FeeEstimator + Send + Sync>,
+        broadcaster: Arc<dyn BroadcasterInterface + Send + Sync>,
+    ) -> Result<Self, crate::error::Error> {
         let listener = Arc::new(SenseiChainListener::new());
-
-        let bitcoind_client = Arc::new(
-            BitcoindClient::new(
-                config.bitcoind_rpc_host.clone(),
-                config.bitcoind_rpc_port,
-                config.bitcoind_rpc_username.clone(),
-                config.bitcoind_rpc_password.clone(),
-                tokio::runtime::Handle::current(),
-            )
-            .await
-            .expect("invalid bitcoind rpc config"),
-        );
-
-        let poller_paused = Arc::new(AtomicBool::new(false));
-
-        let block_source_poller = bitcoind_client.clone();
+        let block_source_poller = block_source.clone();
         let listener_poller = listener.clone();
+        let poller_paused = Arc::new(AtomicBool::new(false));
         let poller_paused_poller = poller_paused.clone();
         tokio::spawn(async move {
-            let derefed = &mut block_source_poller.deref();
             let mut cache = UnboundedCache::new();
-            let chain_tip = init::validate_best_block_header(derefed).await.unwrap();
-            let chain_poller = poll::ChainPoller::new(derefed, config.network);
+            let chain_tip = init::validate_best_block_header(block_source_poller.clone())
+                .await
+                .unwrap();
+            let chain_poller = poll::ChainPoller::new(block_source_poller, config.network);
             let mut spv_client =
                 SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller);
             loop {
@@ -68,8 +62,10 @@ impl SenseiChainManager {
         Ok(Self {
             config,
             listener,
-            bitcoind_client,
             poller_paused,
+            block_source,
+            fee_estimator,
+            broadcaster,
         })
     }
 
@@ -78,7 +74,7 @@ impl SenseiChainManager {
         chain_listeners: Vec<(BlockHash, &(dyn Listen + Send + Sync))>,
     ) -> Result<ValidatedBlockHeader, crate::error::Error> {
         let chain_tip = init::synchronize_listeners(
-            &mut self.bitcoind_client.deref(),
+            self.block_source.clone(),
             self.config.network,
             &mut UnboundedCache::new(),
             chain_listeners,
@@ -121,8 +117,7 @@ impl SenseiChainManager {
     }
 
     pub async fn get_best_block(&self) -> Result<BestBlock, crate::error::Error> {
-        let mut block_source = self.bitcoind_client.deref();
-        let (latest_blockhash, latest_height) = block_source.get_best_block().await.unwrap();
+        let (latest_blockhash, latest_height) = self.block_source.get_best_block().await.unwrap();
         Ok(BestBlock::new(latest_blockhash, latest_height.unwrap()))
     }
 }
diff --git a/src/config.rs b/src/config.rs
index 8dac1ec..f0932bd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -13,6 +13,12 @@ use bitcoin::Network;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum KVPersistence {
+    Filesystem,
+    Database,
+}
+
 #[derive(Clone, Serialize, Deserialize)]
 pub struct SenseiConfig {
     #[serde(skip)]
@@ -25,6 +31,7 @@ pub struct SenseiConfig {
     pub api_port: u16,
     pub port_range_min: u16,
     pub port_range_max: u16,
+    pub kv_persistence: KVPersistence,
 }
 
 impl Default for SenseiConfig {
@@ -41,6 +48,7 @@ impl Default for SenseiConfig {
             api_port: 5401,
             port_range_min: 1024,
             port_range_max: 65535,
+            kv_persistence: KVPersistence::Filesystem,
         }
     }
 }
@@ -103,6 +111,7 @@ pub struct LightningNodeConfig {
     pub network: Network,
     pub passphrase: String,
     pub external_router: bool,
+    pub kv_persistence: KVPersistence,
 }
 
 impl Default for LightningNodeConfig {
@@ -115,6 +124,7 @@ impl Default for LightningNodeConfig {
             network: Network::Bitcoin,
             passphrase: "satoshi".into(),
             external_router: true,
+            kv_persistence: KVPersistence::Filesystem,
         }
     }
 }
@@ -134,19 +144,4 @@ impl LightningNodeConfig {
     pub fn admin_macaroon_path(&self) -> String {
         format!("{}/admin.macaroon", self.data_dir())
     }
-    pub fn seed_path(&self) -> String {
-        format!("{}/seed", self.data_dir())
-    }
-    pub fn channel_manager_path(&self) -> String {
-        format!("{}/manager", self.data_dir())
-    }
-    pub fn network_graph_path(&self) -> String {
-        format!("{}/network_graph", self.data_dir())
-    }
-    pub fn scorer_path(&self) -> String {
-        format!("{}/scorer", self.data_dir())
-    }
-    pub fn channel_peer_data_path(&self) -> String {
-        format!("{}/channel_peer_data", self.data_dir())
-    }
 }
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 22bf7e6..471d8a2 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,3 +1,5 @@
+use std::io::ErrorKind;
+
 // This file is Copyright its original authors, visible in version control
 // history.
 //
@@ -21,3 +23,13 @@ impl From for Error {
         Error::Encode(e)
     }
 }
+
+impl From<Error> for std::io::Error {
+    fn from(e: Error) -> std::io::Error {
+        let error_message = match e {
+            Error::Generic(str) => str,
+            Error::Encode(e) => e.to_string(),
+        };
+        std::io::Error::new(ErrorKind::Other, error_message)
+    }
+}
diff --git a/src/database/node.rs b/src/database/node.rs
index c4f834d..3974382 100644
--- a/src/database/node.rs
+++ b/src/database/node.rs
@@ -78,6 +78,8 @@ static MIGRATIONS: &[&str] = &[
     "CREATE INDEX idx_to_channel_id ON forwarded_payments(to_channel_id)",
     "CREATE UNIQUE INDEX idx_hours_since_epoch ON forwarded_payments(hours_since_epoch, from_channel_id, to_channel_id)",
     "CREATE TABLE last_sync (blockhash BLOB)",
+    "CREATE TABLE kv_store (k TEXT, v BLOB)",
+    "CREATE UNIQUE INDEX idx_k ON kv_store(k)"
 ];
 
 pub struct NodeDatabase {
@@ -319,6 +321,53 @@ impl NodeDatabase {
         }
     }
 
+    pub fn set_value(&self, key: String, value: Vec<u8>) -> Result<(), Error> {
+        let mut statement = self.connection.prepare_cached(
+            "
+            INSERT INTO kv_store (k, v)
+            VALUES (:key, :value)
+            ON CONFLICT
+            DO UPDATE SET v = excluded.v
+            ",
+        )?;
+
+        statement.execute(named_params! {
+            ":key": key,
+            ":value": value,
+        })?;
+
+        Ok(())
+    }
+
+    pub fn get_keys(&self, pattern: String) -> Result<Vec<String>, Error> {
+        let mut statement = self
+            .connection
+            .prepare_cached("SELECT k FROM kv_store WHERE k LIKE :pattern")?;
+
+        let mut rows = statement.query(named_params! { ":pattern": pattern })?;
+        let mut keys = vec![];
+        while let Some(row) = rows.next()? {
+            keys.push(row.get(0)?)
+        }
+
+        Ok(keys)
+    }
+
+    pub fn get_value(&self, key: String) -> Result<Option<Vec<u8>>, Error> {
+        let mut statement = self
+            .connection
+            .prepare_cached("SELECT v FROM kv_store WHERE k=:key")?;
+
+        let mut rows = statement.query(named_params! { ":key": key })?;
+
+        let row = rows.next()?;
+
+        match row {
+            Some(row) => Ok(Some(row.get(0)?)),
+            None => Ok(None),
+        }
+    }
+
     pub fn get_payments(
         &self,
         pagination: PaginationRequest,
diff --git a/src/disk.rs b/src/disk.rs
index 2acbd71..1eac411 100644
--- a/src/disk.rs
+++ b/src/disk.rs
@@ -7,27 +7,10 @@
 // You may not use this file except in accordance with one or both of these
 // licenses.
 
-use crate::chain::broadcaster::SenseiBroadcaster;
-use crate::chain::fee_estimator::SenseiFeeEstimator;
-use crate::node::{self, ChainMonitor, ChannelManager};
-use bitcoin::secp256k1::key::PublicKey;
-use bitcoin::BlockHash;
 use chrono::Utc;
-use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
-use lightning::routing::network_graph::NetworkGraph;
-use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
 use lightning::util::logger::{Logger, Record};
-use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
-use lightning_background_processor::Persister;
-use lightning_persister::FilesystemPersister;
-
-use std::collections::HashMap;
+use lightning::util::ser::Writer;
 use std::fs;
-use std::fs::File;
-use std::io::{BufRead, BufReader, BufWriter};
-use std::net::SocketAddr;
-use std::path::Path;
-use std::sync::Arc;
 
 pub struct FilesystemLogger {
     data_dir: String,
@@ -65,105 +48,3 @@ impl Logger for FilesystemLogger {
             .unwrap();
     }
 }
-pub fn persist_channel_peer(path: &Path, peer_info: &str) -> std::io::Result<()> {
-    let mut file = fs::OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(path)?;
-    file.write_all(format!("{}\n", peer_info).as_bytes())
-}
-
-pub fn read_channel_peer_data(
-    path: &Path,
-) -> Result<HashMap<PublicKey, SocketAddr>, std::io::Error> {
-    let mut peer_data = HashMap::new();
-    if !Path::new(&path).exists() {
-        return Ok(HashMap::new());
-    }
-    let file = File::open(path)?;
-    let reader = BufReader::new(file);
-    for line in reader.lines() {
-        match node::parse_peer_info(line.unwrap()) {
-            Ok((pubkey, socket_addr)) => {
-                peer_data.insert(pubkey, socket_addr);
-            }
-            Err(e) => return Err(e),
-        }
-    }
-    Ok(peer_data)
-}
-
-pub fn read_network(path: &Path, genesis_hash: BlockHash) -> NetworkGraph {
-    if let Ok(file) = File::open(path) {
-        if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file)) {
-            return graph;
-        }
-    }
-    NetworkGraph::new(genesis_hash)
-}
-
-pub fn persist_scorer(
-    path: &Path,
-    scorer: &ProbabilisticScorer<Arc<NetworkGraph>>,
-) -> std::io::Result<()> {
-    let mut tmp_path = path.to_path_buf().into_os_string();
-    tmp_path.push(".tmp");
-    let file = fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .open(&tmp_path)?;
-    let write_res = scorer.write(&mut BufWriter::new(file));
-    if let Err(e) = write_res.and_then(|_| fs::rename(&tmp_path, path)) {
-        let _ = fs::remove_file(&tmp_path);
-        Err(e)
-    } else {
-        Ok(())
-    }
-}
-
-pub fn read_scorer(
-    path: &Path,
-    graph: Arc<NetworkGraph>,
-) -> ProbabilisticScorer<Arc<NetworkGraph>> {
-    let params = ProbabilisticScoringParameters::default();
-    if let Ok(file) = File::open(path) {
-        if let Ok(scorer) =
-            ProbabilisticScorer::read(&mut BufReader::new(file), (params, Arc::clone(&graph)))
-        {
-            return scorer;
-        }
-    }
-    ProbabilisticScorer::new(params, graph)
-}
-
-pub struct DataPersister {
-    pub data_dir: String,
-    pub external_router: bool,
-}
-
-impl
-    Persister<
-        InMemorySigner,
-        Arc<ChainMonitor>,
-        Arc<SenseiBroadcaster>,
-        Arc<KeysManager>,
-        Arc<SenseiFeeEstimator>,
-        Arc<FilesystemLogger>,
-    > for DataPersister
-{
-    fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error> {
-        FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
-    }
-
-    fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-        if !self.external_router
-            && FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
-                .is_err()
-        {
-            // Persistence errors here are non-fatal as we can just fetch the routing graph
-            // again later, but they may indicate a disk error which could be fatal elsewhere.
-            eprintln!("Warning: Failed to persist network graph, check your disk and permissions");
-        }
-        Ok(())
-    }
-}
diff --git a/src/event_handler.rs b/src/event_handler.rs
index ccf0ccb..67c8b73 100644
--- a/src/event_handler.rs
+++ b/src/event_handler.rs
@@ -19,10 +19,7 @@ use bdk::{FeeRate, SignOptions};
 use bitcoin::{secp256k1::Secp256k1, Network};
 use bitcoin_bech32::WitnessProgram;
 use lightning::{
-    chain::{
-        chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator},
-        keysinterface::KeysManager,
-    },
+    chain::{chaininterface::ConfirmationTarget, keysinterface::KeysManager},
     util::events::{Event, EventHandler, PaymentPurpose},
 };
 use rand::{thread_rng, Rng};
@@ -73,7 +70,7 @@ impl EventHandler for LightningNodeEventHandler {
                 let mut tx_builder = wallet.build_tx();
                 let _fee_sats_per_1000_wu = self
                     .chain_manager
-                    .bitcoind_client
+                    .fee_estimator
                     .get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
 
                 // TODO: is this the correct conversion??
@@ -225,6 +222,7 @@ impl EventHandler for LightningNodeEventHandler {
             Event::PaymentForwarded {
                 fee_earned_msat,
                 claim_from_onchain_tx,
+                source_channel_id: _,
             } => {
                 let from_onchain_str = if *claim_from_onchain_tx {
                     "from onchain downstream claim"
@@ -273,7 +271,7 @@ impl EventHandler for LightningNodeEventHandler {
 
                 let tx_feerate = self
                     .chain_manager
-                    .bitcoind_client
+                    .fee_estimator
                     .get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
 
                 let spending_tx = self
@@ -288,7 +286,7 @@ impl EventHandler for LightningNodeEventHandler {
                     .unwrap();
 
                 self.chain_manager
-                    .bitcoind_client
+                    .broadcaster
                     .broadcast_transaction(&spending_tx);
             }
             Event::ChannelClosed {
diff --git a/src/lib/mod.rs b/src/lib/mod.rs
index 8a75814..47061da 100644
--- a/src/lib/mod.rs
+++ b/src/lib/mod.rs
@@ -8,3 +8,4 @@
 // licenses.
 
 pub mod network_graph;
+pub mod persist;
diff --git a/src/lib/persist.rs b/src/lib/persist.rs
new file mode 100644
index 0000000..47df976
--- /dev/null
+++ b/src/lib/persist.rs
@@ -0,0 +1,309 @@
+use std::{
+    collections::HashMap,
+    fs::{self},
+    io::Cursor,
+    net::SocketAddr,
+    ops::Deref,
+    path::{Path, PathBuf},
+    sync::{Arc, Mutex},
+};
+
+use bitcoin::secp256k1::key::PublicKey;
+use bitcoin::{
+    blockdata::constants::genesis_block, hashes::hex::FromHex, BlockHash, Network, Txid,
+};
+use lightning::util::ser::ReadableArgs;
+use lightning::{
+    chain::{
+        channelmonitor::ChannelMonitor,
+        keysinterface::{KeysInterface, Sign},
+    },
+    routing::{
+        network_graph::NetworkGraph,
+        scoring::{ProbabilisticScorer, ProbabilisticScoringParameters},
+    },
+    util::{
+        persist::KVStorePersister,
+        ser::{Readable, Writeable},
+    },
+};
+use lightning_persister::FilesystemPersister;
+
+use crate::{database::node::NodeDatabase, node};
+
+pub trait KVStoreReader {
+    fn read(&self, key: &str) -> std::io::Result<Option<Vec<u8>>>;
+    fn list(&self, key: &str) -> std::io::Result<Vec<String>>;
+}
+
+pub struct DatabaseStore {
+    database: Arc<Mutex<NodeDatabase>>,
+}
+
+impl KVStorePersister for DatabaseStore {
+    fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+        let database = self.database.lock().unwrap();
+        database
+            .set_value(key.to_string(), object.encode())
+            .map_err(|e| e.into())
+    }
+}
+
+impl KVStoreReader for DatabaseStore {
+    fn read(&self, key: &str) -> std::io::Result<Option<Vec<u8>>> {
+        let database = self.database.lock().unwrap();
+        database.get_value(key.to_string()).map_err(|e| e.into())
+    }
+
+    fn list(&self, key: &str) -> std::io::Result<Vec<String>> {
+        let pattern = format!("{}%", key);
+        let database = self.database.lock().unwrap();
+        database
+            .get_keys(pattern)
+            .map(|full_keys| {
+                let replace_str = format!("{}/", key);
+                full_keys
+                    .iter()
+                    .map(|full_key| full_key.replace(&replace_str, ""))
+                    .collect()
+            })
+            .map_err(|e| e.into())
+    }
+}
+
+impl DatabaseStore {
+    pub fn new(database: Arc<Mutex<NodeDatabase>>) -> Self {
+        Self { database }
+    }
+}
+
+pub struct FileStore {
+    filesystem_persister: FilesystemPersister,
+}
+
+impl KVStorePersister for FileStore {
+    fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+        self.filesystem_persister.persist(key, object)
+    }
+}
+
+impl KVStoreReader for FileStore {
+    fn read(&self, key: &str) -> std::io::Result<Option<Vec<u8>>> {
+        let full_path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key);
+        let path = PathBuf::from(full_path);
+        match fs::read(path) {
+            Ok(contents) => Ok(Some(contents)),
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn list(&self, key: &str) -> std::io::Result<Vec<String>> {
+        let path = format!("{}/{}", self.filesystem_persister.get_data_dir(), key);
+        if !Path::new(&PathBuf::from(&path)).exists() {
+            return Ok(Vec::new());
+        }
+        let mut res = Vec::new();
+        for file_option in fs::read_dir(path).unwrap() {
+            let file = file_option.unwrap();
+            let owned_file_name = file.file_name();
+            if let Some(filename) = owned_file_name.to_str() {
+                res.push(filename.to_string())
+            }
+        }
+        Ok(res)
+    }
+}
+
+impl FileStore {
+    pub fn new(root: String) -> Self {
+        Self {
+            filesystem_persister: FilesystemPersister::new(root),
+        }
+    }
+}
+
+pub enum AnyKVStore {
+    File(FileStore),
+    Database(DatabaseStore),
+}
+
+impl KVStorePersister for AnyKVStore {
+    fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+        match self {
+            AnyKVStore::File(store) => store.persist(key, object),
+            AnyKVStore::Database(store) => store.persist(key, object),
+        }
+    }
+}
+
+impl KVStoreReader for AnyKVStore {
+    fn read(&self, key: &str) -> std::io::Result<Option<Vec<u8>>> {
+        match self {
+            AnyKVStore::File(store) => store.read(key),
+            AnyKVStore::Database(store) => store.read(key),
+        }
+    }
+
+    fn list(&self, key: &str) -> std::io::Result<Vec<String>> {
+        match self {
+            AnyKVStore::File(store) => store.list(key),
+            AnyKVStore::Database(store) => store.list(key),
+        }
+    }
+}
+
+pub struct SenseiPersister {
+    store: AnyKVStore,
+    network: Network,
+}
+
+impl SenseiPersister {
+    pub fn new(store: AnyKVStore, network: Network) -> Self {
+        Self { store, network }
+    }
+
+    pub fn read_channel_manager(&self) -> std::io::Result<Option<Vec<u8>>> {
+        self.store.read("manager")
+    }
+
+    pub fn read_network_graph(&self) -> NetworkGraph {
+        if let Ok(Some(contents)) = self.store.read("network_graph") {
+            let mut cursor = Cursor::new(contents);
+            if let Ok(graph) = NetworkGraph::read(&mut cursor) {
+                return graph;
+            }
+        }
+
+        let genesis_hash = genesis_block(self.network).header.block_hash();
+        NetworkGraph::new(genesis_hash)
+    }
+
+    pub fn read_scorer(
+        &self,
+        network_graph: Arc<NetworkGraph>,
+    ) -> ProbabilisticScorer<Arc<NetworkGraph>> {
+        let params = ProbabilisticScoringParameters::default();
+        if let Ok(Some(contents)) = self.store.read("scorer") {
+            let mut cursor = Cursor::new(contents);
+            if let Ok(scorer) =
+                ProbabilisticScorer::read(&mut cursor, (params, Arc::clone(&network_graph)))
+            {
+                return scorer;
+            }
+        }
+        ProbabilisticScorer::new(params, network_graph)
+    }
+
+    pub fn persist_scorer(
+        &self,
+        scorer: &ProbabilisticScorer<Arc<NetworkGraph>>,
+    ) -> std::io::Result<()> {
+        self.store.persist("scorer", scorer)
+    }
+
+    fn get_raw_channel_peer_data(&self) -> String {
+        if let Ok(Some(contents)) = self.store.read("channel_peer_data") {
+            if let Ok(channel_peer_data) = String::read(&mut Cursor::new(contents)) {
+                return channel_peer_data;
+            }
+        }
+
+        String::new()
+    }
+
+    pub fn persist_channel_peer(&self, peer_info: &str) -> std::io::Result<()> {
+        let mut peer_data = self.get_raw_channel_peer_data();
+        peer_data.push_str(peer_info);
+        peer_data.push('\n');
+        self.store.persist("channel_peer_data", &peer_data)
+    }
+
+    pub async fn read_channel_peer_data(
+        &self,
+    ) -> Result<HashMap<PublicKey, SocketAddr>, std::io::Error> {
+        let mut peer_data = HashMap::new();
+        let raw_peer_data = self.get_raw_channel_peer_data();
+        for line in raw_peer_data.lines() {
+            match node::parse_peer_info(line.to_string()).await {
+                Ok((pubkey, socket_addr)) => {
+                    peer_data.insert(pubkey, socket_addr);
+                }
+                Err(e) => return Err(e),
+            }
+        }
+        Ok(peer_data)
+    }
+
+    /// Read `ChannelMonitor`s from disk.
+    pub fn read_channelmonitors<Signer: Sign, K: Deref>(
+        &self,
+        keys_manager: K,
+    ) -> Result<Vec<(BlockHash, ChannelMonitor<Signer>)>, std::io::Error>
+    where
+        K::Target: KeysInterface<Signer = Signer> + Sized,
+    {
+        let filenames = self.store.list("monitors").unwrap();
+
+        let mut res = Vec::new();
+        for filename in filenames {
+            if !filename.is_ascii() || filename.len() < 65 {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    "Invalid ChannelMonitor file name",
+                ));
+            }
+            if filename.ends_with(".tmp") {
+                // If we were in the middle of committing an new update and crashed, it should be
+                // safe to ignore the update - we should never have returned to the caller and
+                // irrevocably committed to the new state in any way.
+                continue;
+            }
+
+            let txid = Txid::from_hex(filename.split_at(64).0);
+            if txid.is_err() {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    "Invalid tx ID in filename",
+                ));
+            }
+
+            let index: Result<u16, std::num::ParseIntError> = filename.split_at(65).1.parse();
+            if index.is_err() {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    "Invalid tx index in filename",
+                ));
+            }
+
+            let monitor_path = format!("monitors/{}", filename);
+            let contents = self.store.read(&monitor_path)?.unwrap();
+            let mut buffer = Cursor::new(&contents);
+            match <(BlockHash, ChannelMonitor<Signer>)>::read(&mut buffer, &*keys_manager) {
+                Ok((blockhash, channel_monitor)) => {
+                    if channel_monitor.get_funding_txo().0.txid != txid.unwrap()
+                        || channel_monitor.get_funding_txo().0.index != index.unwrap()
+                    {
+                        return Err(std::io::Error::new(
+                            std::io::ErrorKind::InvalidData,
+                            "ChannelMonitor was stored in the wrong file",
+                        ));
+                    }
+                    res.push((blockhash, channel_monitor));
+                }
+                Err(e) => {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        format!("Failed to deserialize ChannelMonitor: {}", e),
+                    ))
+                }
+            }
+        }
+        Ok(res)
+    }
+}
+
+impl KVStorePersister for SenseiPersister {
+    fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+        self.store.persist(key, object)
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 4abf014..d465a3a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -22,10 +22,11 @@ mod node;
 mod services;
 mod utils;
 
-use crate::config::SenseiConfig;
+use crate::chain::bitcoind_client::BitcoindClient;
 use crate::database::admin::AdminDatabase;
 use crate::http::admin::add_routes as add_admin_routes;
 use crate::http::node::add_routes as add_node_routes;
+use crate::{chain::manager::SenseiChainManager, config::SenseiConfig};
 use ::http::{
     header::{self, ACCEPT, AUTHORIZATION, CONTENT_TYPE, COOKIE},
     Method, Uri,
@@ -39,6 +40,7 @@ use axum::{
     AddExtensionLayer, Router,
 };
 use clap::Parser;
+use config::KVPersistence;
 use rust_embed::RustEmbed;
 use std::net::SocketAddr;
@@ -97,6 +99,8 @@ struct SenseiArgs {
     port_range_max: Option<u16>,
     #[clap(long, env = "API_PORT")]
     api_port: Option<u16>,
+    #[clap(long, env = "KV_PERSISTENCE")]
+    kv_persistence: Option<String>,
 }
 
 pub type AdminRequestResponse = (AdminRequest, Sender<AdminResponse>);
@@ -151,6 +155,13 @@ async fn main() {
     if let Some(api_port) = args.api_port {
         config.api_port = api_port;
     }
+    if let Some(kv_persistence) = args.kv_persistence {
+        config.kv_persistence = match kv_persistence.as_str() {
+            "filesystem" => KVPersistence::Filesystem,
+            "database" => KVPersistence::Database,
+            _ => panic!("invalid kv_persistence value"),
+        };
+    }
 
     let sqlite_path = format!("{}/{}/admin.db", sensei_dir, config.network);
     let mut database = AdminDatabase::new(sqlite_path);
@@ -159,11 +170,35 @@ async fn main() {
     let addr = SocketAddr::from(([0, 0, 0, 0], config.api_port));
     let node_directory = Arc::new(Mutex::new(HashMap::new()));
 
+    let bitcoind_client = Arc::new(
+        BitcoindClient::new(
+            config.bitcoind_rpc_host.clone(),
+            config.bitcoind_rpc_port,
+            config.bitcoind_rpc_username.clone(),
+            config.bitcoind_rpc_password.clone(),
+            tokio::runtime::Handle::current(),
+        )
+        .await
+        .expect("invalid bitcoind rpc config"),
+    );
+
+    let chain_manager = Arc::new(
+        SenseiChainManager::new(
+            config.clone(),
+            bitcoind_client.clone(),
+            bitcoind_client.clone(),
+            bitcoind_client,
+        )
+        .await
+        .unwrap(),
+    );
+
     let admin_service = AdminService::new(
         &sensei_dir,
         config.clone(),
         node_directory.clone(),
         database,
+        chain_manager,
     )
     .await;
diff --git a/src/node.rs b/src/node.rs
index c8080cf..094e1e2 100644
--- a/src/node.rs
+++ b/src/node.rs
@@ -11,16 +11,17 @@ use crate::chain::broadcaster::SenseiBroadcaster;
 use crate::chain::fee_estimator::SenseiFeeEstimator;
 use crate::chain::listener_database::ListenerDatabase;
 use crate::chain::manager::SenseiChainManager;
-use crate::config::LightningNodeConfig;
+use crate::config::{KVPersistence, LightningNodeConfig};
 use crate::database::node::NodeDatabase;
-use crate::disk::{DataPersister, FilesystemLogger};
+use crate::disk::FilesystemLogger;
 use crate::error::Error;
 use crate::event_handler::LightningNodeEventHandler;
 use crate::lib::network_graph::OptionalNetworkGraphMsgHandler;
+use crate::lib::persist::{AnyKVStore, DatabaseStore, FileStore, SenseiPersister};
 use crate::services::node::{Channel, NodeInfo, NodeRequest, NodeRequestError, NodeResponse, Peer};
 use crate::services::{PaginationRequest, PaginationResponse, PaymentsFilter};
 use crate::utils::PagedVec;
-use crate::{database, disk, hex_utils};
+use crate::{database, hex_utils};
 use bdk::database::SqliteDatabase;
 use bdk::keys::ExtendedKey;
 use bdk::wallet::AddressIndex;
@@ -33,7 +34,6 @@ use lightning_invoice::payment::PaymentError;
 use tindercrypt::cryptors::RingCryptor;
 
 use bdk::template::DescriptorTemplateOut;
-use bitcoin::blockdata::constants::genesis_block;
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::network::constants::Network;
 use bitcoin::secp256k1::PublicKey;
@@ -57,16 +57,14 @@ use lightning_background_processor::BackgroundProcessor;
 use lightning_invoice::utils::DefaultRouter;
 use lightning_invoice::{payment, utils, Currency, Invoice};
 use lightning_net_tokio::SocketDescriptor;
-use lightning_persister::FilesystemPersister;
 use macaroon::Macaroon;
 use rand::{thread_rng, Rng};
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 use std::fs::File;
-use std::io::Read;
 use std::io::Write;
+use std::io::{Cursor, Read};
 use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
-use std::path::Path;
 use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
@@ -145,7 +143,7 @@ pub type ChainMonitor = chainmonitor::ChainMonitor<
     Arc<SenseiBroadcaster>,
     Arc<SenseiFeeEstimator>,
     Arc<FilesystemLogger>,
-    Arc<FilesystemPersister>,
+    Arc<SenseiPersister>,
 >;
 
 trait MustSized: Sized {}
@@ -257,6 +255,7 @@ pub struct LightningNode {
     pub invoice_payer: Arc<InvoicePayer>,
     pub scorer: Arc<Mutex<ProbabilisticScorerUsingTime<Arc<NetworkGraph>, Instant>>>,
     pub stop_listen: Arc<AtomicBool>,
+    pub persister: Arc<SenseiPersister>,
 }
 
 impl LightningNode {
@@ -290,8 +289,10 @@ impl LightningNode {
         seed: &[u8],
         pubkey: String,
         macaroon_path: String,
-        database: &mut NodeDatabase,
+        database: Arc<Mutex<NodeDatabase>>,
     ) -> Result<Macaroon, Error> {
+        let mut database = database.lock().unwrap();
+
         match File::open(macaroon_path.clone()) {
             Ok(mut file) => {
                 let mut bytes: Vec<u8> = Vec::new();
@@ -369,7 +370,6 @@ impl LightningNode {
         let mut node_database = NodeDatabase::new(config.node_database_path());
 
         let network = config.network;
-        let channel_manager_path = config.channel_manager_path();
         let admin_macaroon_path = config.admin_macaroon_path();
 
         let seed =
@@ -406,15 +406,22 @@ impl LightningNode {
         let logger = Arc::new(FilesystemLogger::new(data_dir.clone()));
 
         let fee_estimator = Arc::new(SenseiFeeEstimator {
-            fee_estimator: chain_manager.bitcoind_client.clone(),
+            fee_estimator: chain_manager.fee_estimator.clone(),
         });
 
         let broadcaster = Arc::new(SenseiBroadcaster {
-            broadcaster: chain_manager.bitcoind_client.clone(),
+            broadcaster: chain_manager.broadcaster.clone(),
             listener_database: listener_database.clone(),
         });
 
-        let persister = Arc::new(FilesystemPersister::new(data_dir));
+        let database = Arc::new(Mutex::new(node_database));
+
+        let persistence_store = match config.kv_persistence {
+            KVPersistence::Filesystem => AnyKVStore::File(FileStore::new(data_dir)),
+            KVPersistence::Database => AnyKVStore::Database(DatabaseStore::new(database.clone())),
+        };
+
+        let persister = Arc::new(SenseiPersister::new(persistence_store, config.network));
 
         let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos()));
@@ -437,7 +444,7 @@ impl LightningNode {
         let best_block = chain_manager.get_best_block().await?;
 
         let (channel_manager_blockhash, channel_manager) = {
-            if let Ok(mut f) = fs::File::open(channel_manager_path) {
+            if let Ok(Some(contents)) = persister.read_channel_manager() {
                 let mut channel_monitor_mut_references = Vec::new();
                 for (_, channel_monitor) in channelmonitors.iter_mut() {
                     channel_monitor_mut_references.push(channel_monitor);
@@ -451,7 +458,8 @@ impl LightningNode {
                     user_config,
                     channel_monitor_mut_references,
                 );
-                <(BlockHash, ChannelManager)>::read(&mut f, read_args).unwrap()
+                let mut buffer = Cursor::new(&contents);
+                <(BlockHash, ChannelManager)>::read(&mut buffer, read_args).unwrap()
             } else {
                 // TODO: in reality we could error for other reasons when there's supposed to be
                 // an existing chanenl manager. need to handle this the same way we do for seed file
@@ -504,8 +512,10 @@ impl LightningNode {
             chain_listeners.push((block_hash, monitor as &(dyn chain::Listen + Send + Sync)));
         }
 
-        let bdk_database_last_sync =
-            node_database.find_or_create_last_sync(best_block.block_hash())?;
+        let bdk_database_last_sync = {
+            let mut db = database.lock().unwrap();
+            db.find_or_create_last_sync(best_block.block_hash())?
+        };
 
         chain_listeners.push((
             bdk_database_last_sync,
@@ -546,14 +556,7 @@ impl LightningNode {
 
         let network_graph = match network_graph {
             Some(network_graph) => network_graph,
-            None => {
-                let genesis = genesis_block(config.network).header.block_hash();
-
-                Arc::new(disk::read_network(
-                    Path::new(&config.network_graph_path()),
-                    genesis,
-                ))
-            }
+            None => Arc::new(persister.read_network_graph()),
         };
 
         let network_graph_msg_handler: Arc<OptionalNetworkGraphMsgHandler> =
@@ -592,11 +595,9 @@ impl LightningNode {
             Arc::new(IgnoringMessageHandler {}),
         ));
 
-        let scorer_path = config.scorer_path();
-        let scorer = Arc::new(Mutex::new(disk::read_scorer(
-            Path::new(&scorer_path),
-            Arc::clone(&network_graph),
-        )));
+        let scorer = Arc::new(Mutex::new(
+            persister.read_scorer(Arc::clone(&network_graph)),
+        ));
 
         let router = DefaultRouter::new(
             network_graph.clone(),
@@ -610,11 +611,9 @@ impl LightningNode {
             &seed,
             pubkey,
             admin_macaroon_path,
-            &mut node_database,
+            database.clone(),
         )?;
 
-        let database = Arc::new(Mutex::new(node_database));
-
         let event_handler = Arc::new(LightningNodeEventHandler {
             config: config.clone(),
             wallet: bdk_wallet.clone(),
@@ -653,6 +652,7 @@ impl LightningNode {
             scorer,
             invoice_payer,
             stop_listen,
+            persister,
         })
     }
@@ -686,14 +686,15 @@ impl LightningNode {
             }
         }));
 
-        let scorer_path = self.config.scorer_path();
+        let scorer_persister = Arc::clone(&self.persister);
         let scorer_persist = Arc::clone(&self.scorer);
+
         handles.push(tokio::spawn(async move {
             let mut interval = tokio::time::interval(Duration::from_secs(600));
             loop {
                 interval.tick().await;
-                if disk::persist_scorer(Path::new(&scorer_path), &scorer_persist.lock().unwrap())
+                if scorer_persister
+                    .persist_scorer(&scorer_persist.lock().unwrap())
                     .is_err()
                 {
                     // Persistence errors here are non-fatal as channels will be re-scored as payments
@@ -703,15 +704,12 @@ impl LightningNode {
             }
         }));
 
-        let persister = DataPersister {
-            data_dir: self.config.data_dir(),
-            external_router: self.config.external_router,
-        };
+        let bg_persister = Arc::clone(&self.persister);
 
         // TODO: should we allow 'child' nodes to update NetworkGraph based on payment failures?
         // feels like probably but depends on exactly what is updated
         let background_processor = BackgroundProcessor::start(
-            persister,
+            bg_persister,
            self.invoice_payer.clone(),
             self.chain_monitor.clone(),
             self.channel_manager.clone(),
@@ -722,15 +720,14 @@ impl LightningNode {
 
         // Reconnect to channel peers if possible.
 
-        let channel_peer_data_path = config.channel_peer_data_path();
         let channel_manager_reconnect = self.channel_manager.clone();
         let peer_manager_reconnect = self.peer_manager.clone();
-
+        let persister_peer = self.persister.clone();
         handles.push(tokio::spawn(async move {
             let mut interval = tokio::time::interval(Duration::from_secs(5));
             loop {
                 interval.tick().await;
-                match disk::read_channel_peer_data(Path::new(&channel_peer_data_path)) {
+                match persister_peer.read_channel_peer_data().await {
                     Ok(mut info) => {
                         for (pubkey, peer_addr) in info.drain() {
                             for chan_info in channel_manager_reconnect.list_channels() {
@@ -855,19 +852,8 @@ impl LightningNode {
     }
 
     pub async fn connect_to_peer(&self, pubkey: PublicKey, addr: SocketAddr) -> Result<(), Error> {
-        let listen_addr = public_ip::addr().await.unwrap();
-
-        let connect_address = match listen_addr == addr.ip() {
-            true => format!("127.0.0.1:{}", addr.port()).parse().unwrap(),
-            false => addr,
-        };
-
-        match lightning_net_tokio::connect_outbound(
-            Arc::clone(&self.peer_manager),
-            pubkey,
-            connect_address,
-        )
-        .await
+        match lightning_net_tokio::connect_outbound(Arc::clone(&self.peer_manager), pubkey, addr)
+            .await
         {
             Some(connection_closed_future) => {
                 let mut connection_closed_future = Box::pin(connection_closed_future);
@@ -1264,7 +1250,7 @@ impl LightningNode {
                 amt_satoshis,
                 public,
             } => {
-                let (pubkey, addr) = parse_peer_info(node_connection_string.clone())?;
+                let (pubkey, addr) = parse_peer_info(node_connection_string.clone()).await?;
 
                 let found_peer = self
                     .peer_manager
@@ -1279,10 +1265,7 @@ impl LightningNode {
                 let res = self.open_channel(pubkey, amt_satoshis, 0, 0, public);
 
                 if res.is_ok() {
-                    let _ = disk::persist_channel_peer(
-                        Path::new(&self.config.channel_peer_data_path()),
-                        &node_connection_string,
-                    );
+                    let _ = self.persister.persist_channel_peer(&node_connection_string);
                 }
 
                 Ok(NodeResponse::OpenChannel {})
@@ -1326,7 +1309,7 @@ impl LightningNode {
             NodeRequest::ConnectPeer {
                 node_connection_string,
             } => {
-                let (pubkey, addr) = parse_peer_info(node_connection_string)?;
+                let (pubkey, addr) = parse_peer_info(node_connection_string).await?;
 
                 let found_peer = self
                     .peer_manager
@@ -1390,7 +1373,7 @@ impl LightningNode {
     }
 }
 
-pub fn parse_peer_info(
+pub async fn parse_peer_info(
     peer_pubkey_and_ip_addr: String,
 ) -> Result<(PublicKey, SocketAddr), std::io::Error> {
     let mut pubkey_and_addr = peer_pubkey_and_ip_addr.split('@');
@@ -1422,7 +1405,16 @@
         ));
     }
 
-    Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap()))
+    let addr = peer_addr.unwrap().unwrap();
+
+    let listen_addr = public_ip::addr().await.unwrap();
+
+    let connect_address = match listen_addr == addr.ip() {
+        true => format!("127.0.0.1:{}", addr.port()).parse().unwrap(),
+        false => addr,
+    };
+
+    Ok((pubkey.unwrap(), connect_address))
 }
 
 pub(crate) async fn connect_peer_if_necessary(
@@ -1436,15 +1428,7 @@
         }
     }
 
-    let listen_addr = public_ip::addr().await.unwrap();
-
-    let connect_address = match listen_addr == peer_addr.ip() {
-        true => format!("127.0.0.1:{}", peer_addr.port()).parse().unwrap(),
-        false => peer_addr,
-    };
-
-    match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, connect_address)
-        .await
+    match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr).await
     {
         Some(connection_closed_future) => {
             let mut connection_closed_future = Box::pin(connection_closed_future);
diff --git a/src/services/admin.rs b/src/services/admin.rs
index 4fcf680..fa95708 100644
--- a/src/services/admin.rs
+++ b/src/services/admin.rs
@@ -136,13 +136,14 @@ impl AdminService {
         config: SenseiConfig,
         node_directory: NodeDirectory,
         database: AdminDatabase,
+        chain_manager: Arc<SenseiChainManager>,
     ) -> Self {
         Self {
             data_dir: String::from(data_dir),
-            config: Arc::new(config.clone()),
+            config: Arc::new(config),
             node_directory,
             database: Arc::new(Mutex::new(database)),
-            chain_manager: Arc::new(SenseiChainManager::new(config).await.unwrap()),
+            chain_manager,
         }
     }
 }
@@ -523,6 +524,7 @@ impl AdminService {
                 network: self.config.network,
                 passphrase,
                 external_router,
+                kv_persistence: self.config.kv_persistence.clone(),
             }
         }