Skip to content

Commit

Permalink
Decay historical liquidity tracking when no new data is added
Browse files Browse the repository at this point in the history
To avoid scoring based on incredibly old historical liquidity data,
we add a new half-life here which is used to (very slowly) decay
historical liquidity tracking buckets.
  • Loading branch information
TheBlueMatt committed Oct 5, 2022
1 parent f2b9189 commit 6c7f568
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use util::logger::Logger;
use util::time::Time;

use prelude::*;
use core::fmt;
use core::{cmp, fmt};
use core::cell::{RefCell, RefMut};
use core::convert::TryInto;
use core::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -436,6 +436,16 @@ pub struct ProbabilisticScoringParameters {
/// [`liquidity_penalty_amount_multiplier_msat`]: Self::liquidity_penalty_amount_multiplier_msat
pub historical_liquidity_penalty_amount_multiplier_msat: u64,

/// If we aren't learning any new datapoints for a channel, the historical liquidity bounds
/// tracking can simply live on with increasingly stale data. Instead, when a channel has not
/// seen a liquidity estimate update for this amount of time, the historical datapoints are
/// decayed by half.
///
/// Note that after 16 or more half-lives all historical data will be completely gone.
///
/// Default value: 14 days
pub historical_no_updates_half_life: Duration,

/// Manual penalties used for the given nodes. Allows to set a particular penalty for a given
/// node. Note that a manual penalty of `u64::max_value()` means the node would not ever be
/// considered during path finding.
Expand Down Expand Up @@ -509,6 +519,13 @@ impl HistoricalBucketRangeTracker {
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
}
}
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
/// datapoints as we receive newer information.
fn time_decay_data(&mut self, half_lives: u32) {
	for bucket in self.buckets.iter_mut() {
		// Each half-life halves the bucket's weight, i.e. one right-shift per half-life.
		// `checked_shr` returns `None` once the shift reaches the full width of a `u16`
		// (a plain `>>` would panic/UB there), at which point the data is fully decayed.
		*bucket = match bucket.checked_shr(half_lives) {
			Some(decayed) => decayed,
			None => 0,
		};
	}
}
}

impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
Expand Down Expand Up @@ -645,6 +662,7 @@ impl ProbabilisticScoringParameters {
liquidity_penalty_amount_multiplier_msat: 0,
historical_liquidity_penalty_multiplier_msat: 0,
historical_liquidity_penalty_amount_multiplier_msat: 0,
historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
manual_node_penalties: HashMap::new(),
anti_probing_penalty_msat: 0,
considered_impossible_penalty_msat: 0,
Expand All @@ -670,6 +688,7 @@ impl Default for ProbabilisticScoringParameters {
liquidity_penalty_amount_multiplier_msat: 192,
historical_liquidity_penalty_multiplier_msat: 10_000,
historical_liquidity_penalty_amount_multiplier_msat: 64,
historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
manual_node_penalties: HashMap::new(),
anti_probing_penalty_msat: 250,
considered_impossible_penalty_msat: 1_0000_0000_000,
Expand Down Expand Up @@ -810,14 +829,30 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
// sending less than 1/16th of a channel's capacity, or 1/8th if we used the top of the
// bucket.
let mut total_valid_points_tracked = 0;
let required_decays = self.now.duration_since(*self.last_updated).as_secs()
.checked_div(params.historical_no_updates_half_life.as_secs())
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);

// Rather than actually decaying the individual buckets, which would lose precision, we
// simply track whether all buckets would be decayed to zero, in which case we treat it
// as if we had no data.
let mut is_fully_decayed = true;
let mut check_track_bucket_contains_undecayed_points =
|bucket_val: u16| if bucket_val.checked_shr(required_decays).unwrap_or(0) > 0 { is_fully_decayed = false; };

for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
check_track_bucket_contains_undecayed_points(*min_bucket);
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
check_track_bucket_contains_undecayed_points(*max_bucket);
}
}
if total_valid_points_tracked == 0 {
// If we don't have any valid points, redo the non-historical calculation with no
// liquidity bounds tracked and the historical penalty multipliers.
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme),
// treat it as if we were fully decayed.
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 || is_fully_decayed {
// If we don't have any valid points (or, once decayed, we have less than a full
// point), redo the non-historical calculation with no liquidity bounds tracked and
// the historical penalty multipliers.
let max_capacity = self.capacity_msat.saturating_sub(amount_msat).saturating_add(1);
let negative_log10_times_2048 =
approx::negative_log10_times_2048(max_capacity, self.capacity_msat.saturating_add(1));
Expand Down Expand Up @@ -925,6 +960,12 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
}

fn update_history_buckets(&mut self) {
let half_lives = self.now.duration_since(*self.last_updated).as_secs()
.checked_div(self.params.historical_no_updates_half_life.as_secs())
.map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
self.min_liquidity_offset_history.time_decay_data(half_lives);
self.max_liquidity_offset_history.time_decay_data(half_lives);

debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
self.min_liquidity_offset_history.track_datapoint(
// Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
Expand All @@ -947,8 +988,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
} else {
self.decayed_offset_msat(*self.max_liquidity_offset_msat)
};
*self.last_updated = self.now;
self.update_history_buckets();
*self.last_updated = self.now;
}

/// Adjusts the upper bound of the channel liquidity balance in this direction.
Expand All @@ -959,8 +1000,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
} else {
self.decayed_offset_msat(*self.min_liquidity_offset_msat)
};
*self.last_updated = self.now;
self.update_history_buckets();
*self.last_updated = self.now;
}
}

Expand Down

0 comments on commit 6c7f568

Please sign in to comment.