Decay historical liquidity tracking when no new data is added
To avoid scoring based on incredibly old historical liquidity data,
we add a new half-life here which is used to (very slowly) decay
historical liquidity tracking buckets.
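
For intuition, here is a minimal standalone sketch (plain Rust, not LDK API; the function name is hypothetical) of how a right-shift implements this half-life decay on the u16 fixed-point history buckets:

fn decay_bucket(value: u16, half_lives: u32) -> u16 {
    // One right shift per elapsed half-life halves the bucket; checked_shr
    // returns None once the shift width reaches 16, so any u16 value is
    // fully decayed after 16 half-lives.
    value.checked_shr(half_lives).unwrap_or(0)
}

fn main() {
    assert_eq!(decay_bucket(32, 0), 32);       // one freshly-tracked datapoint
    assert_eq!(decay_bucket(32, 1), 16);       // half gone after one half-life
    assert_eq!(decay_bucket(32, 5), 1);        // nearly gone
    assert_eq!(decay_bucket(u16::MAX, 16), 0); // even a saturated bucket
}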
TheBlueMatt committed Oct 6, 2022
1 parent ec68f13 commit c8fb859
Showing 1 changed file with 127 additions and 59 deletions.
lightning/src/routing/scoring.rs
@@ -62,7 +62,7 @@ use util::logger::Logger;
use util::time::Time;

use prelude::*;
-use core::fmt;
+use core::{cmp, fmt};
use core::cell::{RefCell, RefMut};
use core::convert::TryInto;
use core::ops::{Deref, DerefMut};
@@ -436,6 +436,16 @@ pub struct ProbabilisticScoringParameters {
/// [`liquidity_penalty_amount_multiplier_msat`]: Self::liquidity_penalty_amount_multiplier_msat
pub historical_liquidity_penalty_amount_multiplier_msat: u64,

+/// If we aren't learning any new datapoints for a channel, the historical liquidity bounds
+/// tracking can simply live on with increasingly stale data. Instead, when a channel has not
+/// seen a liquidity estimate update for this amount of time, the historical datapoints are
+/// decayed by half.
+///
+/// Note that after 16 or more half-lives all historical data will be completely gone.
+///
+/// Default value: 14 days
+pub historical_no_updates_half_life: Duration,

/// Manual penalties used for the given nodes. Allows to set a particular penalty for a given
/// node. Note that a manual penalty of `u64::max_value()` means the node would not ever be
/// considered during path finding.
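
A usage sketch for the new knob (field and constructor names from this diff; everything else is elided, and the one-week override is just an example value):

use core::time::Duration;

let params = ProbabilisticScoringParameters {
    // Halve the historical buckets after a week without updates instead of
    // the 14-day default set in the Default impl below.
    historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 7),
    ..ProbabilisticScoringParameters::default()
};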
@@ -509,10 +519,89 @@ impl HistoricalBucketRangeTracker {
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
}
}
+/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
+/// datapoints as we receive newer information.
+fn time_decay_data(&mut self, half_lives: u32) {
+for e in self.buckets.iter_mut() {
+*e = e.checked_shr(half_lives).unwrap_or(0);
+}
+}
}

impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });

+struct HistoricalMinMaxBuckets<'a> {
+min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+}
+
+impl HistoricalMinMaxBuckets<'_> {
+#[inline]
+fn calculate_success_probability_times_billion(&self, required_decays: u32, payment_amt_64th_bucket: u8) -> Option<u64> {
+// If historical penalties are enabled, calculate the penalty by walking the set of
+// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
+// each, calculate the probability of success given our payment amount, then total the
+// weighted average probability of success.
+//
+// We use a sliding scale to decide which point within a given bucket will be compared to
+// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
+// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
+// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
+// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
+// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
+// penalties to channels at the edges.
+//
+// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
+// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
+// for a 1 BTC channel!).
+//
+// If we used the middle of each bucket we'd never assign any penalty at all when sending
+// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
+let mut total_valid_points_tracked = 0;
+
+// Rather than actually decaying the individual buckets, which would lose precision, we
+// simply track whether all buckets would be decayed to zero, in which case we treat it as
+// if we had no data.
+let mut is_fully_decayed = true;
+let mut check_track_bucket_contains_undecayed_points =
+|bucket_val: u16| if bucket_val.checked_shr(required_decays).unwrap_or(0) > 0 { is_fully_decayed = false; };
+
+for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+check_track_bucket_contains_undecayed_points(*min_bucket);
+for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
+total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
+check_track_bucket_contains_undecayed_points(*max_bucket);
+}
+}
+// If the total valid points is smaller than 1.0 (i.e. 32 * 32 in our fixed-point scheme,
+// as each point adds 32 to both a min and a max bucket), treat it as if we were fully
+// decayed.
+if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 || is_fully_decayed {
+return None;
+}
+
+let mut cumulative_success_prob_times_billion = 0;
+for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
+let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
+* 1024 * 1024 / total_valid_points_tracked;
+let min_64th_bucket = min_idx as u8 * 9;
+let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
+if payment_amt_64th_bucket > max_64th_bucket {
+// Success probability 0, the payment amount is above the max liquidity
+} else if payment_amt_64th_bucket <= min_64th_bucket {
+cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
+} else {
+cumulative_success_prob_times_billion += bucket_prob_times_million *
+((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
+((max_64th_bucket - min_64th_bucket) as u64);
+}
+}
+}
+
+Some(cumulative_success_prob_times_billion)
+}
+}
+
/// Accounting for channel liquidity balance uncertainty.
///
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
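
To make the sliding-scale comment above concrete, here is a worked example of the per-pair arithmetic (a standalone sketch that mirrors the body of calculate_success_probability_times_billion rather than calling it): suppose every datapoint landed in the widest (min_idx = 0, max_idx = 0) pair, i.e. liquidity could be anywhere in the channel, and we send half the channel's capacity.

fn main() {
    let (min_idx, max_idx) = (0u64, 0u64);
    let payment_amt_64th_bucket = 32u64; // amount_msat * 64 / capacity_msat at 50%

    let min_64th_bucket = min_idx * 9;           // lower comparison point: 0
    let max_64th_bucket = (7 - max_idx) * 9 + 1; // upper comparison point: 64

    // With all probability mass in this one pair, bucket_prob_times_million
    // is the full 1024 * 1024.
    let bucket_prob_times_million = 1024u64 * 1024;
    let success_prob_times_billion = bucket_prob_times_million
        * (max_64th_bucket - payment_amt_64th_bucket) * 1024
        / (max_64th_bucket - min_64th_bucket);

    // (64 - 32) / 64: a 50% chance that the liquidity covers half the capacity.
    assert_eq!(success_prob_times_billion, 1024 * 1024 * 1024 / 2);
}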
@@ -645,6 +734,7 @@ impl ProbabilisticScoringParameters {
liquidity_penalty_amount_multiplier_msat: 0,
historical_liquidity_penalty_multiplier_msat: 0,
historical_liquidity_penalty_amount_multiplier_msat: 0,
+historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
manual_node_penalties: HashMap::new(),
anti_probing_penalty_msat: 0,
considered_impossible_penalty_msat: 0,
Expand All @@ -670,6 +760,7 @@ impl Default for ProbabilisticScoringParameters {
liquidity_penalty_amount_multiplier_msat: 192,
historical_liquidity_penalty_multiplier_msat: 10_000,
historical_liquidity_penalty_amount_multiplier_msat: 64,
+historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
manual_node_penalties: HashMap::new(),
anti_probing_penalty_msat: 250,
considered_impossible_penalty_msat: 1_0000_0000_000,
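
Under that 14-day default the decay timeline is easy to check by hand (plain arithmetic, not LDK code):

fn main() {
    // A single full point (32 = 2^5) survives five halvings but not six...
    assert_eq!(32u16.checked_shr(5).unwrap_or(0), 1);
    assert_eq!(32u16.checked_shr(6).unwrap_or(0), 0);
    // ...so a lone datapoint fades within ~6 half-lives (84 days), and even
    // a saturated u16 bucket is gone after 16 half-lives (14 * 16 = 224 days).
    assert_eq!(u16::MAX.checked_shr(16).unwrap_or(0), 0);
}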
@@ -791,35 +882,27 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,

if params.historical_liquidity_penalty_multiplier_msat != 0 ||
params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
-// If historical penalties are enabled, calculate the penalty by walking the set of
-// historical liquidity bucket (min, max) combinations (where min_idx < max_idx)
-// and, for each, calculate the probability of success given our payment amount, then
-// total the weighted average probability of success.
-//
-// We use a sliding scale to decide which point within a given bucket will be compared
-// to the amount being sent - for lower-bounds, the amount being sent is compared to
-// the lower edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of
-// the last bucket (i.e. 9 times the index, or 63), with each bucket in between
-// increasing the comparison point by 1/64th. For upper-bounds, the same applies,
-// however with an offset of 1/64th (i.e. starting at one and ending at 64). This
-// avoids failing to assign penalties to channels at the edges.
-//
-// If we used the bottom edge of buckets, we'd end up never assigning any penalty at
-// all to such a channel when sending less than ~0.19% of the channel's capacity (e.g.
-// ~200k sats for a 1 BTC channel!).
-//
-// If we used the middle of each bucket we'd never assign any penalty at all when
-// sending less than 1/16th of a channel's capacity, or 1/8th if we used the top of the
-// bucket.
-let mut total_valid_points_tracked = 0;
-for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
-total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
-}
-}
-if total_valid_points_tracked == 0 {
-// If we don't have any valid points, redo the non-historical calculation with no
-// liquidity bounds tracked and the historical penalty multipliers.
+let required_decays = self.now.duration_since(*self.last_updated).as_secs()
+.checked_div(params.historical_no_updates_half_life.as_secs())
+.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
+let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat;
+debug_assert!(payment_amt_64th_bucket <= 64);
+if payment_amt_64th_bucket > 64 { return res; }
+
+let buckets = HistoricalMinMaxBuckets {
+min_liquidity_offset_history: &self.min_liquidity_offset_history,
+max_liquidity_offset_history: &self.max_liquidity_offset_history,
+};
+if let Some(cumulative_success_prob_times_billion) = buckets
+.calculate_success_probability_times_billion(required_decays, payment_amt_64th_bucket as u8) {
+let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
+res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
+historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat,
+params.historical_liquidity_penalty_amount_multiplier_msat));
+} else {
+// If we don't have any valid points (or, once decayed, we have less than a full
+// point), redo the non-historical calculation with no liquidity bounds tracked and
+// the historical penalty multipliers.
let max_capacity = self.capacity_msat.saturating_sub(amount_msat).saturating_add(1);
let negative_log10_times_2048 =
approx::negative_log10_times_2048(max_capacity, self.capacity_msat.saturating_add(1));
@@ -828,33 +911,6 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
params.historical_liquidity_penalty_amount_multiplier_msat));
return res;
}

-let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat;
-debug_assert!(payment_amt_64th_bucket <= 64);
-if payment_amt_64th_bucket > 64 { return res; }
-
-let mut cumulative_success_prob_times_billion = 0;
-for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
-let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
-* 1024 * 1024 / total_valid_points_tracked;
-let min_64th_bucket = min_idx as u64 * 9;
-let max_64th_bucket = (7 - max_idx as u64) * 9 + 1;
-if payment_amt_64th_bucket > max_64th_bucket {
-// Success probability 0, the payment amount is above the max liquidity
-} else if payment_amt_64th_bucket <= min_64th_bucket {
-cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
-} else {
-cumulative_success_prob_times_billion += bucket_prob_times_million *
-(max_64th_bucket - payment_amt_64th_bucket) * 1024 /
-(max_64th_bucket - min_64th_bucket);
-}
-}
-}
-let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
-res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
-historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat,
-params.historical_liquidity_penalty_amount_multiplier_msat));
}

res
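
The required_decays saturation above, extracted into a standalone sketch (hypothetical helper; the scorer computes this inline from its Time abstraction): integer-divide the elapsed time by the half-life, clamp into a u32, and treat a zero half-life as "decay everything".

use core::cmp;

fn required_decays(elapsed_secs: u64, half_life_secs: u64) -> u32 {
    elapsed_secs
        .checked_div(half_life_secs)
        .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32)
}

fn main() {
    assert_eq!(required_decays(13 * 86_400, 14 * 86_400), 0); // under one half-life
    assert_eq!(required_decays(30 * 86_400, 14 * 86_400), 2); // two full half-lives
    assert_eq!(required_decays(1, 0), u32::max_value());      // zero half-life: decay fully
}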
@@ -927,6 +983,12 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
}

fn update_history_buckets(&mut self) {
+let half_lives = self.now.duration_since(*self.last_updated).as_secs()
+.checked_div(self.params.historical_no_updates_half_life.as_secs())
+.map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
+self.min_liquidity_offset_history.time_decay_data(half_lives);
+self.max_liquidity_offset_history.time_decay_data(half_lives);

debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
self.min_liquidity_offset_history.track_datapoint(
// Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
@@ -949,8 +1011,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
} else {
self.decayed_offset_msat(*self.max_liquidity_offset_msat)
};
-*self.last_updated = self.now;
self.update_history_buckets();
+*self.last_updated = self.now;
}

/// Adjusts the upper bound of the channel liquidity balance in this direction.
Expand All @@ -961,8 +1023,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
} else {
self.decayed_offset_msat(*self.min_liquidity_offset_msat)
};
-*self.last_updated = self.now;
self.update_history_buckets();
+*self.last_updated = self.now;
}
}
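
Note the swap above: update_history_buckets() now runs before last_updated is bumped, because the decay it applies is measured against the old timestamp. A sketch with plain integers instead of the scorer's Time abstraction:

fn main() {
    let half_life = 14u64 * 86_400;      // seconds
    let last_updated = 0u64;             // hypothetical epoch seconds
    let now = 29 * 86_400;               // 29 days later

    // Decay first, while last_updated is still stale: two full half-lives.
    let half_lives = ((now - last_updated) / half_life) as u32;
    assert_eq!(32u16.checked_shr(half_lives).unwrap_or(0), 8);

    // Only then record the update. Bumping last_updated first would make the
    // elapsed time read as zero, and no decay would ever be applied.
    let _last_updated = now;
}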

@@ -2479,6 +2541,7 @@ mod tests {
let params = ProbabilisticScoringParameters {
historical_liquidity_penalty_multiplier_msat: 1024,
historical_liquidity_penalty_amount_multiplier_msat: 1024,
+historical_no_updates_half_life: Duration::from_secs(10),
..ProbabilisticScoringParameters::zero_penalty()
};
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger);
@@ -2500,6 +2563,11 @@
// still remember that there was some failure in the past, and assign a non-0 penalty.
scorer.payment_path_failed(&payment_path_for_amount(1000).iter().collect::<Vec<_>>(), 43);
assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 198);

+// Advance the time forward 16 half-lives (which the docs claim will ensure all data is
+// gone), and check that we're back to where we started.
+SinceEpoch::advance(Duration::from_secs(10 * 16));
+assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47);
}
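
A quick sanity check of the test's numbers (plain arithmetic, not part of the test): advancing 160 seconds against a 10-second half-life is exactly the 16 half-lives after which every possible u16 bucket value is zero.

fn main() {
    let half_lives = (10u64 * 16) / 10; // elapsed / half_life = 16
    assert_eq!(u16::MAX.checked_shr(half_lives as u32).unwrap_or(0), 0);
}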

#[test]