From e571ffe475da32386ca1c76622688d2843ace001 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Fri, 7 Jul 2023 23:39:29 -0400
Subject: [PATCH 1/3] kvflowcontrol: annotate and fix perf regressions

- Replace the flow controller level mutex-backed kvflowcontrol.Stream =>
  token bucket map with sync.Map. On kv0/enc=false/nodes=3/cpu=96 accessing
  this map contributed to a high amount of mutex contention. We observe that
  this map is effectively read-only - entries for keys are written once (on
  creation) and read frequently after. We don't currently GC these buckets,
  but even if we did, the same access pattern would hold. We'll note that
  using a sync.Map is slightly more expensive CPU-wise.
- Replace various map accesses with individual variables. We were needlessly
  using maps to access one of two variables, keyed by work class, for example
  when maintaining metrics per work class, or tracking token adjustments. The
  map accesses appeared prominently in CPU profiles and were unnecessary
  overhead.
- Avoid using log.ExpensiveLogEnabled in hot code paths; it shows up in CPU
  profiles.
- Slightly reduce the surface area of kvflowhandle.Handle.mu when returning
  flow tokens.
- Annotate various other points in the code where peephole optimizations
  exist, as surfaced by kv0/enc=false/nodes=3/cpu=96.

Part of #104154.

Release note: None
---
 .../kvserver/kvflowcontrol/kvflowcontrol.go   |   3 +-
 .../kvflowcontroller/kvflowcontroller.go      | 223 ++++++++++--------
 .../kvflowcontroller_metrics.go               |  98 +++++---
 .../kvflowcontroller/kvflowcontroller_test.go |  43 +++-
 .../kvflowdispatch/kvflowdispatch.go          |   8 +
 .../kvflowhandle/kvflowhandle.go              |  12 +-
 .../kvflowtokentracker/tracker.go             |  12 +-
 pkg/kv/kvserver/raft_transport.go             |   4 +-
 pkg/server/admission.go                       |   2 +
 9 files changed, 262 insertions(+), 143 deletions(-)

diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
index 0046a858f26a..c16bfca0ac74 100644
--- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
@@ -307,7 +307,8 @@ type raftAdmissionMetaKey struct{}
 // ContextWithMeta returns a Context wrapping the supplied raft admission meta,
 // if any.
 //
-// TODO(irfansharif): This causes a heap allocation. Revisit as part of #95563.
+// TODO(irfansharif,aaditya): This causes a heap allocation. Revisit as part of
+// #104154.
 func ContextWithMeta(ctx context.Context, meta *kvflowcontrolpb.RaftAdmissionMeta) context.Context {
 	if meta != nil {
 		ctx = context.WithValue(ctx, raftAdmissionMetaKey{}, meta)
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
index b6ac9307674a..a4980e4aff88 100644
--- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
@@ -68,7 +69,8 @@ type Controller struct {
 		// streams get closed permanently (tenants get deleted, nodes removed)
 		// or when completely inactive (no tokens deducted/returned over 30+
 		// minutes), clear these out.
- buckets map[kvflowcontrol.Stream]*bucket + buckets sync.Map // kvflowcontrol.Stream => *bucket + bucketCount int } metrics *metrics clock *hlc.Clock @@ -86,38 +88,40 @@ func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)) elasticTokens := kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)) - c.mu.limit = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + c.mu.limit = tokensPerWorkClass{ regular: regularTokens, elastic: elasticTokens, } - c.mu.buckets = make(map[kvflowcontrol.Stream]*bucket) + c.mu.buckets = sync.Map{} regularTokensPerStream.SetOnChange(&settings.SV, func(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() - before := tokensPerWorkClass{ - regular: c.mu.limit[regular], - elastic: c.mu.limit[elastic], - } + before := c.mu.limit now := tokensPerWorkClass{ regular: kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)), elastic: kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)), } adjustment := tokensPerWorkClass{ - regular: now[regular] - before[regular], - elastic: now[elastic] - before[elastic], + regular: now.regular - before.regular, + elastic: now.elastic - before.elastic, } c.mu.limit = now - for _, b := range c.mu.buckets { + c.mu.buckets.Range(func(_, value any) bool { + // NB: We're holding the controller mutex here, which guards against + // new buckets being added, synchronization we don't get out of + // sync.Map.Range() directly. + b := value.(*bucket) b.mu.Lock() - b.mu.tokens[regular] += adjustment[regular] - b.mu.tokens[elastic] += adjustment[elastic] + b.mu.tokensPerWorkClass.regular += adjustment.regular + b.mu.tokensPerWorkClass.elastic += adjustment.elastic b.mu.Unlock() c.metrics.onTokenAdjustment(adjustment) - if adjustment[regular] > 0 || adjustment[elastic] > 0 { + if adjustment.regular > 0 || adjustment.elastic > 0 { b.signal() // signal a waiter, if any } - } + return true + }) }) c.metrics = newMetrics(c) registry.AddMetricStruct(c.metrics) @@ -143,9 +147,7 @@ func (c *Controller) Admit( logged := false tstart := c.clock.PhysicalTime() for { - c.mu.Lock() - b := c.getBucketLocked(connection.Stream()) - c.mu.Unlock() + b := c.getBucket(connection.Stream()) tokens := b.tokens(class) // In addition to letting requests through when there are tokens @@ -153,7 +155,7 @@ func (c *Controller) Admit( // applying flow control to their specific work class. 
bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass if tokens > 0 || bypass { - if log.ExpensiveLogEnabled(ctx, 2) { + if log.V(2) { log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) } @@ -184,7 +186,7 @@ func (c *Controller) Admit( return true, nil } - if !logged && log.ExpensiveLogEnabled(ctx, 2) { + if !logged && log.V(2) { log.Infof(ctx, "waiting for flow tokens (pri=%s stream=%s tokens=%s)", pri, connection.Stream(), tokens) logged = true @@ -245,16 +247,20 @@ func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream { defer c.mu.Unlock() var streams []kvflowinspectpb.Stream - for stream, b := range c.mu.buckets { - b.mu.Lock() + c.mu.buckets.Range(func(key, value any) bool { + stream := key.(kvflowcontrol.Stream) + b := value.(*bucket) + + b.mu.RLock() streams = append(streams, kvflowinspectpb.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, AvailableRegularTokens: int64(b.tokensLocked(regular)), AvailableElasticTokens: int64(b.tokensLocked(elastic)), }) - b.mu.Unlock() - } + b.mu.RUnlock() + return true + }) sort.Slice(streams, func(i, j int) bool { // for determinism if streams[i].TenantID != streams[j].TenantID { return streams[i].TenantID.ToUint64() < streams[j].TenantID.ToUint64() @@ -272,8 +278,8 @@ func (c *Controller) InspectStream( return kvflowinspectpb.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, - AvailableRegularTokens: int64(tokens[regular]), - AvailableElasticTokens: int64(tokens[elastic]), + AvailableRegularTokens: int64(tokens.regular), + AvailableElasticTokens: int64(tokens.elastic), } } @@ -285,31 +291,46 @@ func (c *Controller) adjustTokens( ) { class := admissionpb.WorkClassFromPri(pri) - c.mu.Lock() - b := c.getBucketLocked(stream) - c.mu.Unlock() + // TODO(irfansharif,aaditya): Double check that there are no more + // alloc_objects (for the tokensPerWorkClass instances being bussed around + // below) when running kv0/enc=false/nodes=3/cpu=96. Do this as part of + // #104154. + b := c.getBucket(stream) adjustment, unaccounted := b.adjust(ctx, class, delta, c.mu.limit) c.metrics.onTokenAdjustment(adjustment) - c.metrics.onUnaccounted(unaccounted) - if adjustment[regular] > 0 || adjustment[elastic] > 0 { + if unaccounted.regular > 0 || unaccounted.elastic > 0 { + c.metrics.onUnaccounted(unaccounted) + } + if adjustment.regular > 0 || adjustment.elastic > 0 { b.signal() // signal a waiter, if any } - if log.ExpensiveLogEnabled(ctx, 2) { - b.mu.Lock() + if log.V(2) { + b.mu.RLock() log.Infof(ctx, "adjusted flow tokens (pri=%s stream=%s delta=%s): regular=%s elastic=%s", pri, stream, delta, b.tokensLocked(regular), b.tokensLocked(elastic)) - b.mu.Unlock() + b.mu.RUnlock() } } -func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) *bucket { - b, ok := c.mu.buckets[stream] +func (c *Controller) getBucket(stream kvflowcontrol.Stream) *bucket { + // NB: sync.map is more expensive CPU wise as per BenchmarkController + // for reads, ~250ns vs. ~350ns, though better for mutex contention when + // looking at kv0/enc=false/nodes=3/cpu=9. The sync.Map does show up in CPU + // profiles more prominently though. If we want to go back to it being a + // mutex-backed map, we could use a read-Lock when trying to read the bucket + // and then swapping for a write-lock when optionally creating the bucket. 
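+	//
+	// For illustration only, a rough sketch of that mutex-backed alternative
+	// (it assumes c.mu.buckets were a plain map guarded by an RWMutex; the
+	// names are hypothetical and this is not what the code below does):
+	//
+	//   c.mu.RLock()
+	//   b, ok := c.mu.buckets[stream]
+	//   c.mu.RUnlock()
+	//   if !ok {
+	//   	c.mu.Lock()
+	//   	if b, ok = c.mu.buckets[stream]; !ok {
+	//   		b = newBucket(c.mu.limit)
+	//   		c.mu.buckets[stream] = b
+	//   	}
+	//   	c.mu.Unlock()
+	//   }
+	//   return b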
+ b, ok := c.mu.buckets.Load(stream) if !ok { - b = newBucket(c.mu.limit) - c.mu.buckets[stream] = b + c.mu.Lock() + var loaded bool + b, loaded = c.mu.buckets.LoadOrStore(stream, newBucket(c.mu.limit)) + if !loaded { + c.mu.bucketCount += 1 + } + c.mu.Unlock() } - return b + return b.(*bucket) } // bucket holds flow tokens for {regular,elastic} traffic over a @@ -317,8 +338,8 @@ func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) *bucket { // returning and waiting for flow tokens. type bucket struct { mu struct { - syncutil.Mutex - tokens tokensPerWorkClass + syncutil.RWMutex + tokensPerWorkClass tokensPerWorkClass } // Waiting requests do so by waiting on signalCh without holding mutexes. @@ -336,25 +357,25 @@ type bucket struct { signalCh chan struct{} } -func newBucket(t tokensPerWorkClass) *bucket { +func newBucket(tokensPerWorkClass tokensPerWorkClass) *bucket { b := bucket{ signalCh: make(chan struct{}, 1), } - b.mu.tokens = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ - regular: t[regular], - elastic: t[elastic], - } + b.mu.tokensPerWorkClass = tokensPerWorkClass return &b } func (b *bucket) tokens(wc admissionpb.WorkClass) kvflowcontrol.Tokens { - b.mu.Lock() - defer b.mu.Unlock() + b.mu.RLock() + defer b.mu.RUnlock() return b.tokensLocked(wc) } func (b *bucket) tokensLocked(wc admissionpb.WorkClass) kvflowcontrol.Tokens { - return b.mu.tokens[wc] + if wc == regular { + return b.mu.tokensPerWorkClass.regular + } + return b.mu.tokensPerWorkClass.elastic } func (b *bucket) signal() { @@ -376,52 +397,76 @@ func (b *bucket) adjust( ) (adjustment, unaccounted tokensPerWorkClass) { b.mu.Lock() defer b.mu.Unlock() - - unaccounted = tokensPerWorkClass{ - regular: 0, - elastic: 0, - } - - before := tokensPerWorkClass{ - regular: b.mu.tokens[regular], - elastic: b.mu.tokens[elastic], - } + // TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex is + // responsible for ~1.8% of the mutex contention. Maybe address it as part + // of #104154. We want to effectively increment two values but cap each at + // some limit, and when incrementing, figure out what the adjustment was. + // What if reads always capped it at the limit? And when incrementing + // atomically by +delta, if we're over the limit, since we tried to increase + // the value by +delta, at most we need to adjust back down by -delta. + // Something like it. + // + // var c int64 = 0 + // var limit int64 = rand.Int63n(10000000000) + // for i := 0; i < 50; i++ { + // go func() { + // for j := 0; j < 2000; j++ { + // delta := rand.Int63() + // v := atomic.AddInt64(&c, delta) + // if v > limit { + // overlimit := v - limit + // var adjustment int64 = overlimit + // if delta < overlimit { + // adjustment = delta + // } + // n := atomic.AddInt64(&c, -adjustment) + // fmt.Printf("%d > %d by %d, adjusted by %d to %d)\n", + // v, limit, v-limit, -adjustment, n) + // } + // } + // }() + // } + + unaccounted = tokensPerWorkClass{} + before := b.mu.tokensPerWorkClass switch class { case elastic: // Elastic {deductions,returns} only affect elastic flow tokens. 
- b.mu.tokens[class] += delta - if delta > 0 && b.mu.tokens[class] > limit[class] { - unaccounted[class] = b.mu.tokens[class] - limit[class] - b.mu.tokens[class] = limit[class] // enforce ceiling + b.mu.tokensPerWorkClass.elastic += delta + if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic { + unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic + b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling } case regular: - b.mu.tokens[class] += delta - if delta > 0 && b.mu.tokens[class] > limit[class] { - unaccounted[class] = b.mu.tokens[class] - limit[class] - b.mu.tokens[class] = limit[class] // enforce ceiling + b.mu.tokensPerWorkClass.regular += delta + if delta > 0 && b.mu.tokensPerWorkClass.regular > limit.regular { + unaccounted.regular = b.mu.tokensPerWorkClass.regular - limit.regular + b.mu.tokensPerWorkClass.regular = limit.regular // enforce ceiling } - b.mu.tokens[elastic] += delta - if delta > 0 && b.mu.tokens[elastic] > limit[elastic] { - unaccounted[elastic] = b.mu.tokens[elastic] - limit[elastic] - b.mu.tokens[elastic] = limit[elastic] // enforce ceiling + b.mu.tokensPerWorkClass.elastic += delta + if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic { + unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic + b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling } } - if buildutil.CrdbTestBuild && (unaccounted[regular] != 0 || unaccounted[elastic] != 0) { + if buildutil.CrdbTestBuild && (unaccounted.regular != 0 || unaccounted.elastic != 0) { log.Fatalf(ctx, "unaccounted[regular]=%s unaccounted[elastic]=%s for class=%s delta=%s limit[regular]=%s limit[elastic]=%s", - unaccounted[regular], unaccounted[elastic], class, delta, limit[regular], limit[elastic]) + unaccounted.regular, unaccounted.elastic, class, delta, limit.regular, limit.elastic) } adjustment = tokensPerWorkClass{ - regular: b.mu.tokens[regular] - before[regular], - elastic: b.mu.tokens[elastic] - before[elastic], + regular: b.mu.tokensPerWorkClass.regular - before.regular, + elastic: b.mu.tokensPerWorkClass.elastic - before.elastic, } return adjustment, unaccounted } -type tokensPerWorkClass map[admissionpb.WorkClass]kvflowcontrol.Tokens +type tokensPerWorkClass struct { + regular, elastic kvflowcontrol.Tokens +} const ( minTokensPerStream kvflowcontrol.Tokens = 1 << 20 // 1 MiB @@ -440,16 +485,13 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error { }) func (c *Controller) getTokensForStream(stream kvflowcontrol.Stream) tokensPerWorkClass { - ret := make(map[admissionpb.WorkClass]kvflowcontrol.Tokens) - c.mu.Lock() - b := c.getBucketLocked(stream) - c.mu.Unlock() + ret := tokensPerWorkClass{} + b := c.getBucket(stream) - b.mu.Lock() - for _, wc := range []admissionpb.WorkClass{regular, elastic} { - ret[wc] = b.tokensLocked(wc) - } - b.mu.Unlock() + b.mu.RLock() + ret.regular = b.tokensLocked(regular) + ret.elastic = b.tokensLocked(elastic) + b.mu.RUnlock() return ret } @@ -488,14 +530,9 @@ func (c *Controller) TestingNonBlockingAdmit( default: } - c.mu.Lock() - b := c.getBucketLocked(connection.Stream()) - c.mu.Unlock() - - b.mu.Lock() - tokens := b.mu.tokens[class] - b.mu.Unlock() + b := c.getBucket(connection.Stream()) + tokens := b.tokens(class) if tokens <= 0 { return false } @@ -529,9 +566,7 @@ func (c *Controller) TestingMetrics() interface{} { } func (c *Controller) testingGetBucket(stream kvflowcontrol.Stream) *bucket { - c.mu.Lock() - defer c.mu.Unlock() - return c.getBucketLocked(stream) + return 
c.getBucket(stream) } func (b *bucket) testingSignaled(connection kvflowcontrol.ConnectedStream) func() bool { diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go index 5325e8a83f35..8097abce8d16 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -129,17 +130,20 @@ func annotateMetricTemplateWithWorkClass( } type metrics struct { - FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge - FlowTokensDeducted [admissionpb.NumWorkClasses]*metric.Counter - FlowTokensReturned [admissionpb.NumWorkClasses]*metric.Counter - FlowTokensUnaccounted [admissionpb.NumWorkClasses]*metric.Counter - RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge - RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter - RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter - RequestsBypassed [admissionpb.NumWorkClasses]*metric.Counter - WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram - TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge - BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + ElasticFlowTokensDeducted *metric.Counter + ElasticFlowTokensReturned *metric.Counter + ElasticFlowTokensUnaccounted *metric.Counter + RegularFlowTokensDeducted *metric.Counter + RegularFlowTokensReturned *metric.Counter + RegularFlowTokensUnaccounted *metric.Counter + FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge + RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge + RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter + RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter + RequestsBypassed [admissionpb.NumWorkClasses]*metric.Counter + WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram + TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge } var _ metric.Struct = &metrics{} @@ -157,21 +161,35 @@ func newMetrics(c *Controller) *metrics { sum := int64(0) c.mu.Lock() defer c.mu.Unlock() - for _, b := range c.mu.buckets { + c.mu.buckets.Range(func(key, value any) bool { + b := value.(*bucket) sum += int64(b.tokens(wc)) - } + return true + }) return sum }, ) - m.FlowTokensDeducted[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), - ) - m.FlowTokensReturned[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), - ) - m.FlowTokensUnaccounted[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), - ) + if wc == regular { + m.RegularFlowTokensDeducted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), + ) + m.RegularFlowTokensReturned = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.RegularFlowTokensUnaccounted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + } else { + m.ElasticFlowTokensDeducted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), + ) + m.ElasticFlowTokensReturned = 
metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.ElasticFlowTokensUnaccounted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + } m.RequestsWaiting[wc] = metric.NewGauge( annotateMetricTemplateWithWorkClass(wc, requestsWaiting), ) @@ -197,7 +215,7 @@ func newMetrics(c *Controller) *metrics { func() int64 { c.mu.Lock() defer c.mu.Unlock() - return int64(len(c.mu.buckets)) + return int64(c.mu.bucketCount) }, ) @@ -212,13 +230,16 @@ func newMetrics(c *Controller) *metrics { c.mu.Lock() defer c.mu.Unlock() - for s, wbc := range c.mu.buckets { - if wbc.tokens(wc) <= 0 { + c.mu.buckets.Range(func(key, value any) bool { + stream := key.(kvflowcontrol.Stream) + b := value.(*bucket) + + if b.tokens(wc) <= 0 { count += 1 if shouldLog { if count > 10 { - continue // cap output to 10 blocked streams + return false // cap output to 10 blocked streams } if count == 1 { buf.Reset() @@ -226,10 +247,11 @@ func newMetrics(c *Controller) *metrics { if count > 1 { buf.WriteString(", ") } - buf.WriteString(s.String()) + buf.WriteString(stream.String()) } } - } + return true + }) if shouldLog && count > 0 { log.Warningf(context.Background(), "%d blocked %s replication stream(s): %s", count, wc, buf.String()) } @@ -263,19 +285,21 @@ func (m *metrics) onErrored(class admissionpb.WorkClass, dur time.Duration) { } func (m *metrics) onTokenAdjustment(adjustment tokensPerWorkClass) { - for class, delta := range adjustment { - if delta < 0 { - m.FlowTokensDeducted[class].Inc(-int64(delta)) - } else if delta > 0 { - m.FlowTokensReturned[class].Inc(int64(delta)) - } + if adjustment.regular < 0 { + m.RegularFlowTokensDeducted.Inc(-int64(adjustment.regular)) + } else { + m.RegularFlowTokensReturned.Inc(int64(adjustment.regular)) + } + if adjustment.elastic < 0 { + m.ElasticFlowTokensDeducted.Inc(-int64(adjustment.elastic)) + } else { + m.ElasticFlowTokensReturned.Inc(int64(adjustment.elastic)) } } func (m *metrics) onUnaccounted(unaccounted tokensPerWorkClass) { - for class, delta := range unaccounted { - m.FlowTokensUnaccounted[class].Inc(int64(delta)) - } + m.RegularFlowTokensUnaccounted.Inc(int64(unaccounted.regular)) + m.ElasticFlowTokensUnaccounted.Inc(int64(unaccounted.elastic)) } // MetricStruct implements the metric.Struct interface. 
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go index 56f05ce9fe80..22ce7f0ab271 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go @@ -108,8 +108,8 @@ func TestFlowTokenAdjustment(t *testing.T) { var buf strings.Builder buf.WriteString(" regular | elastic\n") buf.WriteString(fmt.Sprintf(" %8s | %8s\n", - printTrimmedTokens(limit[admissionpb.RegularWorkClass]), - printTrimmedTokens(limit[admissionpb.ElasticWorkClass]), + printTrimmedTokens(limit.regular), + printTrimmedTokens(limit.elastic), )) buf.WriteString("======================================\n") for _, h := range adjustments { @@ -137,10 +137,10 @@ func (h adjustment) String() string { class := admissionpb.WorkClassFromPri(h.pri) comment := "" - if h.post[admissionpb.RegularWorkClass] <= 0 { + if h.post.regular <= 0 { comment = "regular" } - if h.post[admissionpb.ElasticWorkClass] <= 0 { + if h.post.elastic <= 0 { if len(comment) == 0 { comment = "elastic" } else { @@ -153,8 +153,8 @@ func (h adjustment) String() string { return fmt.Sprintf("%8s %7s %8s | %8s%s", printTrimmedTokens(h.delta), class, - printTrimmedTokens(h.post[admissionpb.RegularWorkClass]), - printTrimmedTokens(h.post[admissionpb.ElasticWorkClass]), + printTrimmedTokens(h.post.regular), + printTrimmedTokens(h.post.elastic), comment, ) } @@ -234,3 +234,34 @@ func (m *mockConnectedStream) Stream() kvflowcontrol.Stream { func (m *mockConnectedStream) Disconnected() <-chan struct{} { return nil } + +func BenchmarkController(b *testing.B) { + ctx := context.Background() + makeStream := func(id uint64) kvflowcontrol.Stream { + return kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(id), + StoreID: roachpb.StoreID(id), + } + } + makeConnectedStream := func(id uint64) kvflowcontrol.ConnectedStream { + return &mockConnectedStream{ + stream: makeStream(id), + } + } + + st := cluster.MakeTestingClusterSettings() + elasticTokensPerStream.Override(ctx, &st.SV, 8<<20 /* 8 MiB */) + regularTokensPerStream.Override(ctx, &st.SV, 16<<20 /* 16 MiB */) + controller := New(metric.NewRegistry(), st, hlc.NewClockForTesting(nil)) + + // Deduct some {regular,elastic} tokens from s1/t1 and verify that Inspect() + // renders the state correctly. + t1s1 := makeStream(1) + ct1s1 := makeConnectedStream(1) + + for i := 0; i < b.N; i++ { + _, _ = controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, ct1s1) + controller.DeductTokens(ctx, admissionpb.NormalPri, kvflowcontrol.Tokens(1 /* 1b */), t1s1) + controller.ReturnTokens(ctx, admissionpb.NormalPri, kvflowcontrol.Tokens(1 /* 1b */), t1s1) + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go index 0a3576ceb285..1e2aa816567e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go @@ -28,6 +28,10 @@ import ( // entries to specific nodes, and (ii) to read pending dispatches. type Dispatch struct { mu struct { + // TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex + // is responsible for ~3.7% of the mutex contention. Look to address it + // as part of #104154. Perhaps shard this mutex by node ID? Or use a + // sync.Map instead? syncutil.Mutex // outbox maintains pending dispatches on a per-node basis. 
outbox map[roachpb.NodeID]dispatches @@ -153,6 +157,10 @@ func (d *Dispatch) PendingDispatchFor( var entries []kvflowcontrolpb.AdmittedRaftLogEntries for key, dispatch := range d.mu.outbox[nodeID] { + // TODO(irfansharif,aaditya): This contributes to 0.5% of alloc_objects + // under kv0/enc=false/nodes=3/cpu=96. Maybe address it as part of + // #104154; we're simply copying things over. Maybe use a sync.Pool here + // and around the outbox map? entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{ RangeID: key.RangeID, StoreID: key.StoreID, diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 40efbcbf0d6a..75f1c3999a17 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -170,6 +170,11 @@ func (h *Handle) deductTokensForInner( // Only deduct tokens if we're able to track them for subsequent // returns. We risk leaking flow tokens otherwise. h.controller.DeductTokens(ctx, pri, tokens, c.Stream()) + + // TODO(irfansharif,aaditya): This accounts for 0.4% of + // alloc_objects when running kv0/enc=false/nodes=3/cpu=9. Except + // this return type is not used in production code. Clean it up as + // part of #104154. streams = append(streams, c.Stream()) } } @@ -211,14 +216,19 @@ func (h *Handle) ReturnTokensUpto( // instantiated. stream.TenantID = h.tenantID } + + // TODO(irfansharif,aaditya): This mutex still shows up in profiles, ~0.3% + // for kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as part of #104154. h.mu.Lock() - defer h.mu.Unlock() if h.mu.closed { + h.mu.Unlock() log.Errorf(ctx, "operating on a closed handle") return } tokens := h.mu.perStreamTokenTracker[stream].Untrack(ctx, pri, upto) + h.mu.Unlock() + h.controller.ReturnTokens(ctx, pri, tokens, stream) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go index b507ff26eee8..1cfc39afc235 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go @@ -29,6 +29,10 @@ import ( // admissionpb.WorkPriority, for replication along an individual // kvflowcontrol.Stream. type Tracker struct { + // TODO(irfansharif,aaditya): Everytime we track something, we incur a map + // assignment (shows up in CPU profiles). We could introduce a struct that + // internally embeds this list of tracked deductions, and append there + // instead. Do this as part of #104154. trackedM map[admissionpb.WorkPriority][]tracked // lowerBound tracks on a per-stream basis the log position below which @@ -110,11 +114,15 @@ func (dt *Tracker) Track( return false } } + + // TODO(irfansharif,aaditya): The tracked instances here make up about ~0.4% + // of allocations under kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as + // part of #104154, by using a sync.Pool perhaps. 
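+	//
+	// A rough, hypothetical sketch of the sync.Pool idea (not wired up here):
+	// pool the backing slices rather than individual tracked entries, e.g.
+	//
+	//   var trackedSlicePool = sync.Pool{
+	//   	New: func() interface{} { return make([]tracked, 0, 8) },
+	//   }
+	//
+	// Track would Get() a slice the first time a priority is seen, and Untrack
+	// would Put() it back (truncated to zero length) once fully drained.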
dt.trackedM[pri] = append(dt.trackedM[pri], tracked{ tokens: tokens, position: pos, }) - if log.ExpensiveLogEnabled(ctx, 1) { + if log.V(1) { log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s", tokens, pri, dt.stream, pos) } @@ -155,7 +163,7 @@ func (dt *Tracker) Untrack( trackedBefore := len(dt.trackedM[pri]) dt.trackedM[pri] = dt.trackedM[pri][untracked:] - if log.ExpensiveLogEnabled(ctx, 1) { + if log.V(1) { remaining := "" if len(dt.trackedM[pri]) > 0 { remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 0db457c010d8..600fc9839963 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -816,10 +816,10 @@ func (t *RaftTransport) processQueue( continue // nothing to do } - // TODO(irfansharif): There's no limit on how many pending + // TODO(irfansharif,aaditya): There's no limit on how many pending // dispatches are going to be attached to the outgoing raft // messages, both here and above. It can be excessive -- limit this - // by some count/byte policy as part of #95563. + // by some count/byte policy as part of #104154. req := newRaftMessageRequest() maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) batch.Requests = append(batch.Requests, *req) diff --git a/pkg/server/admission.go b/pkg/server/admission.go index e738c6ad1eb4..9b957f81fe9c 100644 --- a/pkg/server/admission.go +++ b/pkg/server/admission.go @@ -43,6 +43,8 @@ func (a *admittedLogEntryAdaptor) AdmittedLogEntry( rangeID roachpb.RangeID, pos admission.LogPosition, ) { + // TODO(irfansharif,aaditya): This contributes to a high count of + // inuse_objects. Look to address it as part of #104154. a.dispatchWriter.Dispatch(ctx, origin, kvflowcontrolpb.AdmittedRaftLogEntries{ RangeID: rangeID, AdmissionPriority: int32(pri), From 6dba9cc3daf1132d8224dabbd0fb005bea01811d Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Wed, 6 Sep 2023 11:09:40 -0400 Subject: [PATCH 2/3] admission: add metric for bypassed IO admission work Part of #82743. We introduce an admission.granter.io_tokens_bypassed.kv metric, that tracks the total number of tokens taken by work bypassing admission control. For example, follower writes without flow control. Aside: #109640 ripped out a tokens-taken-without-permission metric that was supposed to capture some of this, but even for standard admission work we'd routinely exercise that code path. When admitting work, we take 1 token, and later take the remaining without permission. 
Release note: None --- pkg/util/admission/grant_coordinator.go | 27 +++++++++++-------- pkg/util/admission/granter.go | 10 +++++-- pkg/util/admission/granter_test.go | 10 ++++--- .../replicated_write_admission_test.go | 2 +- pkg/util/admission/work_queue.go | 7 ++++- pkg/util/admission/work_queue_test.go | 2 +- 6 files changed, 38 insertions(+), 20 deletions(-) diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index ba8b25e61961..a2d5b0f05576 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -54,8 +54,9 @@ type StoreGrantCoordinators struct { kvIOTokensExhaustedDuration *metric.Counter kvIOTokensAvailable *metric.Gauge kvElasticIOTokensAvailable *metric.Gauge - kvIOTotalTokensTaken *metric.Counter - kvIOTotalTokensReturned *metric.Counter + kvIOTokensTaken *metric.Counter + kvIOTokensReturned *metric.Counter + kvIOTokensBypassed *metric.Counter l0CompactedBytes *metric.Counter l0TokensProduced *metric.Counter @@ -172,8 +173,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration, availableTokensMetric: sgc.kvIOTokensAvailable, availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable, - tokensTakenMetric: sgc.kvIOTotalTokensTaken, - tokensReturnedMetric: sgc.kvIOTotalTokensReturned, + tokensTakenMetric: sgc.kvIOTokensTaken, + tokensReturnedMetric: sgc.kvIOTokensReturned, } kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval() kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens @@ -203,6 +204,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) opts, sgc.knobs, sgc.onLogEntryAdmitted, + sgc.kvIOTokensBypassed, &coord.mu.Mutex, ) coord.queues[KVWork] = storeReq @@ -380,7 +382,7 @@ type makeRequesterFunc func( type makeStoreRequesterFunc func( _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, - onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, + onLogEntryAdmitted OnLogEntryAdmitted, ioTokensBypassedMetric *metric.Counter, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -470,8 +472,9 @@ func makeStoresGrantCoordinators( settings: st, makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, - kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken, - kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned, + kvIOTokensTaken: metrics.KVIOTokensTaken, + kvIOTokensReturned: metrics.KVIOTokensReturned, + kvIOTokensBypassed: metrics.KVIOTokensBypassed, kvIOTokensAvailable: metrics.KVIOTokensAvailable, kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable, l0CompactedBytes: metrics.L0CompactedBytes, @@ -1019,8 +1022,9 @@ type GrantCoordinatorMetrics struct { KVSlotAdjusterDecrements *metric.Counter // TODO(banabrick): Make these metrics per store. 
KVIOTokensExhaustedDuration *metric.Counter - KVIOTotalTokensTaken *metric.Counter - KVIOTotalTokensReturned *metric.Counter + KVIOTokensTaken *metric.Counter + KVIOTokensReturned *metric.Counter + KVIOTokensBypassed *metric.Counter KVIOTokensAvailable *metric.Gauge KVElasticIOTokensAvailable *metric.Gauge L0CompactedBytes *metric.Counter @@ -1044,8 +1048,9 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics { KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration), SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)), SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)), - KVIOTotalTokensTaken: metric.NewCounter(kvIOTotalTokensTaken), - KVIOTotalTokensReturned: metric.NewCounter(kvIOTotalTokensReturned), + KVIOTokensTaken: metric.NewCounter(kvIOTokensTaken), + KVIOTokensReturned: metric.NewCounter(kvIOTokensReturned), + KVIOTokensBypassed: metric.NewCounter(kvIOTokensBypassed), KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable), KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable), L0CompactedBytes: metric.NewCounter(l0CompactedBytes), diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 63bb6401d3ea..87a56671ae56 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -739,18 +739,24 @@ var ( Measurement: "Microseconds", Unit: metric.Unit_COUNT, } - kvIOTotalTokensTaken = metric.Metadata{ + kvIOTokensTaken = metric.Metadata{ Name: "admission.granter.io_tokens_taken.kv", Help: "Total number of tokens taken", Measurement: "Tokens", Unit: metric.Unit_COUNT, } - kvIOTotalTokensReturned = metric.Metadata{ + kvIOTokensReturned = metric.Metadata{ Name: "admission.granter.io_tokens_returned.kv", Help: "Total number of tokens returned", Measurement: "Tokens", Unit: metric.Unit_COUNT, } + kvIOTokensBypassed = metric.Metadata{ + Name: "admission.granter.io_tokens_bypassed.kv", + Help: "Total number of tokens taken by work bypassing admission control (for example, follower writes without flow control)", + Measurement: "Tokens", + Unit: metric.Unit_COUNT, + } kvIOTokensAvailable = metric.Metadata{ Name: "admission.granter.io_tokens_available.kv", Help: "Number of tokens available", diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index 7c0bc610d877..9ddc92b77a14 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -112,7 +112,7 @@ func TestGranterBasic(t *testing.T) { makeStoreRequesterFunc: func( ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, - _ OnLogEntryAdmitted, _ *syncutil.Mutex, + _ OnLogEntryAdmitted, _ *metric.Counter, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -140,8 +140,9 @@ func TestGranterBasic(t *testing.T) { kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, kvIOTokensAvailable: metrics.KVIOTokensAvailable, kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable, - kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken, - kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned, + kvIOTokensTaken: metrics.KVIOTokensTaken, + kvIOTokensReturned: metrics.KVIOTokensReturned, + kvIOTokensBypassed: metrics.KVIOTokensBypassed, 
l0CompactedBytes: metrics.L0CompactedBytes, l0TokensProduced: metrics.L0TokensProduced, workQueueMetrics: workQueueMetrics, @@ -324,7 +325,8 @@ func TestStoreCoordinators(t *testing.T) { makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, + _ *metric.Counter, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index 7fd37551e8fd..a4b2884fb8ba 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -121,7 +121,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, &noopOnLogEntryAdmitted{}, &mockCoordMu, + st, metrics, opts, knobs, &noopOnLogEntryAdmitted{}, metric.NewCounter(metric.Metadata{}), &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index d3f71be7365c..a8cf86a0ff7a 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -1853,6 +1853,8 @@ type StoreWorkQueue struct { settings *cluster.Settings onLogEntryAdmitted OnLogEntryAdmitted + ioTokensBypassed *metric.Counter + knobs *TestingKnobs } @@ -2072,7 +2074,8 @@ func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDon q.updateStoreStatsAfterWorkDone(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. 
- _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) + additionalTokensTaken := q.granters[admissionpb.RegularWorkClass].storeWriteDone(0 /* originalTokens */, doneInfo) + q.ioTokensBypassed.Inc(additionalTokensTaken) } // StatsToIgnore is called for range snapshot ingestion -- see the comment in @@ -2143,6 +2146,7 @@ func makeStoreWorkQueue( opts workQueueOptions, knobs *TestingKnobs, onLogEntryAdmitted OnLogEntryAdmitted, + ioTokensBypassedMetric *metric.Counter, coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { @@ -2160,6 +2164,7 @@ func makeStoreWorkQueue( timeSource: opts.timeSource, settings: settings, onLogEntryAdmitted: onLogEntryAdmitted, + ioTokensBypassed: ioTokensBypassedMetric, } opts.usesAsyncAdmit = true diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 1c30b9c5a3f5..968b9766bdc4 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -545,7 +545,7 @@ func TestStoreWorkQueueBasic(t *testing.T) { tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, nil /* testing knobs */, &noopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) + st, metrics, opts, nil /* testing knobs */, &noopOnLogEntryAdmitted{}, metric.NewCounter(metric.Metadata{}), &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() From 99fde7d0a314179cc615df9eaaaed0ceaf28382e Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Wed, 6 Sep 2023 00:01:20 -0400 Subject: [PATCH 3/3] privilege: automate generation of ByName map This patch automates the process of generating the `ByName` map so that any newly added privileges will automatically be included. Release note: None --- pkg/sql/privilege/BUILD.bazel | 1 + pkg/sql/privilege/privilege.go | 40 +++++------------------------ pkg/sql/privilege/privilege_test.go | 12 +++++++++ 3 files changed, 19 insertions(+), 34 deletions(-) diff --git a/pkg/sql/privilege/BUILD.bazel b/pkg/sql/privilege/BUILD.bazel index e4a1adf1dbfe..41c50b216655 100644 --- a/pkg/sql/privilege/BUILD.bazel +++ b/pkg/sql/privilege/BUILD.bazel @@ -31,6 +31,7 @@ go_test( ":privilege", "//pkg/util/leaktest", "//pkg/util/log", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/privilege/privilege.go b/pkg/sql/privilege/privilege.go index 677420cf6ffa..06b7245c042e 100644 --- a/pkg/sql/privilege/privilege.go +++ b/pkg/sql/privilege/privilege.go @@ -177,40 +177,8 @@ func (k Kind) IsSetIn(bits uint64) bool { return bits&k.Mask() != 0 } -// ByName is a map of string -> kind value. 
-var ByName = map[string]Kind{ - "ALL": ALL, - "CHANGEFEED": CHANGEFEED, - "CONNECT": CONNECT, - "CREATE": CREATE, - "DROP": DROP, - "SELECT": SELECT, - "INSERT": INSERT, - "DELETE": DELETE, - "UPDATE": UPDATE, - "ZONECONFIG": ZONECONFIG, - "USAGE": USAGE, - "RULE": RULE, - "MODIFYCLUSTERSETTING": MODIFYCLUSTERSETTING, - "EXTERNALCONNECTION": EXTERNALCONNECTION, - "VIEWACTIVITY": VIEWACTIVITY, - "VIEWACTIVITYREDACTED": VIEWACTIVITYREDACTED, - "VIEWCLUSTERSETTING": VIEWCLUSTERSETTING, - "CANCELQUERY": CANCELQUERY, - "NOSQLLOGIN": NOSQLLOGIN, - "EXECUTE": EXECUTE, - "VIEWCLUSTERMETADATA": VIEWCLUSTERMETADATA, - "VIEWDEBUG": VIEWDEBUG, - "BACKUP": BACKUP, - "RESTORE": RESTORE, - "EXTERNALIOIMPLICITACCESS": EXTERNALIOIMPLICITACCESS, - "VIEWJOB": VIEWJOB, - "MODIFYSQLCLUSTERSETTING": MODIFYSQLCLUSTERSETTING, - "REPLICATION": REPLICATION, - "MANAGETENANT": MANAGETENANT, - "VIEWSYSTEMTABLE": VIEWSYSTEMTABLE, - "CREATEROLE": CREATEROLE, -} +// ByName is a map of string -> kind value. It is populated by init. +var ByName map[string]Kind // List is a list of privileges. type List []Kind @@ -500,10 +468,14 @@ type Object interface { } func init() { + AllPrivileges = make([]Kind, 0, largestKind) + ByName = make(map[string]Kind) + for kind := ALL; kind <= largestKind; kind++ { if isDeprecatedKind[kind] { continue } AllPrivileges = append(AllPrivileges, kind) + ByName[kind.String()] = kind } } diff --git a/pkg/sql/privilege/privilege_test.go b/pkg/sql/privilege/privilege_test.go index 0a406316918e..226ffe635ccb 100644 --- a/pkg/sql/privilege/privilege_test.go +++ b/pkg/sql/privilege/privilege_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" ) func TestPrivilegeDecode(t *testing.T) { @@ -59,3 +60,14 @@ func TestPrivilegeDecode(t *testing.T) { } } } + +// TestByNameHasAllPrivileges verifies that every privilege is present in ByName. +func TestByNameHasAllPrivileges(t *testing.T) { + defer leaktest.AfterTest(t)() + + for _, kind := range privilege.AllPrivileges { + resolvedKind, ok := privilege.ByName[kind.String()] + require.True(t, ok) + require.Equal(t, kind, resolvedKind) + } +}