Skip to content

Commit

Permalink
Add single-monitor read function.
Browse files Browse the repository at this point in the history
Adds a public function that only reads a single monitor, so that
customers can list monitors themselves, and then parallelize reading
them with updates from storage.
  • Loading branch information
domZippilli committed Sep 26, 2023
1 parent 5783f8e commit d7ebca1
Showing 1 changed file with 73 additions and 48 deletions.
121 changes: 73 additions & 48 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ where
///
/// # Reading channel state from storage
///
/// Channel state is reconstructed by calling [`Self::read_channel_monitors_with_updates`]. Please
/// see that function's documentation for more information.
/// Channel state is reconstructed by calling
/// [`MonitorUpdatingPersister::read_channel_monitors_with_updates`]. Please see that function's
/// documentation for more information.
///
/// # Pruning stale channel updates
///
Expand All @@ -335,7 +336,9 @@ where
/// will complete. However, stale updates are not a problem for data integrity, since updates are
/// only read that are higher than the stored [`ChannelMonitor`]'s update_id.
///
/// To keep storage usage in check, the [`Self::cleanup_stale_updates`] function as needed.
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
/// would like to get rid of them, consider using the
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref + Clone, SP: Deref + Clone>
where
K::Target: KVStore,
Expand Down Expand Up @@ -382,16 +385,13 @@ where
}
}

/// Reads channel monitors, along with any stored updates for them.
/// Reads all stored channel monitors, along with any stored updates for them.
///
/// # EXTREMELY IMPORTANT
/// It is extremely important that your [`KVStore::read`] implementation uses the
/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_
/// in that circumstance (not when there is really a permissions error, for example). This is
/// because this function does not list updates. Instead, it lists all monitors, and then using
/// their stored `update_id`, synthesizes update storage keys, and tries them in sequence until
/// one is not found. All _other_ errors will be bubbled up in this function's [`io::Result`].
pub fn read_channel_monitors_with_updates<B: Deref, F: Deref + Clone>(
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please refer to
/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref + Clone>(
&self, broadcaster: &B, fee_estimator: F,
) -> io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
where
Expand All @@ -405,31 +405,59 @@ where
CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
)?;
let mut res = Vec::with_capacity(monitor_list.len());
// for each monitor...
for maybe_monitor_name in monitor_list.into_iter().map(MonitorName::new) {
let monitor_name = maybe_monitor_name?;
// ...parse the monitor
let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
// ...parse and apply the updates with an id higher than the monitor.
let mut current_update_id = monitor.get_latest_update_id();
loop {
current_update_id = match current_update_id.checked_add(1) {
Some(next_update_id) => next_update_id,
None => break,
};
let update_name = UpdateName::from(current_update_id);
let update = match self.read_monitor_update(&monitor_name, &update_name) {
Ok(update) => update,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
// We can't find any more updates, so we are done.
break;
}
Err(err) => return Err(err),
};
for monitor_key in monitor_list {
res.push(self.read_channel_monitor_with_updates(
broadcaster,
&fee_estimator,
monitor_key,
)?)
}
Ok(res)
}

/// Read a single channel monitor, along with any stored updates for it.
///
/// # EXTREMELY IMPORTANT
/// It is extremely important that your [`KVStore::read`] implementation uses the
/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_
/// in that circumstance (not when there is really a permissions error, for example). This is
/// because this function does not list updates. Instead, it reads the monitor, and using its
/// stored `update_id`, synthesizes update storage keys, and tries them in sequence until one is
/// not found. All _other_ errors will be bubbled up in this function's [`io::Result`].
///
/// # Parallelizing
/// Loading a large number of monitors will be faster if done in parallel. You can use this
/// function to accomplish this. Take care to limit the number of parallel readers.
pub fn read_channel_monitor_with_updates<B: Deref, F: Deref + Clone>(
&self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
) -> io::Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>
where
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
{
let monitor_name = MonitorName::new(monitor_key)?;
let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
let mut current_update_id = monitor.get_latest_update_id();
loop {
current_update_id = match current_update_id.checked_add(1) {
Some(next_update_id) => next_update_id,
None => break,
};
let update_name = UpdateName::from(current_update_id);
let update = match self.read_monitor_update(&monitor_name, &update_name) {
Ok(update) => update,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
// We can't find any more updates, so we are done.
break;
}
Err(err) => return Err(err),
};

monitor
.update_monitor(&update, broadcaster, fee_estimator.clone(), &self.logger)
.map_err(|e| {
monitor
.update_monitor(&update, broadcaster, fee_estimator.clone(), &self.logger)
.map_err(|e| {
log_error!(
self.logger,
"Monitor update failed. monitor: {} update: {} reason: {:?}",
Expand All @@ -439,11 +467,8 @@ where
);
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
})?;
}
// ...push the result into the return vec
res.push((block_hash, monitor))
}
Ok(res)
Ok((block_hash, monitor))
}

/// Read a channel monitor.
Expand Down Expand Up @@ -519,7 +544,7 @@ fn read_monitor(
/// Cleans up stale updates for all monitors.
///
/// This function works by first listing all monitors, and then for each of them, listing all
/// updates. The updates which have an update_id less than or equal to than the stored monitor
/// updates. The updates that have an update_id less than or equal to than the stored monitor
/// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
/// be passed to [`KVStore::remove`].
pub fn cleanup_stale_updates(&self, lazy: bool) -> io::Result<()> {
Expand Down Expand Up @@ -564,7 +589,7 @@ where
SP::Target: SignerProvider + Sized,
{
/// Persists a new channel. This means writing the entire monitor to the
/// provided [`KVStore`].
/// parametrized [`KVStore`].
fn persist_new_channel(
&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
_monitor_update_call_id: MonitorUpdateId,
Expand Down Expand Up @@ -667,7 +692,7 @@ where
}


/// Persists a channel update, writing only the update to the provided [`KVStore`] if possible.
/// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible.
///
/// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]:
///
Expand Down Expand Up @@ -918,19 +943,19 @@ mod tests {
// Check that the persisted channel data is empty before any channels are
// open.
let mut persisted_chan_data_0 = persister_0
.read_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.read_all_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 = persister_1
.read_channel_monitors_with_updates(&broadcaster_1, &chanmon_cfgs[1].fee_estimator)
.read_all_channel_monitors_with_updates(&broadcaster_1, &chanmon_cfgs[1].fee_estimator)
.unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 = persister_0
.read_channel_monitors_with_updates(
.read_all_channel_monitors_with_updates(
&broadcaster_0,
&chanmon_cfgs[0].fee_estimator,
)
Expand All @@ -940,7 +965,7 @@ mod tests {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
}
persisted_chan_data_1 = persister_1
.read_channel_monitors_with_updates(
.read_all_channel_monitors_with_updates(
&broadcaster_1,
&chanmon_cfgs[1].fee_estimator,
)
Expand Down Expand Up @@ -1027,7 +1052,7 @@ mod tests {

// Make sure the expected number of stale updates is present.
let persisted_chan_data = persister_0
.read_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.read_all_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
Expand Down Expand Up @@ -1168,7 +1193,7 @@ mod tests {
// Check that the persisted channel data is empty before any channels are
// open.
let persisted_chan_data = persister_0
.read_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.read_all_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.unwrap();
assert_eq!(persisted_chan_data.len(), 0);

Expand All @@ -1181,7 +1206,7 @@ mod tests {

// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
let persisted_chan_data = persister_0
.read_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.read_all_channel_monitors_with_updates(&broadcaster_0, &chanmon_cfgs[0].fee_estimator)
.unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
Expand Down

0 comments on commit d7ebca1

Please sign in to comment.