keyspace, mcs, tso: support reference counter in LockGroup #6629

Merged: 3 commits, Jun 19, 2023
pkg/keyspace/keyspace.go (7 additions, 2 deletions)

@@ -106,8 +106,13 @@ func NewKeyspaceManager(
 	kgm *GroupManager,
 ) *Manager {
 	return &Manager{
-		ctx:      ctx,
-		metaLock: syncutil.NewLockGroup(syncutil.WithHash(MaskKeyspaceID)),
+		ctx: ctx,
+		// Remove the lock of the given key from the lock group on unlock to keep
+		// a minimal working set, which suits low-QPS, non-time-critical, and
+		// non-consecutive large key space scenarios. An example of the last case
+		// is keyspace group split, which loads non-consecutive keyspace meta in
+		// batches and locks all loaded keyspace meta within a batch at the same time.
+		metaLock: syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true)),
 		idAllocator: idAllocator,
 		store:       store,
 		cluster:     cluster,
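For context, the two options trade memory for lifetime very differently: WithHash keeps a bounded set of bucket mutexes alive forever, while WithRemoveEntryOnUnlock allocates one entry per ID and frees it on the last unlock. The sketch below is not part of the diff; the import path github.com/tikv/pd/pkg/utils/syncutil is assumed from the repository layout, and the inline mask function merely stands in for keyspace.MaskKeyspaceID.

package main

import "github.com/tikv/pd/pkg/utils/syncutil"

func main() {
	// Bounded map: IDs that hash to the same bucket share one mutex, and
	// bucket entries stay in the map for the lifetime of the group.
	hashed := syncutil.NewLockGroup(
		syncutil.WithHash(func(id uint32) uint32 { return id & 0xFF }))
	hashed.Lock(42)
	hashed.Unlock(42) // the bucket entry remains allocated

	// Per-ID map: every ID gets a dedicated mutex, created on first Lock
	// and deleted again once its reference count drops to zero.
	perID := syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true))
	perID.Lock(42)
	perID.Unlock(42) // last unlock for 42, so its entry is removed
}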
pkg/utils/syncutil/lock_group.go (32 additions, 6 deletions)

@@ -16,11 +16,17 @@ package syncutil

 import "fmt"

+type lockEntry struct {
+	mu       *Mutex
+	refCount int
+}
+
 // LockGroup is a map of mutexes that locks entries with different IDs separately.
 // It's used to alleviate the lock contention of using a single global lock.
 type LockGroup struct {
-	groupLock Mutex             // protects group.
-	entries   map[uint32]*Mutex // map of locks with id as key.
+	groupLock           Mutex                 // protects group.
+	removeEntryOnUnlock bool                  // whether to remove the entry from entries on Unlock().
+	entries             map[uint32]*lockEntry // map of locks with id as key.
 	// hashFn hashes id to a map key; its main purpose is to limit the total
 	// number of mutexes in the group, as using a mutex for every id is too memory heavy.
 	hashFn func(id uint32) uint32
@@ -36,10 +42,17 @@ func WithHash(hashFn func(id uint32) uint32) LockGroupOption {
 	}
 }

+// WithRemoveEntryOnUnlock sets the lockGroup's removeEntryOnUnlock to the provided value.
+func WithRemoveEntryOnUnlock(removeEntryOnUnlock bool) LockGroupOption {
+	return func(lg *LockGroup) {
+		lg.removeEntryOnUnlock = removeEntryOnUnlock
+	}
+}
+
 // NewLockGroup creates and returns an empty lockGroup.
 func NewLockGroup(options ...LockGroupOption) *LockGroup {
 	lockGroup := &LockGroup{
-		entries: make(map[uint32]*Mutex),
+		entries: make(map[uint32]*lockEntry),
 		// If no custom hash function is provided, use the identity hash.
 		hashFn: func(id uint32) uint32 { return id },
 	}
@@ -56,11 +69,15 @@ func (g *LockGroup) Lock(id uint32) {
 	e, ok := g.entries[hashedID]
 	// If the target id's lock has not been initialized, create a new lock.
 	if !ok {
-		e = &Mutex{}
+		e = &lockEntry{
+			mu:       &Mutex{},
+			refCount: 0,
+		}
 		g.entries[hashedID] = e
 	}
+	e.refCount++
 	g.groupLock.Unlock()
-	e.Lock()
+	e.mu.Lock()
 }

 // Unlock unlocks the target mutex based on the hash of the id.
@@ -73,6 +90,15 @@ func (g *LockGroup) Unlock(id uint32) {
 		g.groupLock.Unlock()
 		panic(fmt.Errorf("unlock requested for key %v, but no entry found", id))
 	}
+	e.refCount--
+	if e.refCount == -1 {
+		// The ref count should never go negative; treat it as a run-time error and panic.
+		g.groupLock.Unlock()
+		panic(fmt.Errorf("unlock requested for key %v, but ref count is negative", id))
+	}
+	if g.removeEntryOnUnlock && e.refCount == 0 {
+		delete(g.entries, hashedID)
+	}
 	g.groupLock.Unlock()
-	e.Unlock()
+	e.mu.Unlock()
 }
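The reference count is what makes removal safe: the count is incremented before e.mu.Lock() is called, so it covers waiters as well as the current holder. A goroutine blocked on the entry's mutex therefore keeps the count above zero, a concurrent Unlock cannot delete the entry out from under it, and a later Lock cannot create a second mutex for the same ID. Below is a minimal usage sketch, again assuming the github.com/tikv/pd/pkg/utils/syncutil import path.

package main

import (
	"fmt"
	"sync"

	"github.com/tikv/pd/pkg/utils/syncutil"
)

func main() {
	lg := syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true))

	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lg.Lock(7)   // all goroutines contend on the same ID
			counter++    // safe: per-ID mutual exclusion
			lg.Unlock(7) // only the last unlocker deletes the entry for 7
		}()
	}
	wg.Wait()
	fmt.Println(counter) // 8

	// Unlocking an ID that was never locked panics ("no entry found"),
	// which surfaces unmatched Lock/Unlock pairs early.
}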
pkg/utils/syncutil/lock_group_test.go (33 additions, 3 deletions)

@@ -31,17 +31,47 @@ func TestLockGroup(t *testing.T) {
 	for i := 0; i < concurrency; i++ {
 		go func(spaceID uint32) {
 			defer wg.Done()
-			mustSequentialUpdateSingle(re, spaceID, group)
+			mustSequentialUpdateSingle(re, spaceID, group, concurrency)
 		}(rand.Uint32())
 	}
 	wg.Wait()
 	// Check that the size of the lock group is limited.
 	re.LessOrEqual(len(group.entries), 16)
 }

+func TestLockGroupWithRemoveEntryOnUnlock(t *testing.T) {
+	re := require.New(t)
+	group := NewLockGroup(WithRemoveEntryOnUnlock(true))
+	maxID := 1024
+
+	// Test concurrent lock/unlock.
+	var wg sync.WaitGroup
+	wg.Add(maxID)
+	for i := 0; i < maxID; i++ {
+		go func(spaceID uint32) {
+			defer wg.Done()
+			mustSequentialUpdateSingle(re, spaceID, group, 10)
+		}(uint32(i))
+	}
+
+	// Test range lock in a scenario with a non-consecutive large key space. One example
+	// is keyspace group split, which loads non-consecutive keyspace meta in batches and
+	// locks all loaded keyspace meta within a batch at the same time.
+	for i := 0; i < maxID; i++ {
+		group.Lock(uint32(i))
+	}
+	re.Equal(len(group.entries), maxID)
+	for i := 0; i < maxID; i++ {
+		group.Unlock(uint32(i))
+	}
+
+	wg.Wait()
+	// Check that every entry has been removed once all locks are released.
+	re.Equal(len(group.entries), 0)
+}
+
 // mustSequentialUpdateSingle checks that updates within a single space are sequential.
-func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup) {
-	concurrency := 50
+func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup, concurrency int) {
 	total := 0
 	var wg sync.WaitGroup
 	wg.Add(concurrency)