Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(os/gcache): memory leak for LRU when adding operations more faster than deleting #3823

Merged
merged 4 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion os/gcache/gcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// Func is the cache function that calculates and returns the value.
type Func func(ctx context.Context) (value interface{}, err error)
type Func = func(ctx context.Context) (value interface{}, err error)

const (
DurationNoExpire = time.Duration(0) // Expire duration that never expires.
Expand Down
163 changes: 86 additions & 77 deletions os/gcache/gcache_adapter_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,12 @@ import (

// AdapterMemory is an adapter implements using memory.
type AdapterMemory struct {
// cap limits the size of the cache pool.
// If the size of the cache exceeds the cap,
// the cache expiration process performs according to the LRU algorithm.
// It is 0 in default which means no limits.
cap int
data *adapterMemoryData // data is the underlying cache data which is stored in a hash table.
expireTimes *adapterMemoryExpireTimes // expireTimes is the expiring key to its timestamp mapping, which is used for quick indexing and deleting.
expireSets *adapterMemoryExpireSets // expireSets is the expiring timestamp to its key set mapping, which is used for quick indexing and deleting.
lru *adapterMemoryLru // lru is the LRU manager, which is enabled when attribute cap > 0.
lruGetList *glist.List // lruGetList is the LRU history according to Get function.
eventList *glist.List // eventList is the asynchronous event list for internal data synchronization.
closed *gtype.Bool // closed controls the cache closed or not.
}

// Internal cache item.
type adapterMemoryItem struct {
v interface{} // Value.
e int64 // Expire timestamp in milliseconds.
data *memoryData // data is the underlying cache data which is stored in a hash table.
expireTimes *memoryExpireTimes // expireTimes is the expiring key to its timestamp mapping, which is used for quick indexing and deleting.
expireSets *memoryExpireSets // expireSets is the expiring timestamp to its key set mapping, which is used for quick indexing and deleting.
lru *memoryLru // lru is the LRU manager, which is enabled when attribute cap > 0.
eventList *glist.List // eventList is the asynchronous event list for internal data synchronization.
closed *gtype.Bool // closed controls the cache closed or not.
}

// Internal event item.
Expand All @@ -53,21 +41,28 @@ const (
defaultMaxExpire = 9223372036854
)

// NewAdapterMemory creates and returns a new memory cache object.
func NewAdapterMemory(lruCap ...int) Adapter {
// NewAdapterMemory creates and returns a new adapter_memory cache object.
func NewAdapterMemory() *AdapterMemory {
return doNewAdapterMemory()
}

// NewAdapterMemoryLru creates and returns a new adapter_memory cache object with LRU.
func NewAdapterMemoryLru(cap int) *AdapterMemory {
c := doNewAdapterMemory()
c.lru = newMemoryLru(cap)
return c
}

// doNewAdapterMemory creates and returns a new adapter_memory cache object.
func doNewAdapterMemory() *AdapterMemory {
c := &AdapterMemory{
data: newAdapterMemoryData(),
lruGetList: glist.New(true),
expireTimes: newAdapterMemoryExpireTimes(),
expireSets: newAdapterMemoryExpireSets(),
data: newMemoryData(),
expireTimes: newMemoryExpireTimes(),
expireSets: newMemoryExpireSets(),
eventList: glist.New(true),
closed: gtype.NewBool(),
}
if len(lruCap) > 0 {
c.cap = lruCap[0]
c.lru = newMemCacheLru(c)
}
// Here may be a "timer leak" if adapter is manually changed from memory adapter.
// Here may be a "timer leak" if adapter is manually changed from adapter_memory adapter.
// Do not worry about this, as adapter is less changed, and it does nothing if it's not used.
gtimer.AddSingleton(context.Background(), time.Second, c.syncEventAndClearExpired)
return c
Expand All @@ -78,8 +73,9 @@ func NewAdapterMemory(lruCap ...int) Adapter {
// It does not expire if `duration` == 0.
// It deletes the keys of `data` if `duration` < 0 or given `value` is nil.
func (c *AdapterMemory) Set(ctx context.Context, key interface{}, value interface{}, duration time.Duration) error {
defer c.handleLruKey(ctx, key)
expireTime := c.getInternalExpire(duration)
c.data.Set(key, adapterMemoryItem{
c.data.Set(key, memoryDataItem{
v: value,
e: expireTime,
})
Expand Down Expand Up @@ -108,6 +104,11 @@ func (c *AdapterMemory) SetMap(ctx context.Context, data map[interface{}]interfa
e: expireTime,
})
}
if c.lru != nil {
for key := range data {
c.handleLruKey(ctx, key)
}
}
return nil
}

Expand All @@ -118,6 +119,7 @@ func (c *AdapterMemory) SetMap(ctx context.Context, data map[interface{}]interfa
// It does not expire if `duration` == 0.
// It deletes the `key` if `duration` < 0 or given `value` is nil.
func (c *AdapterMemory) SetIfNotExist(ctx context.Context, key interface{}, value interface{}, duration time.Duration) (bool, error) {
defer c.handleLruKey(ctx, key)
isContained, err := c.Contains(ctx, key)
if err != nil {
return false, err
Expand All @@ -140,6 +142,7 @@ func (c *AdapterMemory) SetIfNotExist(ctx context.Context, key interface{}, valu
// It does not expire if `duration` == 0.
// It deletes the `key` if `duration` < 0 or given `value` is nil.
func (c *AdapterMemory) SetIfNotExistFunc(ctx context.Context, key interface{}, f Func, duration time.Duration) (bool, error) {
defer c.handleLruKey(ctx, key)
isContained, err := c.Contains(ctx, key)
if err != nil {
return false, err
Expand All @@ -166,6 +169,7 @@ func (c *AdapterMemory) SetIfNotExistFunc(ctx context.Context, key interface{},
// Note that it differs from function `SetIfNotExistFunc` is that the function `f` is executed within
// writing mutex lock for concurrent safety purpose.
func (c *AdapterMemory) SetIfNotExistFuncLock(ctx context.Context, key interface{}, f Func, duration time.Duration) (bool, error) {
defer c.handleLruKey(ctx, key)
isContained, err := c.Contains(ctx, key)
if err != nil {
return false, err
Expand All @@ -185,10 +189,7 @@ func (c *AdapterMemory) SetIfNotExistFuncLock(ctx context.Context, key interface
func (c *AdapterMemory) Get(ctx context.Context, key interface{}) (*gvar.Var, error) {
item, ok := c.data.Get(key)
if ok && !item.IsExpired() {
// Adding to LRU history if LRU feature is enabled.
if c.cap > 0 {
c.lruGetList.PushBack(key)
}
c.handleLruKey(ctx, key)
return gvar.New(item.v), nil
}
return nil, nil
Expand All @@ -202,6 +203,7 @@ func (c *AdapterMemory) Get(ctx context.Context, key interface{}) (*gvar.Var, er
// It deletes the `key` if `duration` < 0 or given `value` is nil, but it does nothing
// if `value` is a function and the function result is nil.
func (c *AdapterMemory) GetOrSet(ctx context.Context, key interface{}, value interface{}, duration time.Duration) (*gvar.Var, error) {
defer c.handleLruKey(ctx, key)
v, err := c.Get(ctx, key)
if err != nil {
return nil, err
Expand All @@ -220,6 +222,7 @@ func (c *AdapterMemory) GetOrSet(ctx context.Context, key interface{}, value int
// It deletes the `key` if `duration` < 0 or given `value` is nil, but it does nothing
// if `value` is a function and the function result is nil.
func (c *AdapterMemory) GetOrSetFunc(ctx context.Context, key interface{}, f Func, duration time.Duration) (*gvar.Var, error) {
defer c.handleLruKey(ctx, key)
v, err := c.Get(ctx, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -248,6 +251,7 @@ func (c *AdapterMemory) GetOrSetFunc(ctx context.Context, key interface{}, f Fun
// Note that it differs from function `GetOrSetFunc` is that the function `f` is executed within
// writing mutex lock for concurrent safety purpose.
func (c *AdapterMemory) GetOrSetFuncLock(ctx context.Context, key interface{}, f Func, duration time.Duration) (*gvar.Var, error) {
defer c.handleLruKey(ctx, key)
v, err := c.Get(ctx, key)
if err != nil {
return nil, err
Expand All @@ -274,6 +278,7 @@ func (c *AdapterMemory) Contains(ctx context.Context, key interface{}) (bool, er
// It returns -1 if the `key` does not exist in the cache.
func (c *AdapterMemory) GetExpire(ctx context.Context, key interface{}) (time.Duration, error) {
if item, ok := c.data.Get(key); ok {
c.handleLruKey(ctx, key)
return time.Duration(item.e-gtime.TimestampMilli()) * time.Millisecond, nil
}
return -1, nil
Expand All @@ -282,6 +287,15 @@ func (c *AdapterMemory) GetExpire(ctx context.Context, key interface{}) (time.Du
// Remove deletes one or more keys from cache, and returns its value.
// If multiple keys are given, it returns the value of the last deleted item.
func (c *AdapterMemory) Remove(ctx context.Context, keys ...interface{}) (*gvar.Var, error) {
defer c.lru.Remove(keys...)
value, err := c.doRemove(ctx, keys...)
if err != nil {
return nil, err
}
return gvar.New(value), nil
}

func (c *AdapterMemory) doRemove(_ context.Context, keys ...interface{}) (*gvar.Var, error) {
var removedKeys []interface{}
removedKeys, value, err := c.data.Remove(keys...)
if err != nil {
Expand All @@ -303,6 +317,9 @@ func (c *AdapterMemory) Remove(ctx context.Context, keys ...interface{}) (*gvar.
// It does nothing if `key` does not exist in the cache.
func (c *AdapterMemory) Update(ctx context.Context, key interface{}, value interface{}) (oldValue *gvar.Var, exist bool, err error) {
v, exist, err := c.data.Update(key, value)
if exist {
c.handleLruKey(ctx, key)
}
return gvar.New(v), exist, err
}

Expand All @@ -321,6 +338,7 @@ func (c *AdapterMemory) UpdateExpire(ctx context.Context, key interface{}, durat
k: key,
e: newExpireTime,
})
c.handleLruKey(ctx, key)
}
return
}
Expand Down Expand Up @@ -348,14 +366,13 @@ func (c *AdapterMemory) Values(ctx context.Context) ([]interface{}, error) {
// Clear clears all data of the cache.
// Note that this function is sensitive and should be carefully used.
func (c *AdapterMemory) Clear(ctx context.Context) error {
return c.data.Clear()
c.data.Clear()
c.lru.Clear()
return nil
}

// Close closes the cache.
func (c *AdapterMemory) Close(ctx context.Context) error {
if c.cap > 0 {
c.lru.Close()
}
c.closed.Set(true)
return nil
}
Expand Down Expand Up @@ -390,9 +407,9 @@ func (c *AdapterMemory) makeExpireKey(expire int64) int64 {
}

// syncEventAndClearExpired does the asynchronous task loop:
// 1. Asynchronously process the data in the event list,
// and synchronize the results to the `expireTimes` and `expireSets` properties.
// 2. Clean up the expired key-value pair data.
// 1. Asynchronously process the data in the event list,
// and synchronize the results to the `expireTimes` and `expireSets` properties.
// 2. Clean up the expired key-value pair data.
func (c *AdapterMemory) syncEventAndClearExpired(ctx context.Context) {
if c.closed.Val() {
gtimer.Exit()
Expand All @@ -403,9 +420,9 @@ func (c *AdapterMemory) syncEventAndClearExpired(ctx context.Context) {
oldExpireTime int64
newExpireTime int64
)
// ========================
// Data Synchronization.
// ========================
// ================================
// Data expiration synchronization.
// ================================
for {
v := c.eventList.PopFront()
if v == nil {
Expand All @@ -425,37 +442,24 @@ func (c *AdapterMemory) syncEventAndClearExpired(ctx context.Context) {
// Updating the expired time for `event.k`.
c.expireTimes.Set(event.k, newExpireTime)
}
// Adding the key the LRU history by writing operations.
if c.cap > 0 {
c.lru.Push(event.k)
}
}
// Processing expired keys from LRU.
if c.cap > 0 {
if c.lruGetList.Len() > 0 {
for {
if v := c.lruGetList.PopFront(); v != nil {
c.lru.Push(v)
} else {
break
}
}
}
c.lru.SyncAndClear(ctx)
}
// ========================
// Data Cleaning up.
// ========================
// =================================
// Data expiration auto cleaning up.
// =================================
var (
expireSet *gset.Set
ek = c.makeExpireKey(gtime.TimestampMilli())
eks = []int64{ek - 1000, ek - 2000, ek - 3000, ek - 4000, ek - 5000}
expireSet *gset.Set
expireTime int64
currentEk = c.makeExpireKey(gtime.TimestampMilli())
)
for _, expireTime := range eks {
// auto removing expiring key set for latest seconds.
for i := int64(1); i <= 5; i++ {
expireTime = currentEk - i*1000
if expireSet = c.expireSets.Get(expireTime); expireSet != nil {
// Iterating the set to delete all keys in it.
expireSet.Iterator(func(key interface{}) bool {
c.clearByKey(key)
c.deleteExpiredKey(key)
// remove auto expired key for lru.
c.lru.Remove(key)
return true
})
// Deleting the set after all of its keys are deleted.
Expand All @@ -464,17 +468,22 @@ func (c *AdapterMemory) syncEventAndClearExpired(ctx context.Context) {
}
}

func (c *AdapterMemory) handleLruKey(ctx context.Context, keys ...interface{}) {
if c.lru == nil {
return
}
if evictedKeys := c.lru.SaveAndEvict(keys...); len(evictedKeys) > 0 {
_, _ = c.doRemove(ctx, evictedKeys...)
return
}
return
}

// clearByKey deletes the key-value pair with given `key`.
// The parameter `force` specifies whether doing this deleting forcibly.
func (c *AdapterMemory) clearByKey(key interface{}, force ...bool) {
func (c *AdapterMemory) deleteExpiredKey(key interface{}) {
// Doubly check before really deleting it from cache.
c.data.DeleteWithDoubleCheck(key, force...)

c.data.Delete(key)
// Deleting its expiration time from `expireTimes`.
c.expireTimes.Delete(key)

// Deleting it from LRU.
if c.cap > 0 {
c.lru.Remove(key)
}
}
Loading
Loading