// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package admission
import (
"container/heap"
"context"
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
)
// Use of the admission control package spans the SQL and KV layers. When
// running in a multi-tenant setting, we have per-tenant SQL-only servers and
// multi-tenant storage servers. These multi-tenant storage servers contain
// the multi-tenant KV layer, and the SQL layer for the system tenant. Most of
// the following settings are relevant to both kinds of servers (except for
// KVAdmissionControlEnabled). Only the system tenant can modify these
// settings in the storage servers, while a regular tenant can modify these
// settings for their SQL-only servers, which is why these are typically
// TenantWritable.
// KVAdmissionControlEnabled controls whether KV server-side admission control
// is enabled.
var KVAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.enabled",
"when true, work performed by the KV layer is subject to admission control",
true).WithPublic()
// KVBulkOnlyAdmissionControlEnabled controls whether user (normal and above
// priority) work is subject to admission control. If it is set to true, then
// user work will not be throttled by admission control but bulk work still will
// be. This setting is a preferable alternative to completely disabling
// admission control. It can be used reactively in cases where index backfill,
// schema modifications or other bulk operations are causing high latency due to
// io_overload on nodes.
// TODO(baptist): Find a better solution to this in v23.1.
var KVBulkOnlyAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.bulk_only.enabled",
"when both admission.kv.enabled and this is true, only throttle bulk work",
false)
// SQLKVResponseAdmissionControlEnabled controls whether response processing
// in SQL, for KV requests, is enabled.
var SQLKVResponseAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.sql_kv_response.enabled",
"when true, work performed by the SQL layer when receiving a KV response is subject to "+
"admission control",
true).WithPublic()
// SQLSQLResponseAdmissionControlEnabled controls whether response processing
// in SQL, for DistSQL requests, is enabled.
var SQLSQLResponseAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.sql_sql_response.enabled",
"when true, work performed by the SQL layer when receiving a DistSQL response is subject "+
"to admission control",
true).WithPublic()
var admissionControlEnabledSettings = [numWorkKinds]*settings.BoolSetting{
KVWork: KVAdmissionControlEnabled,
SQLKVResponseWork: SQLKVResponseAdmissionControlEnabled,
SQLSQLResponseWork: SQLSQLResponseAdmissionControlEnabled,
}
// KVTenantWeightsEnabled controls whether tenant weights are enabled for KV
// admission control. This setting has no effect if admission.kv.enabled is
// false.
var KVTenantWeightsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.tenant_weights.enabled",
"when true, tenant weights are enabled for KV admission control",
false).WithPublic()
// KVStoresTenantWeightsEnabled controls whether tenant weights are enabled
// for KV-stores admission control. This setting has no effect if
// admission.kv.enabled is false.
var KVStoresTenantWeightsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.stores.tenant_weights.enabled",
"when true, tenant weights are enabled for KV-stores admission control",
false).WithPublic()
// EpochLIFOEnabled controls whether the adaptive epoch-LIFO scheme is enabled
// for admission control. It is only relevant when the above admission control
// settings are also set to true. Unlike those settings, which are granular
// for each kind of admission queue, this setting applies to all the queues.
// This is because we recommend that all those settings be enabled or none be
// enabled, and we don't want to carry forward unnecessarily granular
// settings.
var EpochLIFOEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.epoch_lifo.enabled",
"when true, epoch-LIFO behavior is enabled when there is significant delay in admission",
false).WithPublic()
var epochLIFOEpochDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_duration",
"the duration of an epoch, for epoch-LIFO admission control ordering",
epochLength,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch duration is too small")
}
return nil
}).WithPublic()
var epochLIFOEpochClosingDeltaDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_closing_delta_duration",
"the delta duration before closing an epoch, for epoch-LIFO admission control ordering",
epochClosingDelta,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch closing delta is too small")
}
return nil
}).WithPublic()
var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo",
"the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering",
maxQueueDelayToSwitchToLifo,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: queue delay threshold is too small")
}
return nil
}).WithPublic()
// WorkInfo provides information that is used to order work within a
// WorkQueue. The WorkKind is not included as a field since a WorkQueue deals
// with a single WorkKind.
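//
// A minimal construction sketch (field values below are illustrative only):
//
//	info := WorkInfo{
//		TenantID:   roachpb.SystemTenantID,
//		Priority:   admissionpb.NormalPri,
//		CreateTime: timeutil.Now().UnixNano(),
//	}
//
// requestedCount is left unset by callers outside this package; Admit treats
// a zero value as an implicit request for a count of 1.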
type WorkInfo struct {
// TenantID is the id of the tenant. For single-tenant clusters, this will
// always be the SystemTenantID.
TenantID roachpb.TenantID
// Priority is utilized within a tenant.
Priority admissionpb.WorkPriority
// CreateTime is equivalent to Time.UnixNano() at the creation time of this
// work or a parent work (e.g. could be the start time of the transaction,
// if this work was created as part of a transaction). It is used to order
// work within a (TenantID, Priority) pair -- earlier CreateTime is given
// preference.
CreateTime int64
// BypassAdmission allows the work to bypass admission control, but allows
// for it to be accounted for. Ignored unless TenantID is the
// SystemTenantID. It should be used for high-priority intra-KV work, and
// when KV work generates other KV work (to avoid deadlock). It only has an
// effect for KVWork queues (see the check in Admit).
BypassAdmission bool
// Optional information specified only for WorkQueues where the work is tied
// to a range. This allows queued work to return early as soon as the range
// is no longer in a relevant state at this node. Currently only KVWork is
// tied to a range.
// TODO(sumeer): use these in the WorkQueue implementation.
// RangeID is the range at which this work must be performed. Optional (see
// comment above).
RangeID roachpb.RangeID
// RequiresLeaseholder is true iff the work requires the leaseholder.
// Optional (see comment above).
RequiresLeaseholder bool
// For internal use by wrapper classes. The requested tokens or slots.
requestedCount int64
}
// WorkQueue maintains a queue of work waiting to be admitted. Ordering of
// work is achieved via 2 heaps: a tenant heap orders the tenants with waiting
// work in increasing order of used slots or tokens, optionally adjusted by
// tenant weights. Within each tenant, the waiting work is ordered based on
// priority and create time. Tenants with non-zero values of used slots or
// tokens are tracked even if they have no more waiting work. Token usage is
// reset to zero every second. The choice of 1 second of memory for token
// distribution fairness is somewhat arbitrary. The same 1 second interval is
// also used to garbage collect tenants who have no waiting requests and no
// used slots or tokens.
//
// Usage example:
//
// var grantCoord *GrantCoordinator
// <initialize grantCoord>
// kvQueue := grantCoord.GetWorkQueue(KVWork)
// <hand kvQueue to the code that does kv server work>
//
// // Before starting some kv server work
// if enabled, err := kvQueue.Admit(ctx, WorkInfo{TenantID: tid, ...}); err != nil {
// return err
// }
// <do the work>
// if enabled {
// kvQueue.AdmittedWorkDone(tid)
// }
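//
// For a token-based WorkQueue (e.g. one for SQLKVResponseWork, which
// makeWorkQueueOptions marks as usesTokens), the pattern is the same except
// that AdmittedWorkDone must not be called, since tokens are not returned.
// A sketch, with sqlKVQueue as an illustrative name:
//
//	if _, err := sqlKVQueue.Admit(ctx, WorkInfo{TenantID: tid, ...}); err != nil {
//		return err
//	}
//	<process the response; no AdmittedWorkDone call>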
type WorkQueue struct {
ambientCtx context.Context
workKind WorkKind
granter granter
usesTokens bool
tiedToRange bool
settings *cluster.Settings
// Prevents more than one caller from being in Admit and calling tryGet or adding
// to the queue. It allows WorkQueue to release mu before calling tryGet and
// be assured that it is not competing with another Admit.
// Lock ordering is admitMu < mu.
admitMu syncutil.Mutex
mu struct {
syncutil.Mutex
// Tenants with waiting work.
tenantHeap tenantHeap
// All tenants, including those without waiting work. Periodically cleaned.
tenants map[uint64]*tenantInfo
tenantWeights struct {
mu syncutil.Mutex
// active refers to the currently active weights. mu is held for updates
// to the inactive weights, to prevent concurrent updates. After
// updating the inactive weights, it is made active by swapping with
// active, while also holding WorkQueue.mu. Therefore, reading
// tenantWeights.active does not require tenantWeights.mu. For lock
// ordering, tenantWeights.mu precedes WorkQueue.mu.
//
// The maps are lazily allocated.
active, inactive map[uint64]uint32
}
// The highest epoch that is closed.
closedEpochThreshold int64
// Following values are copied from the cluster settings.
epochLengthNanos int64
epochClosingDeltaNanos int64
maxQueueDelayToSwitchToLifo time.Duration
}
logThreshold log.EveryN
metrics *WorkQueueMetrics
stopCh chan struct{}
timeSource timeutil.TimeSource
}
var _ requester = &WorkQueue{}
type workQueueOptions struct {
usesTokens bool
tiedToRange bool
// timeSource can be set to non-nil for tests. If nil,
// the timeutil.DefaultTimeSource will be used.
timeSource timeutil.TimeSource
// The epoch closing goroutine can be disabled for tests.
disableEpochClosingGoroutine bool
}
func makeWorkQueueOptions(workKind WorkKind) workQueueOptions {
switch workKind {
case KVWork:
// CPU bound KV work uses slots. We also use KVWork for the per-store
// queues, which use tokens -- the caller overrides the usesTokens value
// in that case.
return workQueueOptions{usesTokens: false, tiedToRange: true}
case SQLKVResponseWork, SQLSQLResponseWork:
return workQueueOptions{usesTokens: true, tiedToRange: false}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
return workQueueOptions{usesTokens: false, tiedToRange: false}
default:
panic(errors.AssertionFailedf("unexpected workKind %d", workKind))
}
}
func makeWorkQueue(
ambientCtx log.AmbientContext,
workKind WorkKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
opts workQueueOptions,
) requester {
q := &WorkQueue{}
initWorkQueue(q, ambientCtx, workKind, granter, settings, metrics, opts)
return q
}
func initWorkQueue(
q *WorkQueue,
ambientCtx log.AmbientContext,
workKind WorkKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
opts workQueueOptions,
) {
stopCh := make(chan struct{})
timeSource := opts.timeSource
if timeSource == nil {
timeSource = timeutil.DefaultTimeSource{}
}
q.ambientCtx = ambientCtx.AnnotateCtx(context.Background())
q.workKind = workKind
q.granter = granter
q.usesTokens = opts.usesTokens
q.tiedToRange = opts.tiedToRange
q.settings = settings
q.logThreshold = log.Every(5 * time.Minute)
q.metrics = metrics
q.stopCh = stopCh
q.timeSource = timeSource
func() {
q.mu.Lock()
defer q.mu.Unlock()
q.mu.tenants = make(map[uint64]*tenantInfo)
q.sampleEpochLIFOSettingsLocked()
}()
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
q.gcTenantsAndResetTokens()
case <-stopCh:
// Channel closed.
return
}
}
}()
q.tryCloseEpoch(q.timeNow())
if !opts.disableEpochClosingGoroutine {
q.startClosingEpochs()
}
}
func isInTenantHeap(tenant *tenantInfo) bool {
// If there is some waiting work, this tenant is in tenantHeap.
return len(tenant.waitingWorkHeap) > 0 || len(tenant.openEpochsHeap) > 0
}
func (q *WorkQueue) timeNow() time.Time {
return q.timeSource.Now()
}
func (q *WorkQueue) epochLIFOEnabled() bool {
return EpochLIFOEnabled.Get(&q.settings.SV)
}
// Samples the latest cluster settings for epoch-LIFO.
func (q *WorkQueue) sampleEpochLIFOSettingsLocked() {
epochLengthNanos := int64(epochLIFOEpochDuration.Get(&q.settings.SV))
if epochLengthNanos != q.mu.epochLengthNanos {
// Reset what is closed. A proper closed value will be calculated when the
// next epoch closes. This ensures that if we are increasing the epoch
// length, we will regress what epoch number is closed. Meanwhile, all
// work subject to LIFO queueing will get queued in the openEpochsHeap,
// which is fine (we admit from there too).
q.mu.closedEpochThreshold = 0
}
q.mu.epochLengthNanos = epochLengthNanos
q.mu.epochClosingDeltaNanos = int64(epochLIFOEpochClosingDeltaDuration.Get(&q.settings.SV))
q.mu.maxQueueDelayToSwitchToLifo = epochLIFOQueueDelayThresholdToSwitchToLIFO.Get(&q.settings.SV)
}
func (q *WorkQueue) startClosingEpochs() {
go func() {
// If someone sets the epoch length to a huge value by mistake, we will
// still sample every second, so that we can adjust when they fix their
// mistake.
const maxTimerDur = time.Second
// This is the min duration we set the timer for, to avoid setting smaller
// and smaller timers, in case the timer fires slightly early.
const minTimerDur = time.Millisecond
var timer *time.Timer
for {
q.mu.Lock()
q.sampleEpochLIFOSettingsLocked()
nextCloseTime := q.nextEpochCloseTimeLocked()
q.mu.Unlock()
timeNow := q.timeNow()
timerDur := nextCloseTime.Sub(timeNow)
if timerDur > 0 {
if timerDur > maxTimerDur {
timerDur = maxTimerDur
} else if timerDur < minTimerDur {
timerDur = minTimerDur
}
if timer == nil {
timer = time.NewTimer(timerDur)
} else {
timer.Reset(timerDur)
}
select {
case <-timer.C:
case <-q.stopCh:
// Channel closed.
return
}
} else {
q.tryCloseEpoch(timeNow)
}
}
}()
}
func (q *WorkQueue) nextEpochCloseTimeLocked() time.Time {
// +2 since we need to advance the threshold by 1, and another 1 since the
// epoch closes at its end time.
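//
// A worked example with illustrative values: if epochLengthNanos is 100ms,
// epochClosingDeltaNanos is 5ms, and closedEpochThreshold is 5, then the
// next epoch to close is 6, and it closes at its end time plus the delta,
// i.e. at (5+2)*100ms + 5ms = 705ms.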
timeUnixNanos :=
(q.mu.closedEpochThreshold+2)*q.mu.epochLengthNanos + q.mu.epochClosingDeltaNanos
return timeutil.Unix(0, timeUnixNanos)
}
func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) {
epochLIFOEnabled := q.epochLIFOEnabled()
q.mu.Lock()
defer q.mu.Unlock()
epochClosingTimeNanos := timeNow.UnixNano() - q.mu.epochLengthNanos - q.mu.epochClosingDeltaNanos
epoch := epochForTimeNanos(epochClosingTimeNanos, q.mu.epochLengthNanos)
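// Continuing the worked example from nextEpochCloseTimeLocked: with 100ms
// epochs and a 5ms closing delta, a call with timeNow = 705ms computes
// epochClosingTimeNanos = 705ms - 100ms - 5ms = 600ms; assuming epoch N
// spans [N*epochLength, (N+1)*epochLength), that is epoch 6, so epochs up
// to and including 6 are now considered closed.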
if epoch <= q.mu.closedEpochThreshold {
return
}
q.mu.closedEpochThreshold = epoch
doLog := q.logThreshold.ShouldLog()
for _, tenant := range q.mu.tenants {
prevThreshold := tenant.fifoPriorityThreshold
tenant.fifoPriorityThreshold =
tenant.priorityStates.getFIFOPriorityThresholdAndReset(
tenant.fifoPriorityThreshold, q.mu.epochLengthNanos, q.mu.maxQueueDelayToSwitchToLifo)
if !epochLIFOEnabled {
tenant.fifoPriorityThreshold = int(admissionpb.LowPri)
}
if tenant.fifoPriorityThreshold != prevThreshold || doLog {
logVerb := "is"
if tenant.fifoPriorityThreshold != prevThreshold {
logVerb = "changed to"
}
// TODO(sumeer): export this as a per-tenant metric somehow. We could
// start with this being a per-WorkQueue metric for only the system
// tenant. However, currently we share metrics across WorkQueues --
// specifically all the store WorkQueues share the same metric. We
// should eliminate that sharing and make those per store metrics.
log.Infof(q.ambientCtx, "%s: FIFO threshold for tenant %d %s %d",
workKindString(q.workKind), tenant.id, logVerb, tenant.fifoPriorityThreshold)
}
// Note that we are ignoring the new priority threshold and only
// dequeueing the ones that are in the closed epoch. It is possible to
// have work items that are not in the closed epoch and whose priority
// makes them no longer subject to LIFO, but they will need to wait here
// until their epochs close. This is considered acceptable since the
// priority threshold should not fluctuate rapidly.
for len(tenant.openEpochsHeap) > 0 {
work := tenant.openEpochsHeap[0]
if work.epoch > epoch {
break
}
heap.Pop(&tenant.openEpochsHeap)
heap.Push(&tenant.waitingWorkHeap, work)
}
}
}
// Admit is called when requesting admission for some work. If err!=nil, the
// request was not admitted, potentially due to the deadline being exceeded.
// The enabled return value is relevant when err=nil, and represents whether
// admission control is enabled. AdmittedWorkDone must be called iff
// enabled=true && err=nil, and the WorkKind for this queue uses slots.
func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) {
enabledSetting := admissionControlEnabledSettings[q.workKind]
if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
return false, nil
}
if info.requestedCount == 0 {
// Callers from outside the admission package don't set requestedCount --
// these are implicitly requesting a count of 1.
info.requestedCount = 1
}
if !q.usesTokens && info.requestedCount != 1 {
panic(errors.AssertionFailedf("unexpected requestedCount: %d", info.requestedCount))
}
q.metrics.incRequested(info.Priority)
tenantID := info.TenantID.ToUint64()
// The code in this method does not use defer to unlock the mutexes because
// it needs the flexibility of selectively unlocking one of these on a
// certain code path. When changing the code, be careful in making sure the
// mutexes are properly unlocked on all code paths.
q.admitMu.Lock()
q.mu.Lock()
tenant, ok := q.mu.tenants[tenantID]
if !ok {
tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
q.mu.tenants[tenantID] = tenant
}
if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork {
tenant.used += uint64(info.requestedCount)
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
q.admitMu.Unlock()
q.granter.tookWithoutPermission(info.requestedCount)
q.metrics.incAdmitted(info.Priority)
return true, nil
}
// Work is subject to admission control.
// Tell priorityStates about this received work. We don't tell it about work
// that has bypassed admission control, since priorityStates is deciding the
// threshold for LIFO queueing based on observed admission latency.
tenant.priorityStates.requestAtPriority(info.Priority)
if len(q.mu.tenantHeap) == 0 {
// Fast-path. Try to grab token/slot.
// Optimistically update used to avoid locking again.
tenant.used += uint64(info.requestedCount)
q.mu.Unlock()
if q.granter.tryGet(info.requestedCount) {
q.admitMu.Unlock()
q.metrics.incAdmitted(info.Priority)
return true, nil
}
// Did not get token/slot.
//
// There is a race here: before q.mu is acquired, the granter could
// experience a reduction in load and call
// WorkQueue.hasWaitingRequests to see if it should grant, but since
// there is nothing in the queue that method will return false. Then the
// work here queues up even though granter has spare capacity. We could
// add additional synchronization (and complexity to the granter
// interface) to deal with this, by keeping the granter's lock
// (GrantCoordinator.mu) locked when returning from tryGrant and calling the
// granter again to release that lock after this work has been queued. But
// it has the downside of extending the scope of GrantCoordinator.mu.
// Instead we tolerate this race in the knowledge that GrantCoordinator
// will periodically, at a high frequency, look at the state of the
// requesters to see if there is any queued work that can be granted
// admission.
q.mu.Lock()
prevTenant := tenant
// The tenant could have been removed when using tokens. See the comment
// where the tenantInfo struct is declared.
tenant, ok = q.mu.tenants[tenantID]
if !q.usesTokens {
if !ok || prevTenant != tenant {
panic("prev tenantInfo no longer in map")
}
if tenant.used < uint64(info.requestedCount) {
panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d",
tenant.used, info.requestedCount))
}
tenant.used -= uint64(info.requestedCount)
} else {
if !ok {
tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
q.mu.tenants[tenantID] = tenant
}
// Don't want to overflow tenant.used if it is already 0 because of
// being reset to 0 by the GC goroutine.
if tenant.used >= uint64(info.requestedCount) {
tenant.used -= uint64(info.requestedCount)
}
}
}
// Check for cancellation.
startTime := q.timeNow()
if ctx.Err() != nil {
// Already canceled. More likely to happen if CPU starvation is
// delaying entry into the work queue.
q.mu.Unlock()
q.admitMu.Unlock()
q.metrics.incErrored(info.Priority)
deadline, _ := ctx.Deadline()
return true,
errors.Newf("work %s deadline already expired: deadline: %v, now: %v",
workKindString(q.workKind), deadline, startTime)
}
// Push onto heap(s).
ordering := fifoWorkOrdering
if int(info.Priority) < tenant.fifoPriorityThreshold {
ordering = lifoWorkOrdering
}
work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.requestedCount, startTime, q.mu.epochLengthNanos)
inTenantHeap := isInTenantHeap(tenant)
if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering {
heap.Push(&tenant.waitingWorkHeap, work)
} else {
heap.Push(&tenant.openEpochsHeap, work)
}
if !inTenantHeap {
heap.Push(&q.mu.tenantHeap, tenant)
}
// Else already in tenantHeap.
// Release all locks and start waiting.
q.mu.Unlock()
q.admitMu.Unlock()
q.metrics.recordStartWait(info.Priority)
defer releaseWaitingWork(work)
select {
case <-ctx.Done():
waitDur := q.timeNow().Sub(startTime)
q.mu.Lock()
// The work was cancelled, so waitDur is less than the wait time this work
// would have encountered if it actually waited until admission. However,
// this lower bound is still useful for calculating the FIFO=>LIFO switch
// since it is possible that all work at this priority is exceeding the
// deadline and being cancelled. The risk here is that if the deadlines
// are too short, we could underestimate the actual wait time.
tenant.priorityStates.updateDelayLocked(work.priority, waitDur, true /* canceled */)
if work.heapIndex == -1 {
// No longer in heap. Raced with token/slot grant.
if !q.usesTokens {
if tenant.used < uint64(info.requestedCount) {
panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d",
tenant.used, info.requestedCount))
}
tenant.used -= uint64(info.requestedCount)
}
// Else, we don't decrement tenant.used since we don't want to race with
// the gc goroutine that will set used=0.
q.mu.Unlock()
q.granter.returnGrant(info.requestedCount)
// The channel is sent to after releasing mu, so we don't need to hold
// mu when receiving from it. Additionally, we've already called
// returnGrant so we're not holding back future grant chains if this one
// chain gets terminated.
chainID := <-work.ch
q.granter.continueGrantChain(chainID)
} else {
if work.inWaitingWorkHeap {
tenant.waitingWorkHeap.remove(work)
} else {
tenant.openEpochsHeap.remove(work)
}
if !isInTenantHeap(tenant) {
q.mu.tenantHeap.remove(tenant)
}
q.mu.Unlock()
}
q.metrics.incErrored(info.Priority)
q.metrics.recordFinishWait(info.Priority, waitDur)
deadline, _ := ctx.Deadline()
log.Eventf(ctx, "deadline expired, waited in %s queue for %v",
workKindString(q.workKind), waitDur)
return true,
errors.Newf("work %s deadline expired while waiting: deadline: %v, start: %v, dur: %v",
workKindString(q.workKind), deadline, startTime, waitDur)
case chainID, ok := <-work.ch:
if !ok {
panic(errors.AssertionFailedf("channel should not be closed"))
}
q.metrics.incAdmitted(info.Priority)
waitDur := q.timeNow().Sub(startTime)
q.metrics.recordFinishWait(info.Priority, waitDur)
if work.heapIndex != -1 {
panic(errors.AssertionFailedf("grantee should be removed from heap"))
}
log.Eventf(ctx, "admitted, waited in %s queue for %v", workKindString(q.workKind), waitDur)
q.granter.continueGrantChain(chainID)
return true, nil
}
}
// AdmittedWorkDone is used to inform the WorkQueue that some admitted work is
// finished. It must be called iff the WorkKind of this WorkQueue uses slots
// (not tokens), i.e., KVWork, SQLStatementLeafStartWork,
// SQLStatementRootStartWork.
func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID) {
if q.usesTokens {
panic(errors.AssertionFailedf("tokens should not be returned"))
}
// Single slot is allocated for the work.
q.mu.Lock()
tenant, ok := q.mu.tenants[tenantID.ToUint64()]
if !ok {
panic(errors.AssertionFailedf("tenant not found"))
}
tenant.used--
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
q.granter.returnGrant(1)
}
func (q *WorkQueue) hasWaitingRequests() bool {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.mu.tenantHeap) > 0
}
func (q *WorkQueue) granted(grantChainID grantChainID) int64 {
// Reduce critical section by getting time before mutex acquisition.
now := q.timeNow()
q.mu.Lock()
if len(q.mu.tenantHeap) == 0 {
q.mu.Unlock()
return 0
}
tenant := q.mu.tenantHeap[0]
var item *waitingWork
if len(tenant.waitingWorkHeap) > 0 {
item = heap.Pop(&tenant.waitingWorkHeap).(*waitingWork)
} else {
item = heap.Pop(&tenant.openEpochsHeap).(*waitingWork)
}
waitDur := now.Sub(item.enqueueingTime)
tenant.priorityStates.updateDelayLocked(item.priority, waitDur, false /* canceled */)
tenant.used += uint64(item.requestedCount)
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
} else {
q.mu.tenantHeap.remove(tenant)
}
// Get the value of requestedCount before releasing the mutex, since after
// the release, Admit can notice that item is no longer in the heap and call
// releaseWaitingWork to return item to the waitingWorkPool.
requestedCount := item.requestedCount
q.mu.Unlock()
// Reduce critical section by sending on channel after releasing mutex.
item.ch <- grantChainID
return requestedCount
}
func (q *WorkQueue) gcTenantsAndResetTokens() {
q.mu.Lock()
defer q.mu.Unlock()
// With large numbers of active tenants, this iteration could hold the lock
// longer than desired. We could break this iteration into smaller parts if
// needed.
for id, info := range q.mu.tenants {
if info.used == 0 && !isInTenantHeap(info) {
delete(q.mu.tenants, id)
releaseTenantInfo(info)
} else if q.usesTokens {
info.used = 0
// All the heap members will have used=0 after this loop, so there is no
// need to fix the heap ordering.
}
}
}
// adjustTenantTokens is used internally by StoreWorkQueue. The
// additionalTokens count can be negative, in which case it is returning
// tokens. This is only for WorkQueue's own accounting -- it should not call
// into granter.
func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokens int64) {
tid := tenantID.ToUint64()
q.mu.Lock()
defer q.mu.Unlock()
tenant, ok := q.mu.tenants[tid]
if ok {
if additionalTokens < 0 {
toReturn := uint64(-additionalTokens)
if tenant.used < toReturn {
tenant.used = 0
} else {
tenant.used -= toReturn
}
} else {
tenant.used += uint64(additionalTokens)
}
}
}
func (q *WorkQueue) String() string {
return redact.StringWithoutMarkers(q)
}
// SafeFormat implements the redact.SafeFormatter interface.
func (q *WorkQueue) SafeFormat(s redact.SafePrinter, _ rune) {
q.mu.Lock()
defer q.mu.Unlock()
s.Printf("closed epoch: %d ", q.mu.closedEpochThreshold)
s.Printf("tenantHeap len: %d", len(q.mu.tenantHeap))
if len(q.mu.tenantHeap) > 0 {
s.Printf(" top tenant: %d", q.mu.tenantHeap[0].id)
}
var ids []uint64
for id := range q.mu.tenants {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
for _, id := range ids {
tenant := q.mu.tenants[id]
s.Printf("\n tenant-id: %d used: %d, w: %d, fifo: %d", tenant.id, tenant.used,
tenant.weight, tenant.fifoPriorityThreshold)
if len(tenant.waitingWorkHeap) > 0 {
s.Printf(" waiting work heap:")
for i := range tenant.waitingWorkHeap {
var workOrdering string
if tenant.waitingWorkHeap[i].arrivalTimeWorkOrdering == lifoWorkOrdering {
workOrdering = ", lifo-ordering"
}
s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d%s]", i,
tenant.waitingWorkHeap[i].priority,
tenant.waitingWorkHeap[i].createTime/int64(time.Millisecond),
tenant.waitingWorkHeap[i].epoch,
tenant.waitingWorkHeap[i].enqueueingTime.UnixNano()/int64(time.Millisecond), workOrdering)
}
}
if len(tenant.openEpochsHeap) > 0 {
s.Printf(" open epochs heap:")
for i := range tenant.openEpochsHeap {
s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d]", i,
tenant.openEpochsHeap[i].priority,
tenant.openEpochsHeap[i].createTime/int64(time.Millisecond),
tenant.openEpochsHeap[i].epoch,
tenant.openEpochsHeap[i].enqueueingTime.UnixNano()/int64(time.Millisecond))
}
}
}
}
// Weight for tenants that are not assigned a weight. This typically applies
// to tenants that weren't on this node in the prior call to
// SetTenantWeights. It is also the minimum tenant weight.
const defaultTenantWeight = 1
// The current cap on the weight of a tenant. We don't allow a single tenant
// to use more than cap times the number of resources of the smallest tenant.
// For KV slots, we have seen a range of slot counts from 50-200 for 16 cpu
// nodes, for a KV50 workload, depending on how we set
// admission.kv_slot_adjuster.overload_threshold. We don't want to starve
// small tenants, so the cap is currently set to 20. A more sophisticated fair
// sharing scheme would not need such a cap.
const tenantWeightCap = 20
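// A scaling sketch with illustrative tenant IDs and weights: if
// SetTenantWeights below is called with map[uint64]uint32{2: 100, 3: 5},
// then maxWeight is 100, which exceeds tenantWeightCap, so scaling is
// 20/100 = 0.2 and the stored weights become ceil(100*0.2) = 20 and
// ceil(5*0.2) = 1 (clamped to at least defaultTenantWeight).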
func (q *WorkQueue) getTenantWeightLocked(tenantID uint64) uint32 {
weight, ok := q.mu.tenantWeights.active[tenantID]
if !ok {
weight = defaultTenantWeight
}
return weight
}
// SetTenantWeights sets the weight of tenants, using the provided tenant ID
// => weight map. A nil map will result in all tenants having the same weight.
func (q *WorkQueue) SetTenantWeights(tenantWeights map[uint64]uint32) {
q.mu.tenantWeights.mu.Lock()
defer q.mu.tenantWeights.mu.Unlock()
if q.mu.tenantWeights.inactive == nil {
q.mu.tenantWeights.inactive = make(map[uint64]uint32)
}
// Remove all elements from the inactive map.
for k := range q.mu.tenantWeights.inactive {
delete(q.mu.tenantWeights.inactive, k)
}
// Compute the max weight in the new map, for enforcing the tenantWeightCap.
maxWeight := uint32(1)
for _, v := range tenantWeights {
if v > maxWeight {
maxWeight = v
}
}
scaling := float64(1)
if maxWeight > tenantWeightCap {
scaling = tenantWeightCap / float64(maxWeight)
}
// Populate the weights in the inactive map.
for k, v := range tenantWeights {
w := uint32(math.Ceil(float64(v) * scaling))
if w < defaultTenantWeight {
w = defaultTenantWeight
}
q.mu.tenantWeights.inactive[k] = w
}
q.mu.Lock()
// Establish the new active map.
q.mu.tenantWeights.active, q.mu.tenantWeights.inactive =
q.mu.tenantWeights.inactive, q.mu.tenantWeights.active
// Create a slice for storing all the tenantIDs. We use this to split the
// update to the data-structures that require holding q.mu, in case there
// are 1000s of tenants (we don't want to hold q.mu for long durations).
tenantIDs := make([]uint64, len(q.mu.tenants))
i := 0
for k := range q.mu.tenants {
tenantIDs[i] = k
i++
}
q.mu.Unlock()
// Any tenants not in tenantIDs will see the latest weight when their
// tenantInfo is created. The existing ones need their weights to be
// updated.
// tenantIDs[index] represents the next tenantID that needs to be updated.
var index int
n := len(tenantIDs)
// updateNextBatch acquires q.mu and updates a batch of tenants.
updateNextBatch := func() (repeat bool) {
q.mu.Lock()
defer q.mu.Unlock()
// Arbitrary batch size of 5.
const batchSize = 5
for i := 0; i < batchSize; i++ {
if index >= n {
return false
}
tenantID := tenantIDs[index]
tenantInfo := q.mu.tenants[tenantID]
weight := q.getTenantWeightLocked(tenantID)
if tenantInfo != nil && tenantInfo.weight != weight {
tenantInfo.weight = weight
if isInTenantHeap(tenantInfo) {
q.mu.tenantHeap.fix(tenantInfo)
}
}
index++
}
return true
}
for updateNextBatch() {
}
}
// close tells the gc goroutine to stop.
func (q *WorkQueue) close() {
close(q.stopCh)
}
type workOrderingKind int8
const (
fifoWorkOrdering workOrderingKind = iota
lifoWorkOrdering
)
type priorityState struct {
priority admissionpb.WorkPriority
// maxQueueDelay includes the delay of both successfully admitted and
// canceled requests.
//
// NB: The maxQueueDelay value is an incomplete picture of delay since it
// does not have visibility into work that is still waiting in the queue.
// However, since we use the maxQueueDelay across a collection of priorities
// to set a priority threshold, we expect that usually there will be some
// work just below the priority threshold that does dequeue (with high
// latency) -- if not, it is likely that the next higher priority is actually
// the one experiencing some instances of high latency. That is, it is very
// unlikely to be the case that a certain priority sees admission with no
// high latency while the next lower priority never gets work dequeued
// because of resource saturation.
maxQueueDelay time.Duration
// Count of requests that were successfully admitted (not canceled). This is
// used in concert with lowestPriorityWithRequests to detect priorities
// where work was queued but nothing was successfully admitted.
admittedCount int
}
// priorityStates tracks information about admission requests and admission
// grants at various priorities. It is used to set a priority threshold for
// LIFO queuing. There is one priorityStates per tenant, since it is embedded
// in a tenantInfo.
type priorityStates struct {
// In increasing order of priority. Expected to not have more than 10
// elements, so a linear search is fast. The slice is emptied after each
// epoch is closed.
ps []priorityState
lowestPriorityWithRequests int
}
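// A rough behavioral sketch (the precise computation lives in
// getFIFOPriorityThresholdAndReset, not shown here): if, over recent epochs,
// work at admissionpb.NormalPri has seen maxQueueDelay exceed
// admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo while higher
// priorities have not, the tenant's fifoPriorityThreshold can rise above
// NormalPri, and Admit then queues new NormalPri work with lifoWorkOrdering.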
// makePriorityStates returns an empty priorityStates that reuses the
// ps slice.
func makePriorityStates(ps []priorityState) priorityStates {
return priorityStates{ps: ps[:0], lowestPriorityWithRequests: admissionpb.OneAboveHighPri}
}
// requestAtPriority is called when a request is received at the given
// priority.
func (ps *priorityStates) requestAtPriority(priority admissionpb.WorkPriority) {
if int(priority) < ps.lowestPriorityWithRequests {
ps.lowestPriorityWithRequests = int(priority)