Skip to content

Commit

Permalink
wip: monitor updating persister
Browse files Browse the repository at this point in the history
  • Loading branch information
domZippilli committed Jun 15, 2023
1 parent 74a9ed9 commit dac1604
Show file tree
Hide file tree
Showing 2 changed files with 404 additions and 95 deletions.
169 changes: 89 additions & 80 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ extern crate lightning;
extern crate bitcoin;
extern crate libc;

use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::hashes::hex::FromHex;
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::sign::{EntropySource, SignerProvider};
use lightning::util::ser::{ReadableArgs, Writeable};
use lightning::util::ser::Writeable;
use lightning::util::persist::KVStorePersister;
use std::fs;
use std::io::Cursor;
use std::ops::Deref;
use std::path::{Path, PathBuf};

const TEMP_FILE_EXT: &str = "tmp";

/// FilesystemPersister persists channel data on disk, where each channel's
/// data is stored in a file named after its funding outpoint.
///
Expand Down Expand Up @@ -56,96 +52,68 @@ impl FilesystemPersister {
pub fn get_data_dir(&self) -> String {
self.path_to_channel_data.clone()
}

/// Read `ChannelMonitor`s from disk.
pub fn read_channelmonitors<ES: Deref, SP: Deref> (
&self, entropy_source: ES, signer_provider: SP
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
where
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized
{
let mut path = PathBuf::from(&self.path_to_channel_data);
path.push("monitors");
if !Path::new(&path).exists() {
return Ok(Vec::new());
}
let mut res = Vec::new();
for file_option in fs::read_dir(path)? {
let file = file_option.unwrap();
let owned_file_name = file.file_name();
let filename = owned_file_name.to_str()
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData,
"File name is not a valid utf8 string"))?;
if !filename.is_ascii() || filename.len() < 65 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid ChannelMonitor file name",
));
}
if filename.ends_with(".tmp") {
// If we were in the middle of committing an new update and crashed, it should be
// safe to ignore the update - we should never have returned to the caller and
// irrevocably committed to the new state in any way.
continue;
}

let txid: Txid = Txid::from_hex(filename.split_at(64).0)
.map_err(|_| std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid tx ID in filename",
))?;

let index: u16 = filename.split_at(65).1.parse()
.map_err(|_| std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid tx index in filename",
))?;

let contents = fs::read(&file.path())?;
let mut buffer = Cursor::new(&contents);
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(&mut buffer, (&*entropy_source, &*signer_provider)) {
Ok((blockhash, channel_monitor)) => {
if channel_monitor.get_funding_txo().0.txid != txid || channel_monitor.get_funding_txo().0.index != index {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData,
"ChannelMonitor was stored in the wrong file"));
}
res.push((blockhash, channel_monitor));
}
Err(e) => return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Failed to deserialize ChannelMonitor: {}", e),
))
}
}
Ok(res)
}
}

impl KVStorePersister for FilesystemPersister {
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
let mut dest_file = PathBuf::from(self.path_to_channel_data.clone());
let mut dest_file = PathBuf::from(self.get_data_dir());
dest_file.push(key);
util::write_to_file(dest_file, object)
}
/// Read an object from storage.
fn get<K: AsRef<Path>>(&self, key: &K) -> std::io::Result<Vec<u8>> {
let dest_file = PathBuf::from(self.get_data_dir()).join(key);
fs::read(dest_file)
}
/// Read an object from storage.
fn delete<K: AsRef<Path>>(&self, key: &K) -> std::io::Result<()> {
let dest_file = PathBuf::from(self.get_data_dir()).join(key);
fs::remove_file(dest_file)
}
/// Return a sorted list of names in a key path. Paths (directories) are omitted, and the listing
/// is not recursive. If path does not exist, the list will be empty.
fn list_names<P: AsRef<Path>>(&self, path: &P) -> std::io::Result<Vec<String>> {
let path = PathBuf::from(self.get_data_dir()).join(path);
if !path.exists() {
return Ok(vec![]);
};
let mut files: Vec<String> = fs::read_dir(&path)?
.flatten()
.filter_map(|entry| {
if entry.path().is_dir() {
return None;
}
entry.file_name().into_string().ok()
})
// Exclude temp files. These are an artifact of how we persist, and shouldn't be
// surfaced to complicate the logic for other functions.
.filter(|f| !f.ends_with(TEMP_FILE_EXT))
.collect();
files.sort();
Ok(files)
}
}


#[cfg(test)]
mod tests {
extern crate lightning;
extern crate bitcoin;
use crate::FilesystemPersister;
use crate::lightning::util::persist::KVStoreChannelMonitorReader;
use bitcoin::hashes::hex::FromHex;
use bitcoin::Txid;
use lightning::chain::ChannelMonitorUpdateStatus;
use lightning::chain::chainmonitor::Persist;
use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID;
use lightning::chain::transaction::OutPoint;
use lightning::util::logger::{Logger, Record};
use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors};
use lightning::events::{ClosureReason, MessageSendEventsProvider};
use lightning::ln::functional_test_utils::*;
use lightning::util::test_utils;
use std::fs;
use std::rc::Rc;
#[cfg(target_os = "windows")]
use {
lightning::get_event_msg,
Expand All @@ -163,6 +131,22 @@ mod tests {
}
}

pub struct StdoutLogger {}

impl Logger for StdoutLogger {
fn log(&self, record: &Record) {
let raw_log = record.args.to_string();
let log = format!(
"{:<5} [{}:{}] {}\n",
record.level.to_string(),
record.module_path,
record.line,
raw_log
);
print!("{}", log);
}
}

#[test]
fn test_if_monitors_is_not_dir() {
let persister = FilesystemPersister::new("test_monitors_is_not_dir".to_string());
Expand All @@ -178,11 +162,20 @@ mod tests {
node_cfgs[0].chain_monitor = chain_mon_0;
let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
let nodes = create_network(1, &node_cfgs, &node_chanmgrs);

let logger = Rc::new(StdoutLogger{});

// Check that read_channelmonitors() returns error if monitors/ is not a
// directory.
assert!(persister.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).is_err());
}
assert!(persister
.read_channelmonitors(
nodes[0].keys_manager,
nodes[0].keys_manager,
&&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
&logger
)
.is_err());
}

// Integration-test the FilesystemPersister. Test relaying a few payments
// and check that the persisted data is updated the appropriate number of
Expand All @@ -192,31 +185,47 @@ mod tests {
// Create the nodes, giving them FilesystemPersisters for data persisters.
let persister_0 = FilesystemPersister::new("test_filesystem_persister_0".to_string());
let persister_1 = FilesystemPersister::new("test_filesystem_persister_1".to_string());
let chanmon_cfgs = create_chanmon_cfgs(2);
let chanmon_cfgs = create_chanmon_cfgs(4);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, node_cfgs[0].keys_manager);
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, node_cfgs[1].keys_manager);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let logger = Rc::new(StdoutLogger {});
// sharing the broadcasters with TestChainMonitor seems to deadlock
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;

// Check that the persisted channel data is empty before any channels are
// open.
let mut persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
let mut persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager,
&broadcaster_0,
&chanmon_cfgs[0].fee_estimator,
&logger).unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
let mut persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager,
&broadcaster_1,
&chanmon_cfgs[1].fee_estimator,
&logger).unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager,
&broadcaster_0,
&chanmon_cfgs[0].fee_estimator,
&logger).unwrap();
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
}
persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager,
&broadcaster_1,
&chanmon_cfgs[1].fee_estimator,
&logger).unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
Expand Down
Loading

0 comments on commit dac1604

Please sign in to comment.