Skip to content

Commit

Permalink
do not cache large blob history event (#4621)
Browse files Browse the repository at this point in the history
  • Loading branch information
yujieli-temporal authored and dnr committed Jul 21, 2023
1 parent 49b99fe commit f97b681
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 23 deletions.
3 changes: 2 additions & 1 deletion common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type Cache interface {
// Iterator returns the iterator of the cache
Iterator() Iterator

// Size returns the number of entries currently stored in the Cache
// Size returns the current size of the Cache. Each entry contributes the size reported by its SizeGetter
// implementation; an entry that does not implement the SizeGetter interface contributes a size of 1
Size() int
}

Expand Down
47 changes: 34 additions & 13 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (

// Sentinel errors returned by Put-style operations. Each sentinel is declared exactly once, and the
// messages are lowercase without trailing punctuation so they read correctly when wrapped.
var (
	// ErrCacheFull is returned if Put fails due to cache being filled with pinned elements
	ErrCacheFull = errors.New("cache capacity is fully occupied with pinned elements")
	// ErrCacheItemTooLarge is returned if Put fails due to item size being larger than max cache capacity
	ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity")
)

// lru is a concurrent fixed size cache that evicts elements in lru order
Expand All @@ -43,6 +45,7 @@ type (
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
currSize int
ttl time.Duration
pin bool
}
Expand All @@ -58,6 +61,7 @@ type (
createTime time.Time
value interface{}
refCount int
size int
}
)

Expand All @@ -83,6 +87,7 @@ func (it *iteratorImpl) Next() Entry {
entry = &entryImpl{
key: entry.key,
value: entry.value,
size: entry.size,
createTime: entry.createTime,
}
it.prepareNext()
Expand Down Expand Up @@ -124,6 +129,10 @@ func (entry *entryImpl) Value() interface{} {
return entry.value
}

// Size returns the size stored on this entry (its size field).
func (entry *entryImpl) Size() int {
return entry.size
}

// CreateTime returns the creation timestamp stored on this entry (its createTime field).
func (entry *entryImpl) CreateTime() time.Time {
return entry.createTime
}
Expand All @@ -139,6 +148,7 @@ func New(maxSize int, opts *Options) Cache {
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
currSize: 0,
pin: opts.Pin,
}
}
Expand Down Expand Up @@ -239,12 +249,15 @@ func (c *lru) Release(key interface{}) {
entry.refCount--
}

// Size returns the current size of the lru, useful if cache is not full. The size is the running
// total of the sizes of all entries in the cache (currSize), not the number of entries. An entry's
// size is the value reported by its value's SizeGetter implementation; a value that does not
// implement the SizeGetter interface has a size of 1.
func (c *lru) Size() int {
	// currSize is mutated by Put and eviction, so read it under the same mutex.
	c.mut.Lock()
	defer c.mut.Unlock()

	return c.currSize
}

// Put puts a new value associated with a given key, returning the existing value (if present)
Expand All @@ -253,9 +266,22 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
}

elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
Expand All @@ -282,6 +308,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry := &entryImpl{
key: key,
value: value,
size: entrySize,
}

if c.pin {
Expand All @@ -292,30 +319,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry.createTime = time.Now().UTC()
}

if len(c.byKey) >= c.maxSize {
c.evictOnceInternal()
}
if len(c.byKey) >= c.maxSize {
return nil, ErrCacheFull
}

element := c.byAccess.PushFront(entry)
c.byKey[key] = element
return nil, nil
}

// deleteInternal unlinks element from the access list, drops its key from the lookup map and
// gives the entry's size back to the cache's running total.
func (c *lru) deleteInternal(element *list.Element) {
	evicted := c.byAccess.Remove(element).(*entryImpl)
	delete(c.byKey, evicted.key)
	c.currSize -= evicted.Size()
}

