db: introduce CompactionScheduler and integrate with DB #4297

Open · wants to merge 1 commit into master
504 changes: 277 additions & 227 deletions compaction.go

Large diffs are not rendered by default.

61 changes: 44 additions & 17 deletions compaction_picker.go
@@ -192,6 +192,9 @@ type pickedCompaction struct {
score float64
// kind indicates the kind of compaction.
kind compactionKind
// manualID > 0 iff this is a manual compaction. It exists solely for
// internal bookkeeping.
manualID uint64
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
@@ -1153,16 +1156,8 @@ func responsibleForGarbageBytes(
return uint64(totalGarbage) / uint64(useCount)
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
func (p *compactionPickerByScore) getCompactionConcurrency() int {
maxConcurrentCompactions := p.opts.MaxConcurrentCompactions()
// Compaction concurrency is controlled by L0 read-amp. We allow one
// additional compaction per L0CompactionConcurrency sublevels, as well as
// one additional compaction per CompactionDebtConcurrency bytes of
@@ -1171,16 +1166,44 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// debt as a second signal to prevent compaction concurrency from dropping
// significantly right after a base compaction finishes, and before those
// bytes have been compacted further down the LSM.
if n := len(env.inProgressCompactions); n > 0 {
//
// Let n be the number of in-progress compactions.
//
// If l0ReadAmp >= ccSignal1, then we can run another compaction, where
// ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency
// Rearranging,
// n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
// So we can run up to
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions.
l0ReadAmpCompactions := 1
if p.opts.Experimental.L0CompactionConcurrency > 0 {
l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
}
// If compactionDebt >= ccSignal2, then we can run another compaction, where
// ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
// Rearranging,
// n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
// So we can run up to
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions.
compactionDebtCompactions := 1
if p.opts.Experimental.CompactionDebtConcurrency > 0 {
compactionDebt := p.estimatedCompactionDebt(0)
ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
return nil
}
compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
}
return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
scores := p.calculateLevelScores(env.inProgressCompactions)

// TODO(bananabrick): Either remove, or change this into an event sent to the
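Note: to get a concrete feel for the concurrency formula in getCompactionConcurrency above, here is a self-contained sketch (not part of the diff; the function name and all constants are invented for illustration, and the built-in min/max assume Go 1.21+):

```go
package main

import "fmt"

// concurrency mirrors getCompactionConcurrency: each signal independently
// justifies (signal / threshold) + 1 compactions, the larger signal wins,
// and the result is clamped to the range [1, maxCompactions].
func concurrency(l0ReadAmp, l0Concurrency int, debt, debtConcurrency uint64, maxCompactions int) int {
	l0Compactions := 1
	if l0Concurrency > 0 {
		l0Compactions = l0ReadAmp/l0Concurrency + 1
	}
	debtCompactions := 1
	if debtConcurrency > 0 {
		debtCompactions = int(debt/debtConcurrency) + 1
	}
	return max(min(maxCompactions, max(l0Compactions, debtCompactions)), 1)
}

func main() {
	// With L0CompactionConcurrency=10, an L0 read amp of 25 justifies
	// 25/10 + 1 = 3 compactions; with CompactionDebtConcurrency=1GiB,
	// 5GiB of debt justifies 5 + 1 = 6. The debt signal wins, and then
	// MaxConcurrentCompactions clamps the result to 4.
	fmt.Println(concurrency(25, 10, 5<<30, 1<<30, 4)) // prints 4
}
```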
@@ -1489,6 +1512,9 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
if p.vers.Stats.MarkedForCompaction == 0 {
return nil
}
for l := numLevels - 1; l >= 0; l-- {
candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
if candidate == nil {
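Note: the new guard at the top of pickRewriteCompaction is a fast path. If the version has no files marked for compaction at all, there is nothing for the per-level markedForCompactionAnnotator scan below to find, so the function bails out immediately.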
@@ -1766,7 +1792,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc
return pc
}

func pickManualCompaction(
func newPickedManualCompaction(
vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
outputLevel := manual.level + 1
@@ -1790,6 +1816,7 @@ func pickManualCompaction(
return nil, true
}
pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
pc.manualID = manual.id
manual.outputLevel = pc.outputLevel.level
pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
if pc.startLevel.files.Empty() {
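Note: along with the rename to the constructor-style newPickedManualCompaction (matching newPickedCompaction), the pc.manualID = manual.id line plumbs the requesting manual compaction's ID into the pickedCompaction; per the new field comment, this exists solely for internal bookkeeping.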

60 changes: 46 additions & 14 deletions compaction_picker_test.go
@@ -188,6 +188,15 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
}
}

pickAuto := func(env compactionEnv, pickerByScore *compactionPickerByScore) *pickedCompaction {
inProgressCompactions := len(env.inProgressCompactions)
allowedCompactions := pickerByScore.getCompactionConcurrency()
if inProgressCompactions >= allowedCompactions {
return nil
}
return pickerByScore.pickAuto(env)
}

datadriven.RunTest(t, "testdata/compaction_picker_target_level",
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
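Note: the pickAuto helper defined above reflects the new division of labor. The picker's pickAuto no longer gates itself on the number of in-progress compactions (that check was deleted from compaction_picker.go in this diff), so the caller, in the real DB the CompactionScheduler integration, compares in-progress compactions against getCompactionConcurrency before picking, and the test reproduces that gate.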
@@ -203,6 +212,14 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
// <level>: <size> [compensation]
var errMsg string
vers, opts, errMsg = loadVersion(t, d)
opts.MaxConcurrentCompactions = func() int {
// This test limits the compaction count only via the L0 read amp and
// compaction debt signals, so ideally we would return math.MaxInt here.
// We don't, because the value also feeds expandedCompactionByteSizeLimit,
// where a huge value would shrink the expanded byte size limit. The test
// cases never pick more than 4 compactions, so 4 is enough.
return 4
}
if errMsg != "" {
return errMsg
}
@@ -231,7 +248,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
earliestUnflushedSeqNum: base.SeqNumMax,
inProgressCompactions: inProgress,
}
pc := pickerByScore.pickAuto(env)
pc := pickAuto(env, pickerByScore)
if pc == nil {
break
}
@@ -299,10 +316,10 @@ func TestCompactionPickerTargetLevel(t *testing.T) {

var b strings.Builder
fmt.Fprintf(&b, "Initial state before pick:\n%s", runVersionFileSizes(vers))
pc := pickerByScore.pickAuto(compactionEnv{
pc := pickAuto(compactionEnv{
earliestUnflushedSeqNum: base.SeqNumMax,
inProgressCompactions: inProgress,
})
}, pickerByScore)
if pc != nil {
fmt.Fprintf(&b, "Picked: L%d->L%d: %0.1f\n", pc.startLevel.level, pc.outputLevel.level, pc.score)
}
@@ -326,7 +343,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
end: iEnd.UserKey,
}

pc, retryLater := pickManualCompaction(
pc, retryLater := newPickedManualCompaction(
pickerByScore.vers,
opts,
compactionEnv{
@@ -540,7 +557,7 @@ func TestCompactionPickerL0(t *testing.T) {
var result strings.Builder
if pc != nil {
checkClone(t, pc)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
@@ -587,6 +604,7 @@ func TestCompactionPickerL0(t *testing.T) {
func TestCompactionPickerConcurrency(t *testing.T) {
opts := DefaultOptions()
opts.Experimental.L0CompactionConcurrency = 1
opts.MaxConcurrentCompactions = func() int { return 4 }

parseMeta := func(s string) (*tableMetadata, error) {
parts := strings.Split(s, ":")
@@ -755,13 +773,20 @@ func TestCompactionPickerConcurrency(t *testing.T) {
td.MaybeScanArgs(t, "l0_compaction_concurrency", &opts.Experimental.L0CompactionConcurrency)
td.MaybeScanArgs(t, "compaction_debt_concurrency", &opts.Experimental.CompactionDebtConcurrency)

pc := picker.pickAuto(compactionEnv{
env := compactionEnv{
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
})
}
inProgressCount := len(env.inProgressCompactions)
allowedCompactions := picker.getCompactionConcurrency()
var pc *pickedCompaction
if inProgressCount < allowedCompactions {
pc = picker.pickAuto(env)
}
var result strings.Builder
fmt.Fprintf(&result, "picker.getCompactionConcurrency: %d\n", allowedCompactions)
if pc != nil {
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
@@ -771,7 +796,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
fmt.Fprintf(&result, "grandparents: %s\n", fileNums(c.grandparents))
}
} else {
return "nil"
fmt.Fprintf(&result, "nil")
}
return result.String()
}
@@ -1353,6 +1378,7 @@ func TestCompactionPickerPickFile(t *testing.T) {
FS: fs,
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()

d, err := Open("", opts)
require.NoError(t, err)
@@ -1415,11 +1441,16 @@ func TestCompactionPickerPickFile(t *testing.T) {
// level.
var lf manifest.LevelFile
var ok bool
d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction {
p := untypedPicker.(*compactionPickerByScore)
func() {
d.mu.versions.logLock()
defer d.mu.versions.logUnlock()
env := d.makeCompactionEnvLocked()
if env == nil {
return
}
p := d.mu.versions.picker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
return nil
})
}()
if !ok {
return "(none)"
}
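Note: previously this test reached the picker via a callback passed to d.maybeScheduleCompactionPicker. It now takes the version log lock directly and builds its environment with makeCompactionEnvLocked, which seems to be the intended way for code outside the scheduler-driven path to obtain a compactionEnv.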
@@ -1472,6 +1503,7 @@ func TestCompactionPickerScores(t *testing.T) {
FS: fs,
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()

d, err := Open("", opts)
require.NoError(t, err)
@@ -1493,7 +1525,7 @@ func TestCompactionPickerScores(t *testing.T) {
if td.HasArg("pause-cleaning") {
cleaner.pause()
}

opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
d, err = runDBDefineCmd(td, opts)
if err != nil {
return err.Error()
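One pattern repeats across the test updates in this diff: tests that open a DB and want deterministic compaction behavior install the test-only scheduler before Open. A hypothetical skeleton following that pattern (the test name is invented; DefaultOptions, Open, the Experimental.CompactionScheduler option, and the NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest constructor are the ones used in the diff, and the skeleton is assumed to live alongside the package's own tests):

```go
func TestWithDeterministicCompactionScheduling(t *testing.T) {
	opts := DefaultOptions()
	opts.FS = vfs.NewMem()
	// This scheduler never grants compaction slots on its own timer, so
	// the test alone controls when compactions get picked and run.
	opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
	d, err := Open("", opts)
	require.NoError(t, err)
	defer func() { require.NoError(t, d.Close()) }()
}
```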