Merge branch 'master' into haifengh/v1.21.2-verify-test-pr
hehaifengcn authored Jul 18, 2023
2 parents d0c4e47 + 8a75aed commit c0fd549
Showing 46 changed files with 1,110 additions and 397 deletions.
3 changes: 2 additions & 1 deletion common/cache/cache.go
@@ -53,7 +53,8 @@ type Cache interface {
// Iterator returns the iterator of the cache
Iterator() Iterator

// Size returns the number of entries currently stored in the Cache
// Size returns the current size of the Cache. Each entry's size comes from its SizeGetter implementation;
// if an entry does not implement SizeGetter, its size counts as 1
Size() int
}
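
To make the new sizing contract concrete, here is a minimal sketch of a value type that reports its own size through the SizeGetter interface added later in this commit; the type and field names are hypothetical, not part of the change.

// hypothetical value type stored in the cache (illustration only)
type historyBlob struct {
	data []byte
}

// CacheSize satisfies cache.SizeGetter, so the LRU charges this entry
// against maxSize by its payload length rather than counting it as 1.
func (b *historyBlob) CacheSize() int {
	return len(b.data)
}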

47 changes: 34 additions & 13 deletions common/cache/lru.go
@@ -35,7 +35,9 @@ import (

var (
// ErrCacheFull is returned if Put fails due to cache being filled with pinned elements
ErrCacheFull = errors.New("Cache capacity is fully occupied with pinned elements")
ErrCacheFull = errors.New("cache capacity is fully occupied with pinned elements")
// ErrCacheItemTooLarge is returned if Put fails due to item size being larger than max cache capacity
ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity")
)

// lru is a concurrent fixed size cache that evicts elements in lru order
@@ -45,6 +47,7 @@ type (
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
currSize int
ttl time.Duration
pin bool
clock clockwork.Clock
@@ -61,6 +64,7 @@ type (
createTime time.Time
value interface{}
refCount int
size int
}
)

@@ -86,6 +90,7 @@ func (it *iteratorImpl) Next() Entry {
entry = &entryImpl{
key: entry.key,
value: entry.value,
size: entry.size,
createTime: entry.createTime,
}
it.prepareNext()
@@ -127,6 +132,10 @@ func (entry *entryImpl) Value() interface{} {
return entry.value
}

func (entry *entryImpl) Size() int {
return entry.size
}

func (entry *entryImpl) CreateTime() time.Time {
return entry.createTime
}
@@ -146,6 +155,7 @@ func New(maxSize int, opts *Options) Cache {
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
currSize: 0,
pin: opts.Pin,
clock: clock,
}
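
For context, a hedged sketch of constructing a cache through New, using only the Options fields visible in this hunk (the capacity and values are arbitrary):

c := cache.New(64*1024, &cache.Options{
	InitialCapacity: 128,         // pre-sizes the key map
	TTL:             time.Minute, // entries expire after a minute
	Pin:             true,        // entries cannot be evicted until released
})
_ = c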
@@ -247,12 +257,15 @@ func (c *lru) Release(key interface{}) {
entry.refCount--
}

// Size returns the number of entries currently in the lru, useful if cache is not full
// Size returns the current size of the lru, useful if cache is not full. This size is calculated by summing
// the sizes of all entries in the cache. An entry's size is the size of its value, determined through the
// SizeGetter interface; if the value does not implement SizeGetter, its size is 1.
func (c *lru) Size() int {
c.mut.Lock()
defer c.mut.Unlock()

return len(c.byKey)
return c.currSize
}

// Put puts a new value associated with a given key, returning the existing value (if present)
Expand All @@ -261,9 +274,22 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
}

elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
@@ -290,6 +316,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry := &entryImpl{
key: key,
value: value,
size: entrySize,
}

if c.pin {
@@ -300,30 +327,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry.createTime = c.clock.Now().UTC()
}

if len(c.byKey) >= c.maxSize {
c.evictOnceInternal()
}
if len(c.byKey) >= c.maxSize {
return nil, ErrCacheFull
}

element := c.byAccess.PushFront(entry)
c.byKey[key] = element
return nil, nil
}

func (c *lru) deleteInternal(element *list.Element) {
entry := c.byAccess.Remove(element).(*entryImpl)
c.currSize -= entry.Size()
delete(c.byKey, entry.key)
}

func (c *lru) evictOnceInternal() {
// tryEvictUntilEnoughSpace tries to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
element := c.byAccess.Back()
for element != nil {
for c.currSize > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
return
}

// entry.refCount > 0
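
Taken together, the size-aware putInternal above gives callers two distinct failure modes. A caller-side sketch under the assumption of a pinned, byte-budgeted cache (the key, payload, and the historyBlob type from the earlier sketch are illustrative, not part of this commit):

func putWithSizeChecks(c cache.Cache, key string, payload []byte) error {
	_, err := c.PutIfNotExist(key, &historyBlob{data: payload})
	switch err {
	case nil:
		return nil
	case cache.ErrCacheItemTooLarge:
		// the single item exceeds the cache's entire capacity; retrying cannot help
		return err
	case cache.ErrCacheFull:
		// eviction could not free enough space, e.g. pinned entries are still referenced;
		// retry after releasing references
		return err
	default:
		return err
	}
}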
65 changes: 62 additions & 3 deletions common/cache/lru_test.go
@@ -25,17 +25,29 @@
package cache

import (
"math/rand"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
)

type keyType struct {
dummyString string
dummyInt int
type (
keyType struct {
dummyString string
dummyInt int
}

testEntryWithCacheSize struct {
cacheSize int
}
)

func (c *testEntryWithCacheSize) CacheSize() int {
return c.cacheSize
}

func TestLRU(t *testing.T) {
@@ -333,3 +345,50 @@ func TestZeroSizeCache(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 0, cache.Size())
}

func TestCache_ItemSizeTooLarge(t *testing.T) {
t.Parallel()

maxTotalBytes := 10
cache := NewLRU(maxTotalBytes)

res := cache.Put(uuid.New(), &testEntryWithCacheSize{maxTotalBytes})
assert.Equal(t, res, nil)

res, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{maxTotalBytes + 1})
assert.Equal(t, err, ErrCacheItemTooLarge)
assert.Equal(t, res, nil)

}

func TestCache_ItemHasCacheSizeDefined(t *testing.T) {
t.Parallel()

maxTotalBytes := 10
cache := NewLRU(maxTotalBytes)

numPuts := rand.Intn(1024)

startWG := sync.WaitGroup{}
endWG := sync.WaitGroup{}

startWG.Add(numPuts)
endWG.Add(numPuts)

go func() {
startWG.Wait()
assert.True(t, cache.Size() < maxTotalBytes)
}()
for i := 0; i < numPuts; i++ {
go func() {
defer endWG.Done()

startWG.Wait()
key := uuid.New()
cache.Put(key, &testEntryWithCacheSize{rand.Int()})
}()
startWG.Done()
}

endWG.Wait()
}
42 changes: 42 additions & 0 deletions common/cache/size_getter.go
@@ -0,0 +1,42 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination size_getter_mock.go

package cache

// SizeGetter is an interface that can be implemented by cache entries to provide their size
type (
SizeGetter interface {
CacheSize() int
}
)

func getSize(value interface{}) int {
if v, ok := value.(SizeGetter); ok {
return v.CacheSize()
}
// if the object does not implement CacheSize(), assume the cache is count-limited and treat the entry size as 1
return 1
}
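
As an illustration of the fallback, a hypothetical test in the same package (not part of this commit) that puts one plain value and one SizeGetter value, then checks the resulting Size:

func TestSizeGetterFallback(t *testing.T) {
	c := NewLRU(10)

	c.Put("plain", "no CacheSize method")                 // counted as 1
	c.Put("sized", &testEntryWithCacheSize{cacheSize: 4}) // counted as 4

	assert.Equal(t, 5, c.Size())
}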
72 changes: 72 additions & 0 deletions common/cache/size_getter_mock.go

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions common/constants.go
@@ -102,3 +102,8 @@ const (
// DefaultQueueReaderID is the default readerID when loading history tasks
DefaultQueueReaderID int64 = 0
)

const (
// DefaultOperatorRPSRatio is the default percentage of rate limit that should be used for operator priority requests
DefaultOperatorRPSRatio float64 = 0.2
)
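
To make the ratio's effect concrete, a rough arithmetic sketch (names below are illustrative assumptions, not code from this commit): with a host-level limit of 1000 RPS and the default ratio of 0.2, operator-priority calls get a budget of about 200 RPS.

// illustrative only: deriving the operator budget from a host-level limit
const hostRPS = 1000.0
operatorRPS := hostRPS * common.DefaultOperatorRPSRatio // 1000 * 0.2 = 200 RPS for operator-priority calls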
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
@@ -119,6 +119,9 @@ const (
// ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit
// these warnings are not emitted if the value is set to 0 or less
ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent"
// OperatorRPSRatio is the percentage of the rate limit provided to priority rate limiters that should be used for
// operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20% if not specified)
OperatorRPSRatio = "system.operatorRPSRatio"

// Whether the deadlock detector should dump goroutines
DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
@@ -480,6 +483,9 @@ const (
HistoryCacheMaxSize = "history.cacheMaxSize"
// HistoryCacheTTL is TTL of history cache
HistoryCacheTTL = "history.cacheTTL"
// HistoryCacheNonUserContextLockTimeout controls how long non-user call (callerType != API or Operator)
// will wait on workflow lock acquisition. Requires service restart to take effect.
HistoryCacheNonUserContextLockTimeout = "history.cacheNonUserContextLockTimeout"
// HistoryStartupMembershipJoinDelay is the duration a history instance waits
// before joining membership after starting.
HistoryStartupMembershipJoinDelay = "history.startupMembershipJoinDelay"