feat(cache): introduce LruCacheEventListener to subscribe erasure and eviction (risingwavelabs#3334)

MrCroxx authored Jun 21, 2022
1 parent 5627f25 commit 68596ab
Showing 1 changed file with 147 additions and 11 deletions.
158 changes: 147 additions & 11 deletions src/common/src/cache.rs
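
In short, a consumer implements the new LruCacheEventListener trait and hands its listeners to LruCache::with_event_listeners; evictions and erasures in any shard are then reported through the trait's callbacks. The following is a minimal sketch of that flow, not part of the commit: the crate path risingwave_common::cache and the EvictionCounter type are illustrative assumptions.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use risingwave_common::cache::{LruCache, LruCacheEventListener}; // assumed re-export path

// Hypothetical listener that counts evictions and erasures.
#[derive(Default)]
struct EvictionCounter {
    evicted: AtomicUsize,
    erased: AtomicUsize,
}

impl LruCacheEventListener for EvictionCounter {
    type K = String;
    type T = String;

    fn on_evict(&self, _key: &Self::K, _value: &Self::T) {
        self.evicted.fetch_add(1, Ordering::Relaxed);
    }

    fn on_erase(&self, _key: &Self::K, _value: &Self::T) {
        self.erased.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let listener = Arc::new(EvictionCounter::default());
    // 1 << 2 = 4 shards sharing a total capacity of 1024.
    let cache: LruCache<String, String> =
        LruCache::with_event_listeners(2, 1024, vec![listener.clone()]);
    // Inserts that push usage past capacity fire on_evict;
    // explicit erase calls fire on_erase.
    drop(cache);
}

Each shard receives a clone of the listener vector (see the diff below), so listeners must be Send + Sync and should be cheap to invoke.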
@@ -318,12 +318,18 @@ pub struct LruCacheShard<K: LruKey, T: LruValue> {
     lru_usage: Arc<AtomicUsize>,
     usage: Arc<AtomicUsize>,
     capacity: usize,
+
+    listeners: Vec<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
 }
 
 unsafe impl<K: LruKey, T: LruValue> Send for LruCacheShard<K, T> {}
 
 impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
-    fn new(capacity: usize, object_capacity: usize) -> Self {
+    fn new(
+        capacity: usize,
+        object_capacity: usize,
+        listeners: Vec<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
+    ) -> Self {
         let mut lru = Box::new(LruHandle::default());
         lru.prev = lru.as_mut();
         lru.next = lru.as_mut();
@@ -339,6 +345,8 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
             lru,
             table: LruHandleTable::new(),
             write_request: HashMap::with_capacity(16),
+
+            listeners,
         }
}

@@ -382,23 +390,26 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
             let old_ptr = self.lru.next;
             self.table.remove((*old_ptr).hash, (*old_ptr).get_key());
             self.lru_remove(old_ptr);
-            let value = self.clear_handle(old_ptr);
+            let (key, value) = self.clear_handle(old_ptr);
+            for listener in &self.listeners {
+                listener.on_evict(&key, &value);
+            }
             last_reference_list.push(value);
         }
     }
 
     /// Clear a currently used handle and recycle it if possible
-    unsafe fn clear_handle(&mut self, h: *mut LruHandle<K, T>) -> T {
+    unsafe fn clear_handle(&mut self, h: *mut LruHandle<K, T>) -> (K, T) {
         debug_assert!(!h.is_null());
         debug_assert!((*h).kv.is_some());
         #[cfg(debug_assertions)]
         assert!(!(*h).is_in_lru());
         debug_assert!(!(*h).is_in_cache());
         debug_assert!(!(*h).has_refs());
         self.usage.fetch_sub((*h).charge, Ordering::Relaxed);
-        let value = (*h).take_kv().1;
+        let (key, value) = (*h).take_kv();
         self.try_recycle_handle_object(h);
-        value
+        (key, value)
     }
 
     /// Try to recycle a handle object if the object pool is not full.
@@ -466,6 +477,9 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
                 return None;
             }
             // Remove the handle from table.
+            for listener in &self.listeners {
+                listener.on_evict((*h).get_key(), (*h).get_value());
+            }
             self.table.remove((*h).hash, (*h).get_key());
         }

@@ -474,8 +488,8 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
         #[cfg(debug_assertions)]
         assert!(!(*h).is_in_lru());
 
-        let data = self.clear_handle(h);
-        Some(data)
+        let (_key, value) = self.clear_handle(h);
+        Some(value)
     }
 
     unsafe fn lookup(&mut self, hash: u64, key: &K) -> *mut LruHandle<K, T> {
@@ -495,6 +509,9 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
     unsafe fn erase(&mut self, hash: u64, key: &K) -> Option<T> {
         let h = self.table.remove(hash, key);
         if !h.is_null() {
+            for listener in &self.listeners {
+                listener.on_erase((*h).get_key(), (*h).get_value());
+            }
             self.try_remove_cache_handle(h)
         } else {
             None
@@ -511,8 +528,8 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
             // referenced externally. Since we have checked that it is not referenced externally, it
             // must be in the LRU, and therefore we are safe to call `lru_remove`.
             self.lru_remove(h);
-            let data = self.clear_handle(h);
-            return Some(data);
+            let (_key, value) = self.clear_handle(h);
+            return Some(value);
         }
         None
     }
@@ -537,10 +554,23 @@ impl<K: LruKey, T: LruValue> Drop for LruCacheShard<K, T> {
     }
 }
 
+pub trait LruCacheEventListener: Send + Sync {
+    type K: LruKey;
+    type T: LruValue;
+
+    /// `on_evict` is called when a cache entry is evicted by a newly inserted entry.
+    fn on_evict(&self, _key: &Self::K, _value: &Self::T) {}
+
+    /// `on_erase` is called when a cache entry is removed by calling `erase`.
+    fn on_erase(&self, _key: &Self::K, _value: &Self::T) {}
+}
+
 pub struct LruCache<K: LruKey, T: LruValue> {
     shards: Vec<Mutex<LruCacheShard<K, T>>>,
     shard_usages: Vec<Arc<AtomicUsize>>,
     shard_lru_usages: Vec<Arc<AtomicUsize>>,
+
+    listeners: Vec<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
 }
 
 // we only need a small object pool because when the cache reach the limit of capacity, it will
@@ -549,13 +579,21 @@ const DEFAULT_OBJECT_POOL_SIZE: usize = 1024;

 impl<K: LruKey, T: LruValue> LruCache<K, T> {
     pub fn new(num_shard_bits: usize, capacity: usize) -> Self {
+        Self::with_event_listeners(num_shard_bits, capacity, vec![])
+    }
+
+    pub fn with_event_listeners(
+        num_shard_bits: usize,
+        capacity: usize,
+        listeners: Vec<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
+    ) -> Self {
         let num_shards = 1 << num_shard_bits;
         let mut shards = Vec::with_capacity(num_shards);
         let per_shard = capacity / num_shards;
         let mut shard_usages = Vec::with_capacity(num_shards);
         let mut shard_lru_usages = Vec::with_capacity(num_shards);
         for _ in 0..num_shards {
-            let shard = LruCacheShard::new(per_shard, DEFAULT_OBJECT_POOL_SIZE);
+            let shard = LruCacheShard::new(per_shard, DEFAULT_OBJECT_POOL_SIZE, listeners.clone());
             shard_usages.push(shard.usage.clone());
             shard_lru_usages.push(shard.lru_usage.clone());
             shards.push(Mutex::new(shard));
@@ -564,6 +602,8 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
             shards,
             shard_usages,
             shard_lru_usages,
+
+            listeners,
         }
     }

@@ -826,7 +866,14 @@ mod tests {
     }
 
     fn create_cache(capacity: usize) -> LruCacheShard<String, String> {
-        LruCacheShard::new(capacity, capacity)
+        create_cache_with_event_listeners(capacity, vec![])
     }
+
+    fn create_cache_with_event_listeners(
+        capacity: usize,
+        listeners: Vec<Arc<dyn LruCacheEventListener<K = String, T = String>>>,
+    ) -> LruCacheShard<String, String> {
+        LruCacheShard::new(capacity, capacity, listeners)
+    }
 
     fn lookup(cache: &mut LruCacheShard<String, String>, key: &str) -> bool {
@@ -1066,4 +1113,93 @@ mod tests {
             _ => panic!(),
         }
     }
+
+    #[derive(Default, Debug)]
+    struct TestLruCacheEventListener {
+        evicted: Mutex<HashMap<String, String>>,
+        erased: Mutex<HashMap<String, String>>,
+    }
+
+    impl LruCacheEventListener for TestLruCacheEventListener {
+        type K = String;
+        type T = String;
+
+        fn on_evict(&self, key: &Self::K, value: &Self::T) {
+            self.evicted.lock().insert(key.clone(), value.clone());
+        }
+
+        fn on_erase(&self, key: &Self::K, value: &Self::T) {
+            self.erased.lock().insert(key.clone(), value.clone());
+        }
+    }
+
+    #[test]
+    fn test_event_listener() {
+        unsafe {
+            let listener = Arc::new(TestLruCacheEventListener::default());
+            let mut cache = create_cache_with_event_listeners(2, vec![listener.clone()]);
+
+            // fill the cache to capacity
+            let h = cache.insert("k1".to_string(), 0, 1, "v1".to_string(), &mut vec![]);
+            cache.release(h);
+            let h = cache.insert("k2".to_string(), 0, 1, "v2".to_string(), &mut vec![]);
+            cache.release(h);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().is_empty());
+
+            // test evict
+            let h = cache.insert("k3".to_string(), 0, 1, "v3".to_string(), &mut vec![]);
+            cache.release(h);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().remove("k1").is_some());
+
+            // test erase
+            cache.erase(0, &"k2".to_string());
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 1);
+            assert!(listener.erased.lock().remove("k2").is_some());
+            assert!(listener.evicted.lock().is_empty());
+
+            // test refill
+            let h = cache.insert("k4".to_string(), 0, 1, "v4".to_string(), &mut vec![]);
+            cache.release(h);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().is_empty());
+
+            // test release after full
+            // 1. fill the cache again, but do not release the handles yet
+            let h1 = cache.insert("k5".to_string(), 0, 1, "v5".to_string(), &mut vec![]);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().remove("k3").is_some());
+            let h2 = cache.insert("k6".to_string(), 0, 1, "v6".to_string(), &mut vec![]);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().remove("k4").is_some());
+
+            // 2. insert one more entry after the cache is full; the cache becomes oversized
+            let h3 = cache.insert("k7".to_string(), 0, 1, "v7".to_string(), &mut vec![]);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 3);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().is_empty());
+
+            // 3. release one entry; it is evicted immediately because the cache is oversized
+            cache.release(h1);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().remove("k5").is_some());
+
+            // 4. release the other entries; no entry is evicted
+            cache.release(h2);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().is_empty());
+            cache.release(h3);
+            assert_eq!(cache.usage.load(Ordering::Relaxed), 2);
+            assert!(listener.erased.lock().is_empty());
+            assert!(listener.evicted.lock().is_empty());
+        }
+    }
 }
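
Beyond the HashMap-backed listener in the test above, a more typical subscriber would forward evicted entries to a background task, e.g. to spill them into a slower cache tier. The sketch below illustrates that pattern and is not part of this commit; SpillListener, the tokio channel, and the spill use case are all assumptions.

use tokio::sync::mpsc::UnboundedSender;

// Hypothetical listener that hands evicted entries to a background
// worker over an unbounded channel instead of dropping them.
struct SpillListener {
    tx: UnboundedSender<(String, String)>,
}

impl LruCacheEventListener for SpillListener {
    type K = String;
    type T = String;

    fn on_evict(&self, key: &Self::K, value: &Self::T) {
        // The callback fires inside a shard operation, so it must stay
        // cheap and non-blocking; an unbounded send never blocks.
        let _ = self.tx.send((key.clone(), value.clone()));
    }

    // on_erase keeps its default no-op body: explicitly erased entries
    // were removed on purpose and should not be spilled.
}

A consumer would pass Arc::new(SpillListener { tx }) to LruCache::with_event_listeners and drain the receiver on a dedicated task.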
