db: introduce CompactionScheduler and integrate with DB
CompactionScheduler is an interface that encompasses (a) our current
compaction scheduling behavior, (b) compaction scheduling in a multi-store
setting that adds a per-node limit in addition to the per-store limit and
prioritizes across stores, and (c) compaction scheduling that includes (b)
and is additionally aware of resource usage, so it can prioritize across
stores and across other long-lived work besides compactions (e.g. range
snapshot reception).

The CompactionScheduler calls into the DB and the DB calls into the
CompactionScheduler. This requires some care in specifying the
synchronization expectations, to avoid deadlock. For the most part, the
complexity is borne by the CompactionScheduler -- see the code comments
for details.
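As a rough sketch of that two-way contract (only the names
CompactionScheduler, CompactionGrantHandle, WaitingCompaction, and
GetAllowedWithoutPermission appear in this commit; the method sets and the
DB-facing interface name below are illustrative assumptions, not the exact
exported API):

```go
package sketch

// Placeholder types; CompactionGrantHandle and WaitingCompaction are
// sketched in more detail further below.
type CompactionGrantHandle interface{ Done() }
type WaitingCompaction struct{}

// CompactionScheduler is the scheduler side of the contract: the DB asks
// for permission to run a compaction now, and the scheduler may instead
// call back into the DB later when capacity frees up.
type CompactionScheduler interface {
	// Register attaches a DB to the scheduler (called at Open).
	Register(db DBForCompaction)
	// Unregister detaches the DB (called at Close).
	Unregister()
	// TrySchedule is called by the DB, without holding DB.mu, when it has a
	// compaction it would like to run immediately.
	TrySchedule() (bool, CompactionGrantHandle)
}

// DBForCompaction is the DB side of the contract. The scheduler must not
// hold its own mutex while calling these methods, since the DB will acquire
// DB.mu inside them; that discipline is what avoids deadlock.
type DBForCompaction interface {
	// GetAllowedWithoutPermission reports the minimum compaction
	// concurrency the DB wants unconditionally.
	GetAllowedWithoutPermission() int
	// GetWaitingCompaction describes the best compaction waiting to run, so
	// the scheduler can prioritize it against other work.
	GetWaitingCompaction() (bool, WaitingCompaction)
	// Schedule grants the DB permission to run one compaction; the DB
	// returns false if it no longer has runnable work.
	Schedule(CompactionGrantHandle) bool
}
```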

ConcurrencyLimitScheduler is an implementation for (a), and is paired with
a single DB. It has no knowledge of delete-only compactions, so we have
redefined the meaning of Options.MaxConcurrentCompactions, as discussed
in the code comment.
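At its core, such a concurrency-limit-only scheduler just compares a count
of running compactions against a limit. A minimal sketch, assuming the real
ConcurrencyLimitScheduler layers the callback protocol above on top of
something like this:

```go
package sketch

import "sync/atomic"

// concurrencyLimitSketch admits a compaction only while the number of
// running compactions is below a limit (e.g. derived from
// Options.MaxConcurrentCompactions). Names and structure are assumptions.
type concurrencyLimitSketch struct {
	limit   func() int
	running atomic.Int64
}

// tryAdmit reserves a slot if one is available.
func (s *concurrencyLimitSketch) tryAdmit() bool {
	for {
		cur := s.running.Load()
		if int(cur) >= s.limit() {
			return false
		}
		if s.running.CompareAndSwap(cur, cur+1) {
			return true
		}
	}
}

// done releases a slot when a compaction finishes.
func (s *concurrencyLimitSketch) done() {
	s.running.Add(-1)
}
```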

CompactionScheduler has some related interfaces/structs (both sketched
below):
- CompactionGrantHandle is used to report the start and end of the
  compaction, and to periodically report written bytes and CPU consumption.
  In the implementation of CompactionGrantHandle provided by CockroachDB's
  AC component, the CPU consumption will use the grunning package.
- WaitingCompaction is a struct used to prioritize the DB's compaction
  relative to other long-lived work (including compactions by other DBs).
  makeWaitingCompaction is a helper that constructs this struct.
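A hedged sketch of these two types (only the type names, their purpose, and
the grunning/AC relationship come from this commit; the specific methods and
fields are assumptions):

```go
package sketch

import "time"

// CompactionGrantHandle is handed to the DB when a compaction is granted.
// The DB reports the compaction lifecycle and resource usage through it, so
// a resource-aware scheduler (e.g. one backed by CockroachDB's admission
// control, measuring CPU via the grunning package) can account for running
// work.
type CompactionGrantHandle interface {
	// Started is called when the compaction begins running.
	Started()
	// MeasureCPU is called periodically from the compaction's goroutines so
	// the scheduler can sample CPU consumption.
	MeasureCPU()
	// CumulativeStats reports progress, e.g. bytes written so far.
	CumulativeStats(writtenBytes uint64)
	// Done is called when the compaction finishes, successfully or not.
	Done()
}

// WaitingCompaction describes a compaction the DB would like to run, in
// terms a scheduler can use to order it against other long-lived work,
// including compactions from other DBs on the same node.
type WaitingCompaction struct {
	// Optional marks work that can be deferred for a long time (e.g. an
	// elision-only or rewrite compaction).
	Optional bool
	// Priority and Score order waiting work; higher is more urgent.
	Priority int
	Score    float64
	// WaitingSince helps avoid starvation among work of equal priority.
	WaitingSince time.Time
}
```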

For integrating the CompactionScheduler with DB, there are a number of
changes:
- The entry paths for asking to schedule a compaction are reduced to one,
  by removing DB.maybeScheduleCompactionPicker. The only remaining path is
  DB.maybeScheduleCompaction.
- versionSet.{curCompactionConcurrency,pickedCompactionCache} are added
  to satisfy the interface expected by CompactionScheduler. Specifically,
  pickedCompactionCache allows us to safely cache a pickedCompaction
  that cannot be run. There is some commentary on the worst-case waste
  in compaction picking -- with the default ConcurrencyLimitScheduler
  on average there should be no wastage.
- versionSet.curCompactionConcurrency and DB.mu.compact.manualLen are two
  atomic variables introduced to implement DB.GetAllowedWithoutPermission,
  which lets the DB adjust the minimum compaction concurrency it desires
  based on the backlog of automatic and manual compactions (a sketch
  follows this list). The encoded logic is meant to be equivalent to our
  current logic.
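A minimal sketch of the kind of logic DB.GetAllowedWithoutPermission could
encode using those two atomics; the exact combination below is an assumption
(the commit only states that it matches the pre-existing behavior):

```go
package sketch

import "sync/atomic"

// dbBacklog mirrors the two atomics named above; how they are combined here
// is illustrative, not the exact logic in the commit.
type dbBacklog struct {
	// curCompactionConcurrency is maintained from score-based signals such
	// as L0 read-amp and compaction debt (see getCompactionConcurrency in
	// the diff below).
	curCompactionConcurrency atomic.Int32
	// manualLen is the number of queued manual compactions.
	manualLen atomic.Int32
}

// getAllowedWithoutPermission returns the compaction concurrency the DB
// wants regardless of what the scheduler would otherwise grant: whatever
// the automatic signals ask for, and at least one slot whenever manual
// compactions are queued.
func (b *dbBacklog) getAllowedWithoutPermission() int {
	allowed := int(b.curCompactionConcurrency.Load())
	if b.manualLen.Load() > 0 {
		allowed = max(allowed, 1)
	}
	return allowed
}
```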

The CompactionSlot and CompactionLimiter introduced in a recent PR are
deleted. CompactionGrantHandle is analogous to CompactionSlot, and allows
for measuring CPU usage, since the implementation of CompactionScheduler in
AC will explicitly monitor usage and capacity. CompactionScheduler is
analogous to CompactionLimiter. CompactionLimiter had a non-queueing
interface in
that it never called into the DB. This worked since the only events that
allowed another compaction to run were also ones that caused another
call to maybeScheduleCompaction. This is not true when a
CompactionScheduler is scheduling across multiple DBs, or managing a
compaction and other long-lived work (snapshot reception), since something
unrelated to the DB can cause resources to become available to run a
compaction.

There is a partial implementation of a resource aware scheduler in
https://github.com/sumeerbhola/cockroach/tree/long_lived_granter/pkg/util/admission/admit_long.

Informs #3813, cockroachdb/cockroach#74697, #1329
sumeerbhola committed Feb 5, 2025
1 parent 35df196 commit a8b60cf
Showing 22 changed files with 1,015 additions and 393 deletions.
504 changes: 277 additions & 227 deletions compaction.go

Large diffs are not rendered by default.

61 changes: 44 additions & 17 deletions compaction_picker.go
@@ -192,6 +192,9 @@ type pickedCompaction struct {
score float64
// kind indicates the kind of compaction.
kind compactionKind
// manualID > 0 iff this is a manual compaction. It exists solely for
// internal bookkeeping.
manualID uint64
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
@@ -1151,16 +1154,8 @@ func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fi
return uint64(totalGarbage) / uint64(useCount)
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
func (p *compactionPickerByScore) getCompactionConcurrency() int {
maxConcurrentCompactions := p.opts.MaxConcurrentCompactions()
// Compaction concurrency is controlled by L0 read-amp. We allow one
// additional compaction per L0CompactionConcurrency sublevels, as well as
// one additional compaction per CompactionDebtConcurrency bytes of
@@ -1169,16 +1164,44 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// debt as a second signal to prevent compaction concurrency from dropping
// significantly right after a base compaction finishes, and before those
// bytes have been compacted further down the LSM.
if n := len(env.inProgressCompactions); n > 0 {
//
// Let n be the number of in-progress compactions.
//
// l0ReadAmp >= ccSignal1 then can run another compaction, where
// ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency
// Rearranging,
// n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
// So we can run up to
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions
l0ReadAmpCompactions := 1
if p.opts.Experimental.L0CompactionConcurrency > 0 {
l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
}
// compactionDebt >= ccSignal2 then can run another compaction, where
// ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
// Rearranging,
// n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
// So we can run up to
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions.
compactionDebtCompactions := 1
if p.opts.Experimental.CompactionDebtConcurrency > 0 {
compactionDebt := p.estimatedCompactionDebt(0)
ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
return nil
}
compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
}
return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
}
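As a worked example of this formula, with assumed option values
(L0CompactionConcurrency=10, CompactionDebtConcurrency=1 GiB,
MaxConcurrentCompactions=4), an L0 read-amp of 25, and 5 GiB of compaction
debt:

```go
package main

import "fmt"

func main() {
	// Assumed option values, for illustration only.
	const (
		l0CompactionConcurrency   = 10
		compactionDebtConcurrency = uint64(1 << 30) // 1 GiB
		maxConcurrentCompactions  = 4
	)
	l0ReadAmp := 25
	compactionDebt := uint64(5 << 30) // 5 GiB

	l0ReadAmpCompactions := l0ReadAmp/l0CompactionConcurrency + 1                  // 25/10 + 1 = 3
	compactionDebtCompactions := int(compactionDebt/compactionDebtConcurrency) + 1 // 5 + 1 = 6
	allowed := max(min(maxConcurrentCompactions,
		max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
	fmt.Println(allowed) // 4: the debt signal asks for 6, but MaxConcurrentCompactions caps it.
}
```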

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
scores := p.calculateLevelScores(env.inProgressCompactions)

// TODO(bananabrick): Either remove, or change this into an event sent to the
@@ -1487,6 +1510,9 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
if p.vers.Stats.MarkedForCompaction == 0 {
return nil
}
for l := numLevels - 1; l >= 0; l-- {
candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
if candidate == nil {
@@ -1764,7 +1790,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc
return pc
}

func pickManualCompaction(
func newPickedManualCompaction(
vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
outputLevel := manual.level + 1
@@ -1788,6 +1814,7 @@ func pickManualCompaction(
return nil, true
}
pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
pc.manualID = manual.id
manual.outputLevel = pc.outputLevel.level
pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
if pc.startLevel.files.Empty() {
60 changes: 46 additions & 14 deletions compaction_picker_test.go
@@ -188,6 +188,15 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
}
}

pickAuto := func(env compactionEnv, pickerByScore *compactionPickerByScore) *pickedCompaction {
inProgressCompactions := len(env.inProgressCompactions)
allowedCompactions := pickerByScore.getCompactionConcurrency()
if inProgressCompactions >= allowedCompactions {
return nil
}
return pickerByScore.pickAuto(env)
}

datadriven.RunTest(t, "testdata/compaction_picker_target_level",
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
@@ -203,6 +212,14 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
// <level>: <size> [compensation]
var errMsg string
vers, opts, errMsg = loadVersion(t, d)
opts.MaxConcurrentCompactions = func() int {
// This test only limits the count based on the L0 read amp and
// compaction debt, so we would like to return math.MaxInt. But we
// don't since it is also used in expandedCompactionByteSizeLimit,
// and causes the expanded bytes to reduce. The test cases never
// pick more than 4 compactions, so we use 4.
return 4
}
if errMsg != "" {
return errMsg
}
@@ -231,7 +248,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
earliestUnflushedSeqNum: base.SeqNumMax,
inProgressCompactions: inProgress,
}
pc := pickerByScore.pickAuto(env)
pc := pickAuto(env, pickerByScore)
if pc == nil {
break
}
@@ -299,10 +316,10 @@ func TestCompactionPickerTargetLevel(t *testing.T) {

var b strings.Builder
fmt.Fprintf(&b, "Initial state before pick:\n%s", runVersionFileSizes(vers))
pc := pickerByScore.pickAuto(compactionEnv{
pc := pickAuto(compactionEnv{
earliestUnflushedSeqNum: base.SeqNumMax,
inProgressCompactions: inProgress,
})
}, pickerByScore)
if pc != nil {
fmt.Fprintf(&b, "Picked: L%d->L%d: %0.1f\n", pc.startLevel.level, pc.outputLevel.level, pc.score)
}
@@ -326,7 +343,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
end: iEnd.UserKey,
}

pc, retryLater := pickManualCompaction(
pc, retryLater := newPickedManualCompaction(
pickerByScore.vers,
opts,
compactionEnv{
@@ -540,7 +557,7 @@ func TestCompactionPickerL0(t *testing.T) {
var result strings.Builder
if pc != nil {
checkClone(t, pc)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
@@ -587,6 +604,7 @@ func TestCompactionPickerL0(t *testing.T) {
func TestCompactionPickerConcurrency(t *testing.T) {
opts := (*Options)(nil).EnsureDefaults()
opts.Experimental.L0CompactionConcurrency = 1
opts.MaxConcurrentCompactions = func() int { return 4 }

parseMeta := func(s string) (*fileMetadata, error) {
parts := strings.Split(s, ":")
@@ -755,13 +773,20 @@ func TestCompactionPickerConcurrency(t *testing.T) {
td.MaybeScanArgs(t, "l0_compaction_concurrency", &opts.Experimental.L0CompactionConcurrency)
td.MaybeScanArgs(t, "compaction_debt_concurrency", &opts.Experimental.CompactionDebtConcurrency)

pc := picker.pickAuto(compactionEnv{
env := compactionEnv{
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
})
}
inProgressCount := len(env.inProgressCompactions)
allowedCompactions := picker.getCompactionConcurrency()
var pc *pickedCompaction
if inProgressCount < allowedCompactions {
pc = picker.pickAuto(env)
}
var result strings.Builder
fmt.Fprintf(&result, "picker.getCompactionConcurrency: %d\n", allowedCompactions)
if pc != nil {
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand All @@ -771,7 +796,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
fmt.Fprintf(&result, "grandparents: %s\n", fileNums(c.grandparents))
}
} else {
return "nil"
fmt.Fprintf(&result, "nil")
}
return result.String()
}
@@ -1355,6 +1380,7 @@ func TestCompactionPickerPickFile(t *testing.T) {
FS: fs,
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()

d, err := Open("", opts)
require.NoError(t, err)
@@ -1417,11 +1443,16 @@ func TestCompactionPickerPickFile(t *testing.T) {
// level.
var lf manifest.LevelFile
var ok bool
d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction {
p := untypedPicker.(*compactionPickerByScore)
func() {
d.mu.versions.logLock()
defer d.mu.versions.logUnlock()
env := d.makeCompactionEnvLocked()
if env == nil {
return
}
p := d.mu.versions.picker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
return nil
})
}()
if !ok {
return "(none)"
}
@@ -1474,6 +1505,7 @@ func TestCompactionPickerScores(t *testing.T) {
FS: fs,
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()

d, err := Open("", opts)
require.NoError(t, err)
@@ -1495,7 +1527,7 @@ func TestCompactionPickerScores(t *testing.T) {
if td.HasArg("pause-cleaning") {
cleaner.pause()
}

opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
d, err = runDBDefineCmd(td, opts)
if err != nil {
return err.Error()