func (c *lru) evictOnceInternal() {
// tryEvictUntilEnoughSpace tries to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
element := c.byAccess.Back()
for element != nil {
for c.currSize > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
return
}

// entry.refCount > 0
Expand Down
65 changes: 62 additions & 3 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,28 @@
package cache

import (
"math/rand"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)

type (
	// keyType is a composite, comparable struct used as a cache key in tests.
	keyType struct {
		dummyString string
		dummyInt    int
	}

	// testEntryWithCacheSize is a test value that carries an explicit size in cacheSize.
	testEntryWithCacheSize struct {
		cacheSize int
	}
)

// CacheSize returns the cacheSize field, which lets a test give an entry an arbitrary size.
// This method is what makes *testEntryWithCacheSize satisfy the SizeGetter interface.
func (c *testEntryWithCacheSize) CacheSize() int {
return c.cacheSize
}

func TestLRU(t *testing.T) {
Expand Down Expand Up @@ -302,3 +314,50 @@ func TestZeroSizeCache(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 0, cache.Size())
}

// TestCache_ItemSizeTooLarge checks the per-entry size bound: an entry whose CacheSize equals the
// cache capacity is accepted, while an entry that exceeds the capacity is rejected with
// ErrCacheItemTooLarge.
func TestCache_ItemSizeTooLarge(t *testing.T) {
	t.Parallel()

	maxTotalBytes := 10
	cache := NewLRU(maxTotalBytes)

	// An entry exactly as large as the capacity still fits.
	res := cache.Put(uuid.New(), &testEntryWithCacheSize{maxTotalBytes})
	assert.Nil(t, res)

	// An entry larger than the capacity can never fit, no matter what gets evicted.
	// testify's Equal takes (expected, actual); keeping that order makes failure output accurate.
	res, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{maxTotalBytes + 1})
	assert.Equal(t, ErrCacheItemTooLarge, err)
	assert.Nil(t, res)
}

// TestCache_ItemHasCacheSizeDefined issues a random number of concurrent Puts of entries that
// report their own CacheSize and checks that the cache's total size never exceeds its capacity.
func TestCache_ItemHasCacheSizeDefined(t *testing.T) {
	t.Parallel()

	maxTotalBytes := 10
	cache := NewLRU(maxTotalBytes)

	numPuts := rand.Intn(1024)

	startWG := sync.WaitGroup{}
	endWG := sync.WaitGroup{}

	startWG.Add(numPuts)
	// numPuts writers plus the one checker goroutine below: the test must not return while any
	// goroutine can still use t.
	endWG.Add(numPuts + 1)

	go func() {
		defer endWG.Done()

		startWG.Wait()
		// A cache filled exactly to capacity is valid, so the bound is inclusive.
		assert.True(t, cache.Size() <= maxTotalBytes)
	}()
	for i := 0; i < numPuts; i++ {
		go func() {
			defer endWG.Done()

			// Hold every writer back until all of them have been started.
			startWG.Wait()
			key := uuid.New()
			cache.Put(key, &testEntryWithCacheSize{rand.Int()})
		}()
		startWG.Done()
	}

	endWG.Wait()
	// Check again after every Put has finished, so the bound is verified on the final state too.
	assert.True(t, cache.Size() <= maxTotalBytes)
}
42 changes: 42 additions & 0 deletions common/cache/size_getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination size_getter_mock.go

package cache

// SizeGetter can be implemented by a cached value to report its own size to the cache.
type SizeGetter interface {
	// CacheSize returns the size of the value.
	CacheSize() int
}

// getSize returns the size of value: the result of CacheSize() when value implements SizeGetter,
// and 1 otherwise.
func getSize(value interface{}) int {
	switch sized := value.(type) {
	case SizeGetter:
		return sized.CacheSize()
	default:
		// A value without a CacheSize() method is assumed to live in a count-limited cache,
		// where every entry has a size of 1.
		return 1
	}
}
72 changes: 72 additions & 0 deletions common/cache/size_getter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ func NewConfig(
HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128),
HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512),
HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour),
EventsCacheInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 128),
EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 512),
EventsCacheInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 128*1024), // 128KB
EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 512*1024), // 512KB
EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
Expand Down
Loading

0 comments on commit f97b681

Please sign in to comment.