admission: add l0 control settings
Part of cockroachdb#82743. We add cluster settings to control:
- smoothing alpha for byte token computations;
- reduction factor for L0 compaction tokens, based on observed
  compactions.

We've found these to be useful in internal experiments, and also when
looking to paper over L0 compaction variability effects up in AC.

Release note: None
irfansharif committed Sep 5, 2023
1 parent 05f0064 commit ed03b0c
Showing 2 changed files with 27 additions and 9 deletions.
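Before the per-file diffs, a minimal standalone sketch (not part of the patch) of the arithmetic the two new settings plug into, distilled from the adjustTokensInner hunks in io_load_listener.go below. Function and parameter names here are simplified stand-ins, not the real identifiers.

package main

import "fmt"

// byteTokens sketches how the two new settings shape L0 compaction byte
// tokens, following the branches visible in the diff below. score uses the
// same 2x-scaled convention as adjustTokensInner; alpha stands in for
// admission.l0_compacted_alpha and reduction for admission.l0_reduction_factor.
func byteTokens(intL0CompactedBytes, prevSmoothed, score, alpha, reduction float64) float64 {
	// Smooth what compactions removed from L0, since compaction scheduling
	// is uneven across intervals.
	smoothed := alpha*intL0CompactedBytes + (1-alpha)*prevSmoothed
	reduced := smoothed / reduction
	switch {
	case score >= 2:
		// Overloaded: admit only a fraction of what compactions removed, so
		// the store works its way back under the thresholds.
		return reduced
	case score >= 1:
		// Medium load: interpolate from 2*reduced at score=1 down to
		// 1*reduced at score=2.
		return -score*reduced + 3*reduced
	default:
		// Lower-load regimes hand out more tokens; they are untouched by
		// this patch, so this branch is only a placeholder.
		return smoothed
	}
}

func main() {
	// With the defaults (alpha=0.5, reduction=2), an overloaded store that
	// compacted 100 MiB out of L0 this interval and the last gets ~50 MiB of
	// byte tokens, before the token count itself is smoothed again.
	fmt.Printf("%.0f MiB\n", byteTokens(100, 100, 2.5, 0.5, 2.0))
}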
5 changes: 3 additions & 2 deletions pkg/util/admission/granter.go
@@ -375,8 +375,9 @@ func (cg *kvStoreTokenChildGranter) storeWriteDone(
// it. The one difference is that post token adjustments, if we observe the
// granter was previously exhausted but is no longer so, we're allowed to
// admit other waiting requests.
-return cg.parent.storeReplicatedWorkAdmittedLocked(
+additionalTokensTaken := cg.parent.storeReplicatedWorkAdmittedLocked(
cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo), true /* canGrantAnother */)
+return additionalTokensTaken
}

// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted.
@@ -466,7 +467,7 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
if count > 0 {
sg.tokensTakenMetric.Inc(count)
} else {
-sg.tokensReturnedMetric.Inc(count)
+sg.tokensReturnedMetric.Inc(-count)
}
}
if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
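An aside on the second granter.go hunk: in that branch count is non-positive (the work turned out cheaper than estimated, so tokens flow back), which is why the returned-tokens counter is incremented by -count. A tiny illustration with hypothetical plain integers standing in for the metric objects:

package main

import "fmt"

func main() {
	var tokensTaken, tokensReturned int64
	count := int64(-512) // 512 tokens are being returned
	if count > 0 {
		tokensTaken += count
	} else {
		tokensReturned += -count // increments by 512, keeping the counter non-negative
	}
	fmt.Println(tokensTaken, tokensReturned) // 0 512
}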
31 changes: 24 additions & 7 deletions pkg/util/admission/io_load_listener.go
@@ -109,6 +109,23 @@ var L0MinimumSizePerSubLevel = settings.RegisterIntSetting(
"when non-zero, this indicates the minimum size that is needed to count towards one sub-level",
5<<20, settings.NonNegativeInt)

+// L0CompactionAlpha is the exponential smoothing term used when measuring L0
+// compactions, which in turn is used to generate IO tokens.
+var L0CompactionAlpha = settings.RegisterFloatSetting(
+settings.TenantWritable,
+"admission.l0_compacted_alpha",
+"exponential smoothing term used when measuring L0 compactions to generate IO tokens",
+0.5, settings.PositiveFloat)
+
+// L0ReductionFactor is the factor by which we reduce L0 compaction tokens,
+// based on observed compactions, once the store is considered overloaded.
+var L0ReductionFactor = settings.RegisterFloatSetting(
+settings.TenantWritable,
+"admission.l0_reduction_factor",
+"once overloaded, factor by which we reduce L0 compaction tokens based on observed compactions",
+2.0,
+settings.FloatWithMinimum(1.0))
+
// Experimental observations:
// - Sub-level count of ~40 caused a node heartbeat latency p90, p99 of 2.5s,
// 4s. With a setting that limits sub-level count to 10, before the system
@@ -683,10 +700,9 @@ func (io *ioLoadListener) adjustTokensInner(
}
io.l0CompactedBytes.Inc(intL0CompactedBytes)

-const alpha = 0.5
-
// Compaction scheduling can be uneven in prioritizing L0 for compactions,
// so smooth out what is being removed by compactions.
+alpha := L0CompactionAlpha.Get(&io.settings.SV)
smoothedIntL0CompactedBytes := int64(alpha*float64(intL0CompactedBytes) + (1-alpha)*float64(prev.smoothedIntL0CompactedBytes))

// Flush tokens:
@@ -875,6 +891,7 @@ func (io *ioLoadListener) adjustTokensInner(
// threshold.
var totalNumByteTokens int64
var smoothedCompactionByteTokens float64
+l0ReductionFactor := L0ReductionFactor.Get(&io.settings.SV)

score, _ := ioThreshold.Score()
// Multiplying score by 2 for ease of calculation.
@@ -916,7 +933,7 @@ func (io *ioLoadListener) adjustTokensInner(
// Don't admit more byte work than we can remove via compactions.
// totalNumByteTokens tracks our goal for admission. Scale down
// since we want to get under the thresholds over time.
-fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes / 2.0)
+fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes) / l0ReductionFactor
} else if score >= 0.5 && score < 1 {
// Low load. Score in [0.5, 1). Tokens should be
// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
@@ -926,8 +943,8 @@
// Medium load. Score in [1, 2). We use linear interpolation from
// medium load to overload, to slowly give out fewer tokens as we
// move towards overload.
-halfSmoothedBytes := float64(smoothedIntL0CompactedBytes / 2.0)
-fTotalNumByteTokens = -score*halfSmoothedBytes + 3*halfSmoothedBytes
+reducedSmoothedBytes := float64(smoothedIntL0CompactedBytes) / l0ReductionFactor
+fTotalNumByteTokens = -score*reducedSmoothedBytes + 3*reducedSmoothedBytes
}
smoothedCompactionByteTokens = alpha*fTotalNumByteTokens + (1-alpha)*prev.smoothedCompactionByteTokens
if float64(math.MaxInt64) < smoothedCompactionByteTokens {
@@ -1057,8 +1074,8 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
ib(m/adjustmentInterval))
switch res.aux.tokenKind {
case compactionTokenKind:
-// NB: res.smoothedCompactionByteTokens is the same as
-// res.ioLoadListenerState.totalNumByteTokens (printed above) when
+// NB: res.smoothedCompactionByteTokens (printed above) is the same
+// as res.ioLoadListenerState.totalNumByteTokens when
// res.aux.tokenKind == compactionTokenKind.
p.Printf(" due to L0 growth")
case flushTokenKind:
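To make the alpha knob concrete, a small standalone calculation (illustrative numbers only; not code from the patch) of how the doubly smoothed token count reacts to a one-interval compaction spike, since the same alpha is applied once to the observed compacted bytes and once to the resulting byte tokens:

package main

import "fmt"

// step applies the two smoothing passes visible in the diff above, assuming
// the overloaded regime where tokens target smoothedBytes / reduction.
func step(observed, prevSmoothedBytes, prevTokens, alpha, reduction float64) (float64, float64) {
	smoothedBytes := alpha*observed + (1-alpha)*prevSmoothedBytes
	target := smoothedBytes / reduction
	tokens := alpha*target + (1-alpha)*prevTokens
	return smoothedBytes, tokens
}

func main() {
	for _, alpha := range []float64{0.5, 0.1} {
		smoothedBytes, tokens := 100.0, 50.0 // MiB; steady state with reduction=2
		// One interval where compactions removed 300 MiB instead of 100 MiB.
		_, tokens = step(300, smoothedBytes, tokens, alpha, 2)
		fmt.Printf("alpha=%.1f -> tokens jump from 50.0 to %.1f MiB\n", alpha, tokens)
	}
}

With the default alpha of 0.5, half of the spike shows up in the next interval's tokens; a smaller alpha trades responsiveness for stability, which is the kind of tuning the commit message alludes to.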
