diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 0046a858f26a..c16bfca0ac74 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -307,7 +307,8 @@ type raftAdmissionMetaKey struct{} // ContextWithMeta returns a Context wrapping the supplied raft admission meta, // if any. // -// TODO(irfansharif): This causes a heap allocation. Revisit as part of #95563. +// TODO(irfansharif,aaditya): This causes a heap allocation. Revisit as part of +// #104154. func ContextWithMeta(ctx context.Context, meta *kvflowcontrolpb.RaftAdmissionMeta) context.Context { if meta != nil { ctx = context.WithValue(ctx, raftAdmissionMetaKey{}, meta) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index b6ac9307674a..a4980e4aff88 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "sort" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" @@ -68,7 +69,8 @@ type Controller struct { // streams get closed permanently (tenants get deleted, nodes removed) // or when completely inactive (no tokens deducted/returned over 30+ // minutes), clear these out. - buckets map[kvflowcontrol.Stream]*bucket + buckets sync.Map // kvflowcontrol.Stream => *bucket + bucketCount int } metrics *metrics clock *hlc.Clock @@ -86,38 +88,40 @@ func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)) elasticTokens := kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)) - c.mu.limit = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + c.mu.limit = tokensPerWorkClass{ regular: regularTokens, elastic: elasticTokens, } - c.mu.buckets = make(map[kvflowcontrol.Stream]*bucket) + c.mu.buckets = sync.Map{} regularTokensPerStream.SetOnChange(&settings.SV, func(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() - before := tokensPerWorkClass{ - regular: c.mu.limit[regular], - elastic: c.mu.limit[elastic], - } + before := c.mu.limit now := tokensPerWorkClass{ regular: kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)), elastic: kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)), } adjustment := tokensPerWorkClass{ - regular: now[regular] - before[regular], - elastic: now[elastic] - before[elastic], + regular: now.regular - before.regular, + elastic: now.elastic - before.elastic, } c.mu.limit = now - for _, b := range c.mu.buckets { + c.mu.buckets.Range(func(_, value any) bool { + // NB: We're holding the controller mutex here, which guards against + // new buckets being added, synchronization we don't get out of + // sync.Map.Range() directly. + b := value.(*bucket) b.mu.Lock() - b.mu.tokens[regular] += adjustment[regular] - b.mu.tokens[elastic] += adjustment[elastic] + b.mu.tokensPerWorkClass.regular += adjustment.regular + b.mu.tokensPerWorkClass.elastic += adjustment.elastic b.mu.Unlock() c.metrics.onTokenAdjustment(adjustment) - if adjustment[regular] > 0 || adjustment[elastic] > 0 { + if adjustment.regular > 0 || adjustment.elastic > 0 { b.signal() // signal a waiter, if any } - } + return true + }) }) c.metrics = newMetrics(c) registry.AddMetricStruct(c.metrics) @@ -143,9 +147,7 @@ func (c *Controller) Admit( logged := false tstart := c.clock.PhysicalTime() for { - c.mu.Lock() - b := c.getBucketLocked(connection.Stream()) - c.mu.Unlock() + b := c.getBucket(connection.Stream()) tokens := b.tokens(class) // In addition to letting requests through when there are tokens @@ -153,7 +155,7 @@ func (c *Controller) Admit( // applying flow control to their specific work class. bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass if tokens > 0 || bypass { - if log.ExpensiveLogEnabled(ctx, 2) { + if log.V(2) { log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) } @@ -184,7 +186,7 @@ func (c *Controller) Admit( return true, nil } - if !logged && log.ExpensiveLogEnabled(ctx, 2) { + if !logged && log.V(2) { log.Infof(ctx, "waiting for flow tokens (pri=%s stream=%s tokens=%s)", pri, connection.Stream(), tokens) logged = true @@ -245,16 +247,20 @@ func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream { defer c.mu.Unlock() var streams []kvflowinspectpb.Stream - for stream, b := range c.mu.buckets { - b.mu.Lock() + c.mu.buckets.Range(func(key, value any) bool { + stream := key.(kvflowcontrol.Stream) + b := value.(*bucket) + + b.mu.RLock() streams = append(streams, kvflowinspectpb.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, AvailableRegularTokens: int64(b.tokensLocked(regular)), AvailableElasticTokens: int64(b.tokensLocked(elastic)), }) - b.mu.Unlock() - } + b.mu.RUnlock() + return true + }) sort.Slice(streams, func(i, j int) bool { // for determinism if streams[i].TenantID != streams[j].TenantID { return streams[i].TenantID.ToUint64() < streams[j].TenantID.ToUint64() @@ -272,8 +278,8 @@ func (c *Controller) InspectStream( return kvflowinspectpb.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, - AvailableRegularTokens: int64(tokens[regular]), - AvailableElasticTokens: int64(tokens[elastic]), + AvailableRegularTokens: int64(tokens.regular), + AvailableElasticTokens: int64(tokens.elastic), } } @@ -285,31 +291,46 @@ func (c *Controller) adjustTokens( ) { class := admissionpb.WorkClassFromPri(pri) - c.mu.Lock() - b := c.getBucketLocked(stream) - c.mu.Unlock() + // TODO(irfansharif,aaditya): Double check that there are no more + // alloc_objects (for the tokensPerWorkClass instances being bussed around + // below) when running kv0/enc=false/nodes=3/cpu=96. Do this as part of + // #104154. + b := c.getBucket(stream) adjustment, unaccounted := b.adjust(ctx, class, delta, c.mu.limit) c.metrics.onTokenAdjustment(adjustment) - c.metrics.onUnaccounted(unaccounted) - if adjustment[regular] > 0 || adjustment[elastic] > 0 { + if unaccounted.regular > 0 || unaccounted.elastic > 0 { + c.metrics.onUnaccounted(unaccounted) + } + if adjustment.regular > 0 || adjustment.elastic > 0 { b.signal() // signal a waiter, if any } - if log.ExpensiveLogEnabled(ctx, 2) { - b.mu.Lock() + if log.V(2) { + b.mu.RLock() log.Infof(ctx, "adjusted flow tokens (pri=%s stream=%s delta=%s): regular=%s elastic=%s", pri, stream, delta, b.tokensLocked(regular), b.tokensLocked(elastic)) - b.mu.Unlock() + b.mu.RUnlock() } } -func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) *bucket { - b, ok := c.mu.buckets[stream] +func (c *Controller) getBucket(stream kvflowcontrol.Stream) *bucket { + // NB: sync.map is more expensive CPU wise as per BenchmarkController + // for reads, ~250ns vs. ~350ns, though better for mutex contention when + // looking at kv0/enc=false/nodes=3/cpu=9. The sync.Map does show up in CPU + // profiles more prominently though. If we want to go back to it being a + // mutex-backed map, we could use a read-Lock when trying to read the bucket + // and then swapping for a write-lock when optionally creating the bucket. + b, ok := c.mu.buckets.Load(stream) if !ok { - b = newBucket(c.mu.limit) - c.mu.buckets[stream] = b + c.mu.Lock() + var loaded bool + b, loaded = c.mu.buckets.LoadOrStore(stream, newBucket(c.mu.limit)) + if !loaded { + c.mu.bucketCount += 1 + } + c.mu.Unlock() } - return b + return b.(*bucket) } // bucket holds flow tokens for {regular,elastic} traffic over a @@ -317,8 +338,8 @@ func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) *bucket { // returning and waiting for flow tokens. type bucket struct { mu struct { - syncutil.Mutex - tokens tokensPerWorkClass + syncutil.RWMutex + tokensPerWorkClass tokensPerWorkClass } // Waiting requests do so by waiting on signalCh without holding mutexes. @@ -336,25 +357,25 @@ type bucket struct { signalCh chan struct{} } -func newBucket(t tokensPerWorkClass) *bucket { +func newBucket(tokensPerWorkClass tokensPerWorkClass) *bucket { b := bucket{ signalCh: make(chan struct{}, 1), } - b.mu.tokens = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ - regular: t[regular], - elastic: t[elastic], - } + b.mu.tokensPerWorkClass = tokensPerWorkClass return &b } func (b *bucket) tokens(wc admissionpb.WorkClass) kvflowcontrol.Tokens { - b.mu.Lock() - defer b.mu.Unlock() + b.mu.RLock() + defer b.mu.RUnlock() return b.tokensLocked(wc) } func (b *bucket) tokensLocked(wc admissionpb.WorkClass) kvflowcontrol.Tokens { - return b.mu.tokens[wc] + if wc == regular { + return b.mu.tokensPerWorkClass.regular + } + return b.mu.tokensPerWorkClass.elastic } func (b *bucket) signal() { @@ -376,52 +397,76 @@ func (b *bucket) adjust( ) (adjustment, unaccounted tokensPerWorkClass) { b.mu.Lock() defer b.mu.Unlock() - - unaccounted = tokensPerWorkClass{ - regular: 0, - elastic: 0, - } - - before := tokensPerWorkClass{ - regular: b.mu.tokens[regular], - elastic: b.mu.tokens[elastic], - } + // TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex is + // responsible for ~1.8% of the mutex contention. Maybe address it as part + // of #104154. We want to effectively increment two values but cap each at + // some limit, and when incrementing, figure out what the adjustment was. + // What if reads always capped it at the limit? And when incrementing + // atomically by +delta, if we're over the limit, since we tried to increase + // the value by +delta, at most we need to adjust back down by -delta. + // Something like it. + // + // var c int64 = 0 + // var limit int64 = rand.Int63n(10000000000) + // for i := 0; i < 50; i++ { + // go func() { + // for j := 0; j < 2000; j++ { + // delta := rand.Int63() + // v := atomic.AddInt64(&c, delta) + // if v > limit { + // overlimit := v - limit + // var adjustment int64 = overlimit + // if delta < overlimit { + // adjustment = delta + // } + // n := atomic.AddInt64(&c, -adjustment) + // fmt.Printf("%d > %d by %d, adjusted by %d to %d)\n", + // v, limit, v-limit, -adjustment, n) + // } + // } + // }() + // } + + unaccounted = tokensPerWorkClass{} + before := b.mu.tokensPerWorkClass switch class { case elastic: // Elastic {deductions,returns} only affect elastic flow tokens. - b.mu.tokens[class] += delta - if delta > 0 && b.mu.tokens[class] > limit[class] { - unaccounted[class] = b.mu.tokens[class] - limit[class] - b.mu.tokens[class] = limit[class] // enforce ceiling + b.mu.tokensPerWorkClass.elastic += delta + if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic { + unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic + b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling } case regular: - b.mu.tokens[class] += delta - if delta > 0 && b.mu.tokens[class] > limit[class] { - unaccounted[class] = b.mu.tokens[class] - limit[class] - b.mu.tokens[class] = limit[class] // enforce ceiling + b.mu.tokensPerWorkClass.regular += delta + if delta > 0 && b.mu.tokensPerWorkClass.regular > limit.regular { + unaccounted.regular = b.mu.tokensPerWorkClass.regular - limit.regular + b.mu.tokensPerWorkClass.regular = limit.regular // enforce ceiling } - b.mu.tokens[elastic] += delta - if delta > 0 && b.mu.tokens[elastic] > limit[elastic] { - unaccounted[elastic] = b.mu.tokens[elastic] - limit[elastic] - b.mu.tokens[elastic] = limit[elastic] // enforce ceiling + b.mu.tokensPerWorkClass.elastic += delta + if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic { + unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic + b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling } } - if buildutil.CrdbTestBuild && (unaccounted[regular] != 0 || unaccounted[elastic] != 0) { + if buildutil.CrdbTestBuild && (unaccounted.regular != 0 || unaccounted.elastic != 0) { log.Fatalf(ctx, "unaccounted[regular]=%s unaccounted[elastic]=%s for class=%s delta=%s limit[regular]=%s limit[elastic]=%s", - unaccounted[regular], unaccounted[elastic], class, delta, limit[regular], limit[elastic]) + unaccounted.regular, unaccounted.elastic, class, delta, limit.regular, limit.elastic) } adjustment = tokensPerWorkClass{ - regular: b.mu.tokens[regular] - before[regular], - elastic: b.mu.tokens[elastic] - before[elastic], + regular: b.mu.tokensPerWorkClass.regular - before.regular, + elastic: b.mu.tokensPerWorkClass.elastic - before.elastic, } return adjustment, unaccounted } -type tokensPerWorkClass map[admissionpb.WorkClass]kvflowcontrol.Tokens +type tokensPerWorkClass struct { + regular, elastic kvflowcontrol.Tokens +} const ( minTokensPerStream kvflowcontrol.Tokens = 1 << 20 // 1 MiB @@ -440,16 +485,13 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error { }) func (c *Controller) getTokensForStream(stream kvflowcontrol.Stream) tokensPerWorkClass { - ret := make(map[admissionpb.WorkClass]kvflowcontrol.Tokens) - c.mu.Lock() - b := c.getBucketLocked(stream) - c.mu.Unlock() + ret := tokensPerWorkClass{} + b := c.getBucket(stream) - b.mu.Lock() - for _, wc := range []admissionpb.WorkClass{regular, elastic} { - ret[wc] = b.tokensLocked(wc) - } - b.mu.Unlock() + b.mu.RLock() + ret.regular = b.tokensLocked(regular) + ret.elastic = b.tokensLocked(elastic) + b.mu.RUnlock() return ret } @@ -488,14 +530,9 @@ func (c *Controller) TestingNonBlockingAdmit( default: } - c.mu.Lock() - b := c.getBucketLocked(connection.Stream()) - c.mu.Unlock() - - b.mu.Lock() - tokens := b.mu.tokens[class] - b.mu.Unlock() + b := c.getBucket(connection.Stream()) + tokens := b.tokens(class) if tokens <= 0 { return false } @@ -529,9 +566,7 @@ func (c *Controller) TestingMetrics() interface{} { } func (c *Controller) testingGetBucket(stream kvflowcontrol.Stream) *bucket { - c.mu.Lock() - defer c.mu.Unlock() - return c.getBucketLocked(stream) + return c.getBucket(stream) } func (b *bucket) testingSignaled(connection kvflowcontrol.ConnectedStream) func() bool { diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go index 5325e8a83f35..8097abce8d16 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -129,17 +130,20 @@ func annotateMetricTemplateWithWorkClass( } type metrics struct { - FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge - FlowTokensDeducted [admissionpb.NumWorkClasses]*metric.Counter - FlowTokensReturned [admissionpb.NumWorkClasses]*metric.Counter - FlowTokensUnaccounted [admissionpb.NumWorkClasses]*metric.Counter - RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge - RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter - RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter - RequestsBypassed [admissionpb.NumWorkClasses]*metric.Counter - WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram - TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge - BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + ElasticFlowTokensDeducted *metric.Counter + ElasticFlowTokensReturned *metric.Counter + ElasticFlowTokensUnaccounted *metric.Counter + RegularFlowTokensDeducted *metric.Counter + RegularFlowTokensReturned *metric.Counter + RegularFlowTokensUnaccounted *metric.Counter + FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge + RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge + RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter + RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter + RequestsBypassed [admissionpb.NumWorkClasses]*metric.Counter + WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram + TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge } var _ metric.Struct = &metrics{} @@ -157,21 +161,35 @@ func newMetrics(c *Controller) *metrics { sum := int64(0) c.mu.Lock() defer c.mu.Unlock() - for _, b := range c.mu.buckets { + c.mu.buckets.Range(func(key, value any) bool { + b := value.(*bucket) sum += int64(b.tokens(wc)) - } + return true + }) return sum }, ) - m.FlowTokensDeducted[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), - ) - m.FlowTokensReturned[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), - ) - m.FlowTokensUnaccounted[wc] = metric.NewCounter( - annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), - ) + if wc == regular { + m.RegularFlowTokensDeducted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), + ) + m.RegularFlowTokensReturned = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.RegularFlowTokensUnaccounted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + } else { + m.ElasticFlowTokensDeducted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), + ) + m.ElasticFlowTokensReturned = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.ElasticFlowTokensUnaccounted = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + } m.RequestsWaiting[wc] = metric.NewGauge( annotateMetricTemplateWithWorkClass(wc, requestsWaiting), ) @@ -197,7 +215,7 @@ func newMetrics(c *Controller) *metrics { func() int64 { c.mu.Lock() defer c.mu.Unlock() - return int64(len(c.mu.buckets)) + return int64(c.mu.bucketCount) }, ) @@ -212,13 +230,16 @@ func newMetrics(c *Controller) *metrics { c.mu.Lock() defer c.mu.Unlock() - for s, wbc := range c.mu.buckets { - if wbc.tokens(wc) <= 0 { + c.mu.buckets.Range(func(key, value any) bool { + stream := key.(kvflowcontrol.Stream) + b := value.(*bucket) + + if b.tokens(wc) <= 0 { count += 1 if shouldLog { if count > 10 { - continue // cap output to 10 blocked streams + return false // cap output to 10 blocked streams } if count == 1 { buf.Reset() @@ -226,10 +247,11 @@ func newMetrics(c *Controller) *metrics { if count > 1 { buf.WriteString(", ") } - buf.WriteString(s.String()) + buf.WriteString(stream.String()) } } - } + return true + }) if shouldLog && count > 0 { log.Warningf(context.Background(), "%d blocked %s replication stream(s): %s", count, wc, buf.String()) } @@ -263,19 +285,21 @@ func (m *metrics) onErrored(class admissionpb.WorkClass, dur time.Duration) { } func (m *metrics) onTokenAdjustment(adjustment tokensPerWorkClass) { - for class, delta := range adjustment { - if delta < 0 { - m.FlowTokensDeducted[class].Inc(-int64(delta)) - } else if delta > 0 { - m.FlowTokensReturned[class].Inc(int64(delta)) - } + if adjustment.regular < 0 { + m.RegularFlowTokensDeducted.Inc(-int64(adjustment.regular)) + } else { + m.RegularFlowTokensReturned.Inc(int64(adjustment.regular)) + } + if adjustment.elastic < 0 { + m.ElasticFlowTokensDeducted.Inc(-int64(adjustment.elastic)) + } else { + m.ElasticFlowTokensReturned.Inc(int64(adjustment.elastic)) } } func (m *metrics) onUnaccounted(unaccounted tokensPerWorkClass) { - for class, delta := range unaccounted { - m.FlowTokensUnaccounted[class].Inc(int64(delta)) - } + m.RegularFlowTokensUnaccounted.Inc(int64(unaccounted.regular)) + m.ElasticFlowTokensUnaccounted.Inc(int64(unaccounted.elastic)) } // MetricStruct implements the metric.Struct interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go index 56f05ce9fe80..22ce7f0ab271 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go @@ -108,8 +108,8 @@ func TestFlowTokenAdjustment(t *testing.T) { var buf strings.Builder buf.WriteString(" regular | elastic\n") buf.WriteString(fmt.Sprintf(" %8s | %8s\n", - printTrimmedTokens(limit[admissionpb.RegularWorkClass]), - printTrimmedTokens(limit[admissionpb.ElasticWorkClass]), + printTrimmedTokens(limit.regular), + printTrimmedTokens(limit.elastic), )) buf.WriteString("======================================\n") for _, h := range adjustments { @@ -137,10 +137,10 @@ func (h adjustment) String() string { class := admissionpb.WorkClassFromPri(h.pri) comment := "" - if h.post[admissionpb.RegularWorkClass] <= 0 { + if h.post.regular <= 0 { comment = "regular" } - if h.post[admissionpb.ElasticWorkClass] <= 0 { + if h.post.elastic <= 0 { if len(comment) == 0 { comment = "elastic" } else { @@ -153,8 +153,8 @@ func (h adjustment) String() string { return fmt.Sprintf("%8s %7s %8s | %8s%s", printTrimmedTokens(h.delta), class, - printTrimmedTokens(h.post[admissionpb.RegularWorkClass]), - printTrimmedTokens(h.post[admissionpb.ElasticWorkClass]), + printTrimmedTokens(h.post.regular), + printTrimmedTokens(h.post.elastic), comment, ) } @@ -234,3 +234,34 @@ func (m *mockConnectedStream) Stream() kvflowcontrol.Stream { func (m *mockConnectedStream) Disconnected() <-chan struct{} { return nil } + +func BenchmarkController(b *testing.B) { + ctx := context.Background() + makeStream := func(id uint64) kvflowcontrol.Stream { + return kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(id), + StoreID: roachpb.StoreID(id), + } + } + makeConnectedStream := func(id uint64) kvflowcontrol.ConnectedStream { + return &mockConnectedStream{ + stream: makeStream(id), + } + } + + st := cluster.MakeTestingClusterSettings() + elasticTokensPerStream.Override(ctx, &st.SV, 8<<20 /* 8 MiB */) + regularTokensPerStream.Override(ctx, &st.SV, 16<<20 /* 16 MiB */) + controller := New(metric.NewRegistry(), st, hlc.NewClockForTesting(nil)) + + // Deduct some {regular,elastic} tokens from s1/t1 and verify that Inspect() + // renders the state correctly. + t1s1 := makeStream(1) + ct1s1 := makeConnectedStream(1) + + for i := 0; i < b.N; i++ { + _, _ = controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, ct1s1) + controller.DeductTokens(ctx, admissionpb.NormalPri, kvflowcontrol.Tokens(1 /* 1b */), t1s1) + controller.ReturnTokens(ctx, admissionpb.NormalPri, kvflowcontrol.Tokens(1 /* 1b */), t1s1) + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go index 0a3576ceb285..1e2aa816567e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go @@ -28,6 +28,10 @@ import ( // entries to specific nodes, and (ii) to read pending dispatches. type Dispatch struct { mu struct { + // TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex + // is responsible for ~3.7% of the mutex contention. Look to address it + // as part of #104154. Perhaps shard this mutex by node ID? Or use a + // sync.Map instead? syncutil.Mutex // outbox maintains pending dispatches on a per-node basis. outbox map[roachpb.NodeID]dispatches @@ -153,6 +157,10 @@ func (d *Dispatch) PendingDispatchFor( var entries []kvflowcontrolpb.AdmittedRaftLogEntries for key, dispatch := range d.mu.outbox[nodeID] { + // TODO(irfansharif,aaditya): This contributes to 0.5% of alloc_objects + // under kv0/enc=false/nodes=3/cpu=96. Maybe address it as part of + // #104154; we're simply copying things over. Maybe use a sync.Pool here + // and around the outbox map? entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{ RangeID: key.RangeID, StoreID: key.StoreID, diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 40efbcbf0d6a..75f1c3999a17 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -170,6 +170,11 @@ func (h *Handle) deductTokensForInner( // Only deduct tokens if we're able to track them for subsequent // returns. We risk leaking flow tokens otherwise. h.controller.DeductTokens(ctx, pri, tokens, c.Stream()) + + // TODO(irfansharif,aaditya): This accounts for 0.4% of + // alloc_objects when running kv0/enc=false/nodes=3/cpu=9. Except + // this return type is not used in production code. Clean it up as + // part of #104154. streams = append(streams, c.Stream()) } } @@ -211,14 +216,19 @@ func (h *Handle) ReturnTokensUpto( // instantiated. stream.TenantID = h.tenantID } + + // TODO(irfansharif,aaditya): This mutex still shows up in profiles, ~0.3% + // for kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as part of #104154. h.mu.Lock() - defer h.mu.Unlock() if h.mu.closed { + h.mu.Unlock() log.Errorf(ctx, "operating on a closed handle") return } tokens := h.mu.perStreamTokenTracker[stream].Untrack(ctx, pri, upto) + h.mu.Unlock() + h.controller.ReturnTokens(ctx, pri, tokens, stream) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go index b507ff26eee8..1cfc39afc235 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go @@ -29,6 +29,10 @@ import ( // admissionpb.WorkPriority, for replication along an individual // kvflowcontrol.Stream. type Tracker struct { + // TODO(irfansharif,aaditya): Everytime we track something, we incur a map + // assignment (shows up in CPU profiles). We could introduce a struct that + // internally embeds this list of tracked deductions, and append there + // instead. Do this as part of #104154. trackedM map[admissionpb.WorkPriority][]tracked // lowerBound tracks on a per-stream basis the log position below which @@ -110,11 +114,15 @@ func (dt *Tracker) Track( return false } } + + // TODO(irfansharif,aaditya): The tracked instances here make up about ~0.4% + // of allocations under kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as + // part of #104154, by using a sync.Pool perhaps. dt.trackedM[pri] = append(dt.trackedM[pri], tracked{ tokens: tokens, position: pos, }) - if log.ExpensiveLogEnabled(ctx, 1) { + if log.V(1) { log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s", tokens, pri, dt.stream, pos) } @@ -155,7 +163,7 @@ func (dt *Tracker) Untrack( trackedBefore := len(dt.trackedM[pri]) dt.trackedM[pri] = dt.trackedM[pri][untracked:] - if log.ExpensiveLogEnabled(ctx, 1) { + if log.V(1) { remaining := "" if len(dt.trackedM[pri]) > 0 { remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 0db457c010d8..600fc9839963 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -816,10 +816,10 @@ func (t *RaftTransport) processQueue( continue // nothing to do } - // TODO(irfansharif): There's no limit on how many pending + // TODO(irfansharif,aaditya): There's no limit on how many pending // dispatches are going to be attached to the outgoing raft // messages, both here and above. It can be excessive -- limit this - // by some count/byte policy as part of #95563. + // by some count/byte policy as part of #104154. req := newRaftMessageRequest() maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) batch.Requests = append(batch.Requests, *req) diff --git a/pkg/server/admission.go b/pkg/server/admission.go index e738c6ad1eb4..9b957f81fe9c 100644 --- a/pkg/server/admission.go +++ b/pkg/server/admission.go @@ -43,6 +43,8 @@ func (a *admittedLogEntryAdaptor) AdmittedLogEntry( rangeID roachpb.RangeID, pos admission.LogPosition, ) { + // TODO(irfansharif,aaditya): This contributes to a high count of + // inuse_objects. Look to address it as part of #104154. a.dispatchWriter.Dispatch(ctx, origin, kvflowcontrolpb.AdmittedRaftLogEntries{ RangeID: rangeID, AdmissionPriority: int32(pri),