Skip to content

Commit

Permalink
implement Persist and Persister with generic KVStorePersister trait
Browse files Browse the repository at this point in the history
  • Loading branch information
johncantrell97 committed Apr 12, 2022
1 parent 711bcef commit 95fc538
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 121 deletions.
47 changes: 18 additions & 29 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning_persister::Persister;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
Expand Down Expand Up @@ -80,22 +81,7 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
/// (which will cause the [`BackgroundProcessor`] which called this method to exit).
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
}


/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
Expand Down Expand Up @@ -142,7 +128,7 @@ impl BackgroundProcessor {
/// provided implementation.
///
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_graph`]
/// for Rust-Lightning's provided implementation.
///
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
Expand All @@ -161,8 +147,8 @@ impl BackgroundProcessor {
/// [`stop`]: Self::stop
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister#impl-Persister
/// [`FilesystemPersister::persist_graph`]: lightning_persister::FilesystemPersister#impl-Persister
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
Expand All @@ -180,7 +166,7 @@ impl BackgroundProcessor {
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
PS: 'static + Send + Persister,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
Expand Down Expand Up @@ -367,8 +353,8 @@ mod tests {
use lightning::util::test_utils;
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
use lightning_invoice::utils::DefaultRouter;
use lightning_persister::FilesystemPersister;
use std::fs;
use lightning_persister::{FilesystemPersister, Persister as LPPersister};
use std::fs::{self, File};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -414,12 +400,14 @@ mod tests {
struct Persister {
data_dir: String,
graph_error: Option<(std::io::ErrorKind, &'static str)>,
manager_error: Option<(std::io::ErrorKind, &'static str)>
manager_error: Option<(std::io::ErrorKind, &'static str)>,
filesystem_persister: FilesystemPersister
}

impl Persister {
fn new(data_dir: String) -> Self {
Self { data_dir, graph_error: None, manager_error: None }
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
Self { data_dir, graph_error: None, manager_error: None, filesystem_persister }
}

fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Expand All @@ -431,23 +419,24 @@ mod tests {
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
impl LPPersister for Persister
{
fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
{
match self.manager_error {
None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
None => self.filesystem_persister.persist_manager(channel_manager),
Some((error, message)) => Err(std::io::Error::new(error, message)),
}
}

fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
match self.graph_error {
None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
None => self.filesystem_persister.persist_graph(network_graph),
Some((error, message)) => Err(std::io::Error::new(error, message)),
}
}
Expand Down
120 changes: 59 additions & 61 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ extern crate libc;
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::hashes::hex::{FromHex, ToHex};
use lightning::routing::network_graph::NetworkGraph;
use crate::util::DiskWriteable;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
Expand All @@ -28,9 +27,38 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use std::fs;
use std::io::{Cursor, Error};
use std::io::{Cursor, Error, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::path::{Path, PathBuf, MAIN_SEPARATOR};


/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
pub trait Persister
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
/// (which will cause the BackgroundProcessor which called this method to exit).
fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger;

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
}

/// Trait for a key-value store for persisting some writeable object at some key
pub trait KVStorePersister<W: Writeable> {
/// Persist the given writeable using the provided key
fn persist(&self, key: String, object: &W) -> std::io::Result<()>;
}

impl<W: Writeable> KVStorePersister<W> for FilesystemPersister {
fn persist(&self, key: String, object: &W) -> std::io::Result<()> {
util::write_to_file(format!("{}{}{}", self.path_to_channel_data, MAIN_SEPARATOR, key), object)
}
}

/// FilesystemPersister persists channel data on disk, where each channel's
/// data is stored in a file named after its funding outpoint.
Expand All @@ -48,31 +76,6 @@ pub struct FilesystemPersister {
path_to_channel_data: String,
}

impl<Signer: Sign> DiskWriteable for ChannelMonitor<Signer> {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), Error> {
self.write(writer)
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> DiskWriteable for ChannelManager<Signer, M, T, K, F, L>
where
M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface<Signer=Signer>,
F::Target: FeeEstimator,
L::Target: Logger,
{
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}

impl DiskWriteable for NetworkGraph {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}

impl FilesystemPersister {
/// Initialize a new FilesystemPersister and set the path to the individual channels'
/// files.
Expand All @@ -87,34 +90,8 @@ impl FilesystemPersister {
self.path_to_channel_data.clone()
}

pub(crate) fn path_to_monitor_data(&self) -> PathBuf {
let mut path = PathBuf::from(self.path_to_channel_data.clone());
path.push("monitors");
path
}

/// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister`
/// initialization, within a file called "manager".
pub fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(
data_dir: String,
manager: &ChannelManager<Signer, M, T, K, F, L>
) -> Result<(), std::io::Error>
where
M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface<Signer=Signer>,
F::Target: FeeEstimator,
L::Target: Logger,
{
let path = PathBuf::from(data_dir);
util::write_to_file(path, "manager".to_string(), manager)
}

/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
/// initialization, within a file called "network_graph"
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
let path = PathBuf::from(data_dir);
util::write_to_file(path, "network_graph".to_string(), network_graph)
pub(crate) fn path_to_monitor_data(&self) -> String {
format!("{}{}monitors", self.path_to_channel_data, MAIN_SEPARATOR)
}

/// Read `ChannelMonitor`s from disk.
Expand All @@ -124,7 +101,7 @@ impl FilesystemPersister {
where K::Target: KeysInterface<Signer=Signer> + Sized,
{
let path = self.path_to_monitor_data();
if !Path::new(&path).exists() {
if !Path::new(&PathBuf::from(&path)).exists() {
return Ok(Vec::new());
}
let mut res = Vec::new();
Expand Down Expand Up @@ -187,18 +164,39 @@ impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPer
// even broadcasting!

fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
let key = format!("monitors{}{}_{}", MAIN_SEPARATOR, funding_txo.txid.to_hex(), funding_txo.index);
self.persist(key, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}

fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
let key = format!("monitors{}{}_{}", MAIN_SEPARATOR, funding_txo.txid.to_hex(), funding_txo.index);
self.persist(key, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}
}

impl Persister for FilesystemPersister {
fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer=Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger {
self.persist("manager".to_string(), channel_manager)
}

fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
if self.persist("network_graph".to_string(), network_graph).is_err()
{
// Persistence errors here are non-fatal as we can just fetch the routing graph
// again later, but they may indicate a disk error which could be fatal elsewhere.
eprintln!("Warning: Failed to persist network graph, check your disk and permissions");
}
Ok(())
}
}

#[cfg(test)]
mod tests {
extern crate lightning;
Expand Down
Loading

0 comments on commit 95fc538

Please sign in to comment.