admission: optimizations to admission.WorkQueue
These small optimizations reduce memory allocations and
shorten the critical sections.

A 10s mutex profile of kv50 under CPU overload showed
~1.2s (of a total of 2s) of contention in the admission
package. This correlated with what showed up in the
corresponding CPU profile, which directed these changes.
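
The allocation savings come from recycling tenantInfo and waitingWork (including
the buffered channel inside waitingWork) through sync.Pool instead of allocating
them per request. A minimal sketch of that pattern only; pooledWork and its
helpers below are illustrative names, not the WorkQueue types:

package main

import (
	"fmt"
	"sync"
)

// pooledWork is a stand-in for a per-request struct such as waitingWork;
// it is not the actual admission package type.
type pooledWork struct {
	id int
	ch chan int // reused across pool cycles so the channel is allocated once
}

var workPool = sync.Pool{
	New: func() interface{} { return &pooledWork{} },
}

// newPooledWork gets a recycled struct from the pool and resets its fields,
// keeping only the already-allocated channel.
func newPooledWork(id int) *pooledWork {
	w := workPool.Get().(*pooledWork)
	ch := w.ch
	if ch == nil {
		ch = make(chan int, 1)
	}
	*w = pooledWork{id: id, ch: ch}
	return w
}

// releasePooledWork clears the struct and returns it (and its channel) to the pool.
func releasePooledWork(w *pooledWork) {
	ch := w.ch
	*w = pooledWork{ch: ch}
	workPool.Put(w)
}

func main() {
	w := newPooledWork(42)
	w.ch <- 7            // hand off a value, as granted() does with the grant chain ID
	fmt.Println(<-w.ch)  // the waiter receives it
	releasePooledWork(w) // struct and channel go back to the pool for reuse
}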

Release note: None
sumeerbhola committed Jun 28, 2021
1 parent 1c13bba commit 2834239
Showing 2 changed files with 80 additions and 15 deletions.
93 changes: 78 additions & 15 deletions pkg/util/admission/work_queue.go
@@ -15,6 +15,7 @@ import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
"time"

@@ -188,9 +189,6 @@ func makeWorkQueue(
return q
}

// TODO(sumeer): reduce allocations by using a pool for tenantInfo, waitingWork,
// waitingWork.ch.

// Admit is called when requesting admission for some work. If err!=nil, the
// request was not admitted, potentially due to the deadline being exceeded.
// The enabled return value is relevant when err=nil, and represents whether
@@ -212,7 +210,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
q.mu.Lock()
tenant, ok := q.mu.tenants[tenantID]
if !ok {
tenant = makeTenantInfo(tenantID)
tenant = newTenantInfo(tenantID)
q.mu.tenants[tenantID] = tenant
}
if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork {
@@ -279,26 +277,32 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
}
}
// Push onto heap(s).
work := makeWaitingWork(info.Priority, info.CreateTime)
work := newWaitingWork(info.Priority, info.CreateTime)
heap.Push(&tenant.waitingWorkHeap, work)
if len(tenant.waitingWorkHeap) == 1 {
heap.Push(&q.mu.tenantHeap, tenant)
}
// Else already in tenantHeap.
q.metrics.WaitQueueLength.Inc(1)

// Release all locks and start waiting.
q.mu.Unlock()
q.admitMu.Unlock()

q.metrics.WaitQueueLength.Inc(1)
defer releaseWaitingWork(work)
select {
case <-doneCh:
q.mu.Lock()
if work.heapIndex == -1 {
// No longer in heap. Raced with token/slot grant.
chainID := <-work.ch
tenant.used--
q.mu.Unlock()
q.granter.returnGrant()
// The channel is sent to after releasing mu, so we don't need to hold
// mu when receiving from it. Additionally, we've already called
// returnGrant so we're not holding back future grant chains if this one
// chain gets terminated.
chainID := <-work.ch
q.granter.continueGrantChain(chainID)
} else {
tenant.waitingWorkHeap.remove(work)
@@ -366,21 +370,25 @@ func (q *WorkQueue) hasWaitingRequests() bool {
}

func (q *WorkQueue) granted(grantChainID grantChainID) bool {
// Reduce critical section by getting time before mutex acquisition.
now := timeutil.Now()
q.mu.Lock()
defer q.mu.Unlock()
if len(q.mu.tenantHeap) == 0 {
q.mu.Unlock()
return false
}
tenant := q.mu.tenantHeap[0]
item := heap.Pop(&tenant.waitingWorkHeap).(*waitingWork)
item.grantTime = timeutil.Now()
item.ch <- grantChainID
item.grantTime = now
tenant.used++
if len(tenant.waitingWorkHeap) > 0 {
q.mu.tenantHeap.fix(tenant)
} else {
q.mu.tenantHeap.remove(tenant)
}
q.mu.Unlock()
// Reduce critical section by sending on channel after releasing mutex.
item.ch <- grantChainID
return true
}
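
The two "Reduce critical section" comments above describe a general shape: read
the clock before acquiring the mutex, and send the grant on a buffered channel
only after releasing it, so the lock covers just the heap bookkeeping. A
self-contained sketch of that shape, using simplified stand-in types
(exampleQueue, exampleWaiter) rather than the real WorkQueue:

package main

import (
	"fmt"
	"sync"
	"time"
)

// exampleWaiter and exampleQueue are simplified stand-ins, not the real
// waitingWork/WorkQueue types.
type exampleWaiter struct {
	grantTime time.Time
	ch        chan int64 // buffered with capacity 1, so a send never blocks
}

type exampleQueue struct {
	mu      sync.Mutex
	waiters []*exampleWaiter
}

// grantOne mirrors the shape of granted(): the clock is read before taking
// the mutex, and the waiter is notified only after the mutex is released, so
// the locked region covers only the queue bookkeeping.
func (q *exampleQueue) grantOne(id int64) bool {
	now := time.Now() // taken outside the critical section
	q.mu.Lock()
	if len(q.waiters) == 0 {
		q.mu.Unlock()
		return false
	}
	w := q.waiters[0]
	q.waiters = q.waiters[1:]
	w.grantTime = now
	q.mu.Unlock()
	w.ch <- id // capacity-1 channel: the send cannot block the granter
	return true
}

func main() {
	q := &exampleQueue{}
	w := &exampleWaiter{ch: make(chan int64, 1)}
	q.waiters = append(q.waiters, w)
	if q.grantOne(7) {
		fmt.Println("granted chain", <-w.ch, "at", w.grantTime)
	}
}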

@@ -393,6 +401,7 @@ func (q *WorkQueue) gcTenantsAndResetTokens() {
for id, info := range q.mu.tenants {
if info.used == 0 && len(info.waitingWorkHeap) == 0 {
delete(q.mu.tenants, id)
releaseTenantInfo(info)
}
if q.usesTokens {
info.used = 0
@@ -456,8 +465,36 @@ type tenantHeap []*tenantInfo

var _ heap.Interface = (*tenantHeap)(nil)

func makeTenantInfo(id uint64) *tenantInfo {
return &tenantInfo{id: id, heapIndex: -1}
var tenantInfoPool = sync.Pool{
New: func() interface{} {
return &tenantInfo{}
},
}

func newTenantInfo(id uint64) *tenantInfo {
ti := tenantInfoPool.Get().(*tenantInfo)
*ti = tenantInfo{
id: id,
waitingWorkHeap: ti.waitingWorkHeap,
heapIndex: -1,
}
return ti
}

func releaseTenantInfo(ti *tenantInfo) {
if len(ti.waitingWorkHeap) != 0 {
panic("tenantInfo has non-empty heap")
}
// NB: waitingWorkHeap.Pop nils the slice elements when removing, so we are
// not inadvertently holding any references.
if cap(ti.waitingWorkHeap) > 100 {
ti.waitingWorkHeap = nil
}
wwh := ti.waitingWorkHeap
*ti = tenantInfo{
waitingWorkHeap: wwh,
}
tenantInfoPool.Put(ti)
}
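
releaseTenantInfo keeps the emptied waitingWorkHeap slice for reuse but drops it
once its capacity exceeds 100, so the pool does not pin a large backing array
indefinitely. A tiny sketch of that trade-off, with a hypothetical threshold and
helper (not the actual admission code):

package main

import "fmt"

// maxPooledCap plays the role of the constant 100 used for waitingWorkHeap;
// the exact threshold here is illustrative.
const maxPooledCap = 100

// trimForPool returns the slice to keep alongside a pooled object: emptied,
// retaining its backing array unless that array has grown large enough that
// keeping it would pin too much memory.
func trimForPool(s []int) []int {
	if cap(s) > maxPooledCap {
		return nil // drop the large backing array; let GC reclaim it
	}
	return s[:0]
}

func main() {
	small := make([]int, 0, 16)
	big := make([]int, 0, 4096)
	fmt.Println(cap(trimForPool(small)), cap(trimForPool(big))) // 16 0
}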

func (th *tenantHeap) fix(item *tenantInfo) {
@@ -519,13 +556,39 @@ type waitingWorkHeap []*waitingWork

var _ heap.Interface = (*waitingWorkHeap)(nil)

func makeWaitingWork(priority WorkPriority, createTime int64) *waitingWork {
return &waitingWork{
var waitingWorkPool = sync.Pool{
New: func() interface{} {
return &waitingWork{}
},
}

func newWaitingWork(priority WorkPriority, createTime int64) *waitingWork {
ww := waitingWorkPool.Get().(*waitingWork)
ch := ww.ch
if ch == nil {
ch = make(chan grantChainID, 1)
}
*ww = waitingWork{
priority: priority,
createTime: createTime,
ch: make(chan grantChainID, 1),
ch: ch,
heapIndex: -1,
}
return ww
}

// releaseWaitingWork must be called with an empty waitingWork.ch.
func releaseWaitingWork(ww *waitingWork) {
ch := ww.ch
select {
case <-ch:
panic("channel must be empty and not closed")
default:
}
*ww = waitingWork{
ch: ch,
}
waitingWorkPool.Put(ww)
}

func (wwh *waitingWorkHeap) remove(item *waitingWork) {
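releaseWaitingWork above depends on the invariant that any grant sent on
waitingWork.ch has already been received; the non-blocking receive in its select
asserts this before the struct and its channel are put back in the pool. A small
standalone illustration of that check, with made-up names:

package main

import "fmt"

// assertDrained mirrors the check in releaseWaitingWork: a non-blocking
// receive proves the capacity-1 channel holds no undelivered grant before
// the object is recycled.
func assertDrained(ch chan int) {
	select {
	case <-ch:
		panic("channel must be empty and not closed")
	default:
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 7           // a grant was delivered...
	fmt.Println(<-ch) // ...and consumed by the waiter
	assertDrained(ch) // safe to reuse the channel now
}
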
2 changes: 2 additions & 0 deletions pkg/util/admission/work_queue_test.go
@@ -248,3 +248,5 @@ func scanTenantID(t *testing.T, d *datadriven.TestData) roachpb.TenantID {
// - Test metrics
// - Test race between grant and cancellation
// - Test WorkQueue for tokens
// - Add microbenchmark with high concurrency and procs for full admission
// system
