From dac16041df7a487d978ec949ca04bfe0efd155b5 Mon Sep 17 00:00:00 2001 From: Dom Zippilli Date: Thu, 15 Jun 2023 01:38:02 -0700 Subject: [PATCH] wip: monitor updating persister --- lightning-persister/src/lib.rs | 169 +++++++++-------- lightning/src/util/persist.rs | 330 +++++++++++++++++++++++++++++++-- 2 files changed, 404 insertions(+), 95 deletions(-) diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 670a7369d8b..473ef4668be 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -16,17 +16,13 @@ extern crate lightning; extern crate bitcoin; extern crate libc; -use bitcoin::hash_types::{BlockHash, Txid}; -use bitcoin::hashes::hex::FromHex; -use lightning::chain::channelmonitor::ChannelMonitor; -use lightning::sign::{EntropySource, SignerProvider}; -use lightning::util::ser::{ReadableArgs, Writeable}; +use lightning::util::ser::Writeable; use lightning::util::persist::KVStorePersister; use std::fs; -use std::io::Cursor; -use std::ops::Deref; use std::path::{Path, PathBuf}; +const TEMP_FILE_EXT: &str = "tmp"; + /// FilesystemPersister persists channel data on disk, where each channel's /// data is stored in a file named after its funding outpoint. /// @@ -56,96 +52,68 @@ impl FilesystemPersister { pub fn get_data_dir(&self) -> String { self.path_to_channel_data.clone() } - - /// Read `ChannelMonitor`s from disk. - pub fn read_channelmonitors ( - &self, entropy_source: ES, signer_provider: SP - ) -> std::io::Result::Signer>)>> - where - ES::Target: EntropySource + Sized, - SP::Target: SignerProvider + Sized - { - let mut path = PathBuf::from(&self.path_to_channel_data); - path.push("monitors"); - if !Path::new(&path).exists() { - return Ok(Vec::new()); - } - let mut res = Vec::new(); - for file_option in fs::read_dir(path)? { - let file = file_option.unwrap(); - let owned_file_name = file.file_name(); - let filename = owned_file_name.to_str() - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, - "File name is not a valid utf8 string"))?; - if !filename.is_ascii() || filename.len() < 65 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid ChannelMonitor file name", - )); - } - if filename.ends_with(".tmp") { - // If we were in the middle of committing an new update and crashed, it should be - // safe to ignore the update - we should never have returned to the caller and - // irrevocably committed to the new state in any way. - continue; - } - - let txid: Txid = Txid::from_hex(filename.split_at(64).0) - .map_err(|_| std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid tx ID in filename", - ))?; - - let index: u16 = filename.split_at(65).1.parse() - .map_err(|_| std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid tx index in filename", - ))?; - - let contents = fs::read(&file.path())?; - let mut buffer = Cursor::new(&contents); - match <(BlockHash, ChannelMonitor<::Signer>)>::read(&mut buffer, (&*entropy_source, &*signer_provider)) { - Ok((blockhash, channel_monitor)) => { - if channel_monitor.get_funding_txo().0.txid != txid || channel_monitor.get_funding_txo().0.index != index { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, - "ChannelMonitor was stored in the wrong file")); - } - res.push((blockhash, channel_monitor)); - } - Err(e) => return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Failed to deserialize ChannelMonitor: {}", e), - )) - } - } - Ok(res) - } } impl KVStorePersister for FilesystemPersister { fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { - let mut dest_file = PathBuf::from(self.path_to_channel_data.clone()); + let mut dest_file = PathBuf::from(self.get_data_dir()); dest_file.push(key); util::write_to_file(dest_file, object) } + /// Read an object from storage. + fn get>(&self, key: &K) -> std::io::Result> { + let dest_file = PathBuf::from(self.get_data_dir()).join(key); + fs::read(dest_file) + } + /// Read an object from storage. + fn delete>(&self, key: &K) -> std::io::Result<()> { + let dest_file = PathBuf::from(self.get_data_dir()).join(key); + fs::remove_file(dest_file) + } + /// Return a sorted list of names in a key path. Paths (directories) are omitted, and the listing + /// is not recursive. If path does not exist, the list will be empty. + fn list_names>(&self, path: &P) -> std::io::Result> { + let path = PathBuf::from(self.get_data_dir()).join(path); + if !path.exists() { + return Ok(vec![]); + }; + let mut files: Vec = fs::read_dir(&path)? + .flatten() + .filter_map(|entry| { + if entry.path().is_dir() { + return None; + } + entry.file_name().into_string().ok() + }) + // Exclude temp files. These are an artifact of how we persist, and shouldn't be + // surfaced to complicate the logic for other functions. + .filter(|f| !f.ends_with(TEMP_FILE_EXT)) + .collect(); + files.sort(); + Ok(files) + } } + #[cfg(test)] mod tests { extern crate lightning; extern crate bitcoin; use crate::FilesystemPersister; + use crate::lightning::util::persist::KVStoreChannelMonitorReader; use bitcoin::hashes::hex::FromHex; use bitcoin::Txid; use lightning::chain::ChannelMonitorUpdateStatus; use lightning::chain::chainmonitor::Persist; use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID; use lightning::chain::transaction::OutPoint; + use lightning::util::logger::{Logger, Record}; use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors}; use lightning::events::{ClosureReason, MessageSendEventsProvider}; use lightning::ln::functional_test_utils::*; use lightning::util::test_utils; use std::fs; + use std::rc::Rc; #[cfg(target_os = "windows")] use { lightning::get_event_msg, @@ -163,6 +131,22 @@ mod tests { } } + pub struct StdoutLogger {} + + impl Logger for StdoutLogger { + fn log(&self, record: &Record) { + let raw_log = record.args.to_string(); + let log = format!( + "{:<5} [{}:{}] {}\n", + record.level.to_string(), + record.module_path, + record.line, + raw_log + ); + print!("{}", log); + } + } + #[test] fn test_if_monitors_is_not_dir() { let persister = FilesystemPersister::new("test_monitors_is_not_dir".to_string()); @@ -178,11 +162,20 @@ mod tests { node_cfgs[0].chain_monitor = chain_mon_0; let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]); let nodes = create_network(1, &node_cfgs, &node_chanmgrs); - + let logger = Rc::new(StdoutLogger{}); + // Check that read_channelmonitors() returns error if monitors/ is not a // directory. - assert!(persister.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).is_err()); - } + assert!(persister + .read_channelmonitors( + nodes[0].keys_manager, + nodes[0].keys_manager, + &&chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + &logger + ) + .is_err()); + } // Integration-test the FilesystemPersister. Test relaying a few payments // and check that the persisted data is updated the appropriate number of @@ -192,7 +185,7 @@ mod tests { // Create the nodes, giving them FilesystemPersisters for data persisters. let persister_0 = FilesystemPersister::new("test_filesystem_persister_0".to_string()); let persister_1 = FilesystemPersister::new("test_filesystem_persister_1".to_string()); - let chanmon_cfgs = create_chanmon_cfgs(2); + let chanmon_cfgs = create_chanmon_cfgs(4); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, node_cfgs[0].keys_manager); let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, node_cfgs[1].keys_manager); @@ -200,23 +193,39 @@ mod tests { node_cfgs[1].chain_monitor = chain_mon_1; let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let logger = Rc::new(StdoutLogger {}); + // sharing the broadcasters with TestChainMonitor seems to deadlock + let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; + let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster; // Check that the persisted channel data is empty before any channels are // open. - let mut persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap(); + let mut persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager, + &broadcaster_0, + &chanmon_cfgs[0].fee_estimator, + &logger).unwrap(); assert_eq!(persisted_chan_data_0.len(), 0); - let mut persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap(); + let mut persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager, + &broadcaster_1, + &chanmon_cfgs[1].fee_estimator, + &logger).unwrap(); assert_eq!(persisted_chan_data_1.len(), 0); // Helper to make sure the channel is on the expected update ID. macro_rules! check_persisted_data { ($expected_update_id: expr) => { - persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap(); + persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager, + &broadcaster_0, + &chanmon_cfgs[0].fee_estimator, + &logger).unwrap(); assert_eq!(persisted_chan_data_0.len(), 1); for (_, mon) in persisted_chan_data_0.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); } - persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap(); + persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager, + &broadcaster_1, + &chanmon_cfgs[1].fee_estimator, + &logger).unwrap(); assert_eq!(persisted_chan_data_1.len(), 1); for (_, mon) in persisted_chan_data_1.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 435ef30d331..79072f20914 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -8,9 +8,14 @@ //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`], //! and [`ChannelMonitor`] all in one place. +use core::convert::{TryFrom, TryInto}; use core::ops::Deref; -use bitcoin::hashes::hex::ToHex; +use std::io::Cursor; +use std::path::{Path, PathBuf}; +use bitcoin::{Txid, BlockHash}; +use bitcoin::hashes::hex::{ToHex, FromHex}; use crate::io; +use crate::ln::msgs::DecodeError; use crate::routing::scoring::WriteableScore; use crate::chain; @@ -18,11 +23,17 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::{Persist, MonitorUpdateId}; use crate::sign::{EntropySource, NodeSigner, WriteableEcdsaChannelSigner, SignerProvider}; use crate::chain::transaction::OutPoint; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID}; use crate::ln::channelmanager::ChannelManager; use crate::routing::router::Router; use crate::routing::gossip::NetworkGraph; -use super::{logger::Logger, ser::Writeable}; +use super::{logger::Logger, ser::Writeable, ser::Readable, ser::ReadableArgs}; + +const MANAGER_KEY: &str = "manager"; +const NETWORK_GRAPH_KEY: &str = "network_graph"; +const SCORER_KEY: &str = "scorer"; +const MONITOR_PATH: &str = "monitors"; +const UPDATE_PATH: &str = "monitor_updates"; /// Trait for a key-value store for persisting some writeable object at some key /// Implementing `KVStorePersister` provides auto-implementations for [`Persister`] @@ -31,6 +42,272 @@ use super::{logger::Logger, ser::Writeable}; pub trait KVStorePersister { /// Persist the given writeable using the provided key fn persist(&self, key: &str, object: &W) -> io::Result<()>; + /// Read an object from storage. + fn get>(&self, key: &K) -> io::Result>; + /// Delete an object from storage. + fn delete>(&self, key: &K) -> io::Result<()>; + /// Return a sorted list of names in a key path. Paths (directories) are omitted, and the listing + /// is not recursive. If path does not exist, the list will be empty. + fn list_names>(&self, path: &P) -> io::Result>; +} + +enum KVStoreChannelMonitorReaderError { + /// The monitor name was improperly formatted. + BadMonitorName(String, String), + /// The monitor could not be decoded. + MonitorDecodeFailed(DecodeError, PathBuf), + /// The update could not be decoded. + UpdateDecodeFailed(DecodeError, PathBuf), + /// Storage could not be read. + StorageReadFailed(io::Error, PathBuf), + /// An update could not be applied to a monitor. + UpdateFailed(String, String), +} + +impl From for io::Error { + fn from(value: KVStoreChannelMonitorReaderError) -> Self { + match value { + KVStoreChannelMonitorReaderError::BadMonitorName(reason, context) => { + io::Error::new(io::ErrorKind::InvalidInput, format!("{reason}, context: {context}'")) + }, + KVStoreChannelMonitorReaderError::MonitorDecodeFailed(reason, context) => { + io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'")) + }, + KVStoreChannelMonitorReaderError::UpdateDecodeFailed(reason, context) => { + io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'")) + }, + KVStoreChannelMonitorReaderError::StorageReadFailed(reason, context) => { + io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'")) + }, + KVStoreChannelMonitorReaderError::UpdateFailed(reason, context) => { + io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context}'")) + }, + } + } +} + +/// A struct representing a name for a monitor. +#[derive(Clone, Debug)] +pub struct MonitorName(String); + +impl TryFrom for OutPoint { + type Error = std::io::Error; + + fn try_from(value: MonitorName) -> Result { + let (txid_hex, index) = value.0.split_once('_').ok_or_else(|| { + KVStoreChannelMonitorReaderError::BadMonitorName("no underscore".to_string(), value.0.clone()) + })?; + let index = index.parse().map_err(|e| { + KVStoreChannelMonitorReaderError::BadMonitorName( + format!("bad index value, caused by {e}"), + value.0.clone(), + ) + })?; + let txid = Txid::from_hex(txid_hex).map_err(|e| { + KVStoreChannelMonitorReaderError::BadMonitorName( + format!("bad txid, caused by: {e}"), + value.0.clone(), + ) + })?; + Ok(OutPoint { txid, index }) + } +} + +impl From for MonitorName { + fn from(value: OutPoint) -> Self { + MonitorName(format!("{}_{}", value.txid.to_hex(), value.index)) + } +} + +/// A struct representing a name for an update. +#[derive(Clone, Debug)] +pub struct UpdateName(String); + +impl From for UpdateName { + fn from(value: u64) -> Self { + Self(format!("{:0>20}", value)) + } +} + +#[allow(clippy::type_complexity)] +pub trait KVStoreChannelMonitorReader { + fn read_channelmonitors( + &self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F, + logger: &L, + ) -> std::io::Result::Signer>)>> + where + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger; + /// List all the names of monitors. + fn list_monitor_names(&self) -> io::Result>; + /// Key to a specific monitor. + fn monitor_key(&self, monitor_name: &MonitorName) -> PathBuf; + /// Deserialize a channel monitor. + fn deserialize_monitor( + &self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName, + ) -> io::Result<(BlockHash, ChannelMonitor<::Signer>)> + where + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized; + /// List all the names of updates corresponding to a given monitor name. + fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result>; + /// Path to corresponding update directory for a given monitor name. + fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> PathBuf; + /// Deserialize a channel monitor update. + fn deserialize_monitor_update( + &self, monitor_name: &MonitorName, update_name: &UpdateName, + ) -> io::Result; + /// Key to a specific update. + fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> PathBuf; + /// Delete updates with an update_id lower than the given channel monitor. + fn delete_stale_updates( + &self, channel_id: OutPoint, monitor: &ChannelMonitor, + ) -> io::Result<()>; +} + +impl KVStoreChannelMonitorReader for K { + fn read_channelmonitors< + ES: Deref + Clone, + SP: Deref + Clone, + B: Deref, + F: Deref + Clone, + L: Deref, + >( + &self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F, + logger: &L, + ) -> std::io::Result::Signer>)>> + where + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + let mut res = Vec::new(); + // for each monitor... + for monitor_name in self.list_monitor_names()? { + // ...parse the monitor + let (bh, monitor) = self.deserialize_monitor( + entropy_source.clone(), + signer_provider.clone(), + monitor_name.clone(), + )?; + // ...parse and apply the updates with an id higher than the monitor. + for update_name in self.list_update_names(&monitor_name)? { + let update = self.deserialize_monitor_update(&monitor_name, &update_name)?; + if update.update_id == CLOSED_CHANNEL_UPDATE_ID + || update.update_id > monitor.get_latest_update_id() + { + monitor + .update_monitor(&update, broadcaster, fee_estimator.clone(), logger) + .map_err(|_| { + KVStoreChannelMonitorReaderError::UpdateFailed( + "update_monitor returned Err(())".to_string(), + format!("monitor: {:?}", monitor_name), + ) + })?; + } + } + // ...push the result into the return vec + res.push((bh, monitor)) + } + Ok(res) + } + + /// Key to a specific monitor. + fn monitor_key(&self, monitor_name: &MonitorName) -> PathBuf { + [MONITOR_PATH, &monitor_name.0].iter().collect() + } + + /// Key to a specific update. + fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> PathBuf { + self.path_to_monitor_updates(monitor_name).join(&update_name.0) + } + + /// List all the names of monitors. + fn list_monitor_names(&self) -> io::Result> { + Ok(self.list_names(&PathBuf::from(MONITOR_PATH))?.into_iter().map(MonitorName).collect()) + } + + /// List all the names of updates corresponding to a given monitor name. + fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result> { + let update_dir_path = self.path_to_monitor_updates(monitor_name); + Ok(self.list_names(&update_dir_path)?.into_iter().map(UpdateName).collect()) + } + + /// Path to corresponding update directory for a given monitor name. + fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> PathBuf { + [UPDATE_PATH, &monitor_name.0].iter().collect() + } + + /// Deserialize a channel monitor. + fn deserialize_monitor( + &self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName, + ) -> io::Result<(BlockHash, ChannelMonitor<::Signer>)> + where + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + { + let key = self.monitor_key(&monitor_name); + let contents = self + .get(&PathBuf::from(&key)) + .map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.clone()))?; + let outpoint: OutPoint = monitor_name.try_into()?; + let mut buffer = Cursor::new(&contents); + match <(BlockHash, ChannelMonitor<::Signer>)>::read( + &mut buffer, + (&*entropy_source, &*signer_provider), + ) { + Ok((blockhash, channel_monitor)) => { + if channel_monitor.get_funding_txo().0.txid != outpoint.txid + || channel_monitor.get_funding_txo().0.index != outpoint.index + { + return Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed( + DecodeError::InvalidValue, + key, + ) + .into()); + } + Ok((blockhash, channel_monitor)) + } + Err(e) => Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed(e, key).into()), + } + } + + /// Deserialize a channel monitor update. + fn deserialize_monitor_update( + &self, monitor_name: &MonitorName, update_name: &UpdateName, + ) -> io::Result { + let key = self.update_key(monitor_name, update_name); + let contents = self + .get(&PathBuf::from(&key)) + .map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.clone()))?; + let mut buffer = Cursor::new(&contents); + Ok(ChannelMonitorUpdate::read(&mut buffer) + .map_err(|e| KVStoreChannelMonitorReaderError::UpdateDecodeFailed(e, key))?) + } + + /// Delete updates with an update_id lower than the given channel monitor. + fn delete_stale_updates( + &self, channel_id: OutPoint, monitor: &ChannelMonitor, + ) -> io::Result<()> { + let monitor_name: MonitorName = channel_id.into(); + let update_names = + self.list_update_names(&monitor_name)?; + for update_name in update_names { + let update = + self.deserialize_monitor_update(&monitor_name, &update_name)?; + if update.update_id != CLOSED_CHANNEL_UPDATE_ID + && update.update_id <= monitor.get_latest_update_id() + { + self.delete(&self.update_key(&monitor_name, &update_name))?; + } + } + Ok(()) + } } /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. @@ -66,39 +343,62 @@ impl<'a, A: KVStorePersister, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Dere { /// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed. fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error> { - self.persist("manager", channel_manager) + self.persist(MANAGER_KEY, channel_manager) } /// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed. fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { - self.persist("network_graph", network_graph) + self.persist(NETWORK_GRAPH_KEY, network_graph) } /// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed. fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { - self.persist("scorer", &scorer) + self.persist(SCORER_KEY, &scorer) } } -impl Persist for K { +impl Persist for K { // TODO: We really need a way for the persister to inform the user that its time to crash/shut // down once these start returning failure. // A PermanentFailure implies we should probably just shut down the node since we're // force-closing channels without even broadcasting! fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus { - let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index); - match self.persist(&key, monitor) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, + let key = self.monitor_key(&funding_txo.into()); + // TODO(domz): really this should be put(path) + match self.persist(key.as_os_str().to_str().unwrap(), monitor) { + Ok(()) => { + if let Err(_e) = self.delete_stale_updates(funding_txo, monitor) { + // TODO(domz): what do? + //log_error!(self.logger, "error cleaning up channel monitor updates! {}", e); + }; + chain::ChannelMonitorUpdateStatus::Completed + }, Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure, } } - fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus { - let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index); - match self.persist(&key, monitor) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure, + fn update_persisted_channel( + &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor, update_id: MonitorUpdateId, + ) -> chain::ChannelMonitorUpdateStatus { + match update { + Some(update) => { + // This is an update to the monitor, which we persist to apply on restart. + // IMPORTANT: update_id: MonitorUpdateId is not to be confused with ChannelMonitorUpdate.update_id. + // The first is an opaque identifier for this call (used for calling back write completion). The second + // is the channel update sequence number. + let key = self.update_key(&funding_txo.into(), &update.update_id.into()); + // TODO(domz): really this should be put(path) + match self.persist(key.as_os_str().to_str().unwrap(), update) { + Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, + Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure, + } + } + // A new block. Now we need to persist the entire new monitor and discard the old + // updates. + None => self.persist_new_channel(funding_txo, monitor, update_id), } } + }