diff --git a/compaction.go b/compaction.go index 7947748abd..7aed3be284 100644 --- a/compaction.go +++ b/compaction.go @@ -11,6 +11,7 @@ import ( "math" "runtime/pprof" "slices" + "sync" "sync/atomic" "time" @@ -121,21 +122,28 @@ func (c *compactionWritable) Write(p []byte) error { type compactionKind int +// The ordering of these compaction kinds is important - it is used to +// determine the priority of a pickedCompaction when multiple DBs are waiting +// to schedule a compaction. A compactionKind is considered as being strictly +// higher priority than the one below it. +// +// The ordering of these compactionKinds should mirror the order in which +// compaction types are picked in compactionPicker. const ( - compactionKindDefault compactionKind = iota - compactionKindFlush + compactionKindFlush compactionKind = iota + // compactionKindDeleteOnly denotes a compaction that only deletes input + // files. It can occur when wide range tombstones completely contain sstables. + compactionKindDeleteOnly // compactionKindMove denotes a move compaction where the input file is // retained and linked in a new level without being obsoleted. compactionKindMove + compactionKindDefault // compactionKindCopy denotes a copy compaction where the input file is // copied byte-by-byte into a new file with a new FileNum in the output level. compactionKindCopy - // compactionKindDeleteOnly denotes a compaction that only deletes input - // files. It can occur when wide range tombstones completely contain sstables. - compactionKindDeleteOnly + compactionKindTombstoneDensity compactionKindElisionOnly compactionKindRead - compactionKindTombstoneDensity compactionKindRewrite compactionKindIngestedFlushable ) @@ -929,10 +937,9 @@ func (c *compaction) String() string { } var buf bytes.Buffer - for level := c.startLevel.level; level <= c.outputLevel.level; level++ { - i := level - c.startLevel.level - fmt.Fprintf(&buf, "%d:", level) - iter := c.inputs[i].files.Iter() + for _, input := range c.inputs { + fmt.Fprintf(&buf, "%d:", input.level) + iter := input.files.Iter() for f := iter.First(); f != nil; f = iter.Next() { fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest) } @@ -1644,34 +1651,19 @@ func (d *DB) maybeScheduleCompactionAsync() { d.mu.Unlock() } -// maybeScheduleCompaction schedules a compaction if necessary. -// -// d.mu must be held when calling this. -func (d *DB) maybeScheduleCompaction() { - d.maybeScheduleCompactionPicker(pickAuto) -} - -func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickAuto(env) -} - -func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickElisionOnlyCompaction(env) -} - -// tryScheduleDownloadCompaction tries to start a download compaction. -// -// Returns true if we started a download compaction (or completed it -// immediately because it is a no-op or we hit an error). +// tryScheduleDownloadCompactions tries to start download compactions. // // Requires d.mu to be held. Updates d.mu.compact.downloads. 
-func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool { +func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) { vers := d.mu.versions.currentVersion() for i := 0; i < len(d.mu.compact.downloads); { + if d.mu.compact.downloadingCount >= maxConcurrentDownloads { + break + } download := d.mu.compact.downloads[i] switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) { case launchedCompaction: - return true + continue case didNotLaunchCompaction: // See if we can launch a compaction for another download task. i++ @@ -1680,83 +1672,326 @@ func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownl d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1) } } - return false } -// maybeScheduleCompactionPicker schedules a compaction if necessary, -// calling `pickFunc` to pick automatic compactions. +// makeCompactionEnv attempts to acquire d.mu.versions.logLock in order +// to provide the proper mutual exclusion necessary during compaction picking. +// If the DB is closed or marked as read-only, makeCompactionEnv returns nil to +// indicate that compactions may not be performed. Otherwise, a new +// compactionEnv is constructed using the current DB state. +// +// Compaction picking needs a coherent view of a Version. In particular, we +// need to exclude concurrent ingestions from making a decision on which level +// to ingest into that conflicts with our compaction +// decision. versionSet.logLock provides the necessary mutual exclusion. +// +// NOTE: makeCompactionEnv does not call d.mu.versions.logUnlock; it is the +// caller's responsibility to ensure that the manifest is unlocked. // // Requires d.mu to be held. -func (d *DB) maybeScheduleCompactionPicker( - pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) { +func (d *DB) makeCompactionEnv() *compactionEnv { if d.closed.Load() != nil || d.opts.ReadOnly { - return - } - maxCompactions := d.opts.MaxConcurrentCompactions() - maxDownloads := d.opts.MaxConcurrentDownloads() - - if d.mu.compact.compactingCount >= maxCompactions && - (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) { - if len(d.mu.compact.manual) > 0 { - // Inability to run head blocks later manual compactions. - d.mu.compact.manual[0].retries++ - } - return + return nil } - // Compaction picking needs a coherent view of a Version. In particular, we - // need to exclude concurrent ingestions from making a decision on which level - // to ingest into that conflicts with our compaction - // decision. versionSet.logLock provides the necessary mutual exclusion. d.mu.versions.logLock() - defer d.mu.versions.logUnlock() // Check for the closed flag again, in case the DB was closed while we were // waiting for logLock(). if d.closed.Load() != nil { - return + d.mu.versions.logUnlock() + return nil } - env := compactionEnv{ + return &compactionEnv{ diskAvailBytes: d.diskAvailBytes.Load(), earliestSnapshotSeqNum: d.mu.snapshots.earliest(), earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(), + inProgressCompactions: d.getInProgressCompactionInfoLocked(nil), + readCompactionEnv: readCompactionEnv{ + readCompactions: &d.mu.compact.readCompactions, + flushing: d.mu.compact.flushing || d.passedFlushThreshold(), + rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, + }, + } +} + +// pickAnyCompaction tries to pick a manual or automatic compaction. 
+func (d *DB) pickAnyCompaction(env compactionEnv) (pc *pickedCompaction) { + pc = d.pickManualCompaction(env) + if pc == nil && !d.opts.DisableAutomaticCompactions { + pc = d.mu.versions.picker.pickAuto(env) + } + return pc +} + +// runPickedCompaction kicks off the provided pickedCompaction. In case the +// pickedCompaction is a manual compaction, the corresponding manualCompaction +// is removed from d.mu.compact.manual. +func (d *DB) runPickedCompaction(pc *pickedCompaction) { + var doneChannel chan error + if pc.isManual { + doneChannel = d.mu.compact.manual[0].done + d.mu.compact.manual = d.mu.compact.manual[1:] + } + + d.mu.compact.compactingCount++ + compaction := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) + d.addInProgressCompaction(compaction) + go func() { + d.compact(compaction, doneChannel) + d.compactionPool.CompactionFinished(pc) + }() +} + +// CompactionPool is responsible for scheduling both automatic and manual +// compactions. In the case of multiple DB instances (i.e. a multi-store +// configuration), implementations of CompactionPool may or may not enforce +// a global maximum compaction concurrency. +type CompactionPool interface { + // AddWaitingDB signals to the CompactionPool that the provided DB might + // have compaction(s) that need to be scheduled. Implementations of + // CompactionPool may decide when a compaction will actually be picked + // and run from this DB. + // + // DB.mu should NOT be held for any DB (including d) when AddWaitingDB is + // called. + AddWaitingDB(d *DB) + // CompactionFinished signals to the CompactionPool that the provided + // pickedCompaction has now finished running. + CompactionFinished(pc *pickedCompaction) +} + +var defaultCompactionPool = &UnlimitedCompactionPool{} + +// UnlimitedCompactionPool implements CompactionPool. It does not enforce any +// global maximum compaction concurrency when multiple DBs attempt to schedule +// compactions. +type UnlimitedCompactionPool struct{} + +func (ucp *UnlimitedCompactionPool) AddWaitingDB(d *DB) { + d.mu.Lock() + env := d.makeCompactionEnv() + if env == nil { + d.mu.Unlock() + return + } + + pc := d.pickAnyCompaction(*env) + if pc != nil { + d.runPickedCompaction(pc) + // We might be able to schedule more compactions. + defer ucp.AddWaitingDB(d) + } + + d.mu.versions.logUnlock() + d.mu.Unlock() +} + +func (ucp *UnlimitedCompactionPool) CompactionFinished(pc *pickedCompaction) { + // No-op for an UnlimitedCompactionPool. +} + +// PrioritizingCompactionPool enforces a global max compaction concurrency +// in a multi-store configuration. If multiple DBs are waiting to perform a +// compaction, it prioritizes the DB whose pickedCompaction has the highest +// priority. +type PrioritizingCompactionPool struct { + mu sync.Mutex + // cond is used during testing to signal that a compaction has finished. + cond sync.Cond + // compactingCount is the current number of running compactions across + // all DBs. + compactingCount int + // waiting contains all DBs which might have compactions that need to be + // scheduled. The value stored for each DB may be nil to indicate that + // a compaction needs to be picked from the DB. + waiting map[*DB]*pickedCompaction + // maxCompactionConcurrency defines the global max compaction concurrency + // across all DBs. + maxCompactionConcurrency int +} + +// NewPrioritizingCompactionPool creates a new PrioritizingCompactionPool +// with the specified maxCompactionConcurrency. 
+func NewPrioritizingCompactionPool(maxCompactionConcurrency int) *PrioritizingCompactionPool {
+	if maxCompactionConcurrency <= 0 {
+		panic("pebble: maxCompactionConcurrency for a CompactionPool must be greater than 0")
+	}
+	pcp := &PrioritizingCompactionPool{
+		maxCompactionConcurrency: maxCompactionConcurrency,
+		waiting:                  make(map[*DB]*pickedCompaction),
+	}
+	pcp.cond.L = &pcp.mu
+	return pcp
+}
+
+// shouldLimitConcurrency returns true if the provided pickedCompaction should
+// be counted towards this PrioritizingCompactionPool's compactingCount.
+func (pcp *PrioritizingCompactionPool) shouldLimitConcurrency(pc *pickedCompaction) bool {
+	return pc.kind != compactionKindDeleteOnly && pc.kind != compactionKindMove
+}
+
+// comparePickedCompactions returns true if pc1 is higher priority than pc2.
+func comparePickedCompactions(pc1 *pickedCompaction, pc2 *pickedCompaction) bool {
+	if pc1 == nil {
+		return false
+	} else if pc2 == nil {
+		return true
+	}
+
+	// If pc1 and pc2 are of different compactionKinds, pc1 is higher priority
+	// only if its compactionKind is higher priority. The relative priorities
+	// of compactionKinds are determined by the order they are defined in. See
+	// the comment below compactionKind.
+	if pc1.kind < pc2.kind {
+		return true
+	}
+	// Otherwise, use the pickedCompaction's score to break ties. This is the
+	// score of the level in the case of score-based compactions; other
+	// compaction types assign the score differently.
+	return pc1.score > pc2.score
+}
+
+// maybeScheduleWaitingCompactionLocked attempts to schedule a waiting
+// compaction from the list of waiting DBs. It prioritizes the DB with the
+// highest priority pickedCompaction as defined by comparePickedCompactions.
+//
+// pcp.mu must be held. DB.mu must not be held for any DB.
+func (pcp *PrioritizingCompactionPool) maybeScheduleWaitingCompactionLocked() {
+	if pcp.compactingCount >= pcp.maxCompactionConcurrency {
+		return
 	}
 
-	if d.mu.compact.compactingCount < maxCompactions {
-		// Check for delete-only compactions first, because they're expected to be
-		// cheap and reduce future compaction work.
-		if !d.opts.private.disableDeleteOnlyCompactions &&
-			!d.opts.DisableAutomaticCompactions &&
-			len(d.mu.compact.deletionHints) > 0 {
-			d.tryScheduleDeleteOnlyCompaction()
+	unlockDB := func(d *DB) {
+		d.mu.versions.logUnlock()
+		d.mu.Unlock()
+	}
+
+	var selectedDB *DB
+	// We need to find the highest-priority compaction across all waiting DBs.
+	// This is tricky because concurrent flushes, ingestions, etc. from each DB
+	// may invalidate a compaction after it is picked. For this reason, the
+	// below loop maintains the invariant that both selectedDB.mu and
+	// selectedDB.mu.versions.logLock are continuously held until we either find
+	// a higher priority compaction from one of the other DBs, or we finish
+	// iteration and start the compaction.
+	//
+	// Although the ordering of pcp.waiting is arbitrary and thus the order in
+	// which locks are acquired is also arbitrary, this is fine only because we
+	// hold pcp.mu throughout this method.
+	for d := range pcp.waiting {
+		d.mu.Lock()
+
+		env := d.makeCompactionEnv()
+		if env == nil {
+			// This DB is read-only or closed; skip it.
+			delete(pcp.waiting, d)
+			d.mu.Unlock()
+			continue
 		}
 
-		for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
-			if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
-				// Inability to run head blocks later manual compactions.
-				manual.retries++
-				break
+		if pcp.waiting[d] == nil {
+			if pcp.waiting[d] = d.pickAnyCompaction(*env); pcp.waiting[d] == nil {
+				// There are no compactions that can be scheduled from this DB.
+				// Mark it as no longer waiting.
+				delete(pcp.waiting, d)
+				unlockDB(d)
+				continue
 			}
-			d.mu.compact.manual = d.mu.compact.manual[1:]
 		}
-
-		for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
-			d.tryScheduleAutoCompaction(env, pickFunc) {
+		if selectedDB == nil {
+			selectedDB = d
+		} else if comparePickedCompactions(pcp.waiting[d], pcp.waiting[selectedDB]) {
+			// We've found a higher priority pickedCompaction - first unlock the old
+			// selectedDB, and then swap it out for the current DB.
+			//
+			// NB: the pickedCompaction for the previous selectedDB can be cached
+			// because it will still be valid until d.maybeScheduleCompaction is
+			// called in the future, at which point the cached pickedCompaction
+			// will be invalidated.
+			//
+			// TODO: the above is currently not true because of a data race; we need
+			// to atomically invalidate the pickedCompaction before releasing d.mu
+			// inside maybeScheduleCompaction.
+			unlockDB(selectedDB)
+			selectedDB = d
+		} else {
+			// This DB's pickedCompaction is lower priority than that of selectedDB.
+			// Release locks and continue.
+			unlockDB(d)
 		}
 	}
 
-	for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
-		d.tryScheduleDownloadCompaction(env, maxDownloads) {
+	if selectedDB == nil {
+		return
+	}
+
+	// At this point, locks are held only for selectedDB.mu.
+	pc := pcp.waiting[selectedDB]
+	if pcp.shouldLimitConcurrency(pc) {
+		pcp.compactingCount++
+	}
+	selectedDB.runPickedCompaction(pc)
+	pcp.waiting[selectedDB] = nil
+
+	unlockDB(selectedDB)
+	pcp.maybeScheduleWaitingCompactionLocked()
+}
+
+func (pcp *PrioritizingCompactionPool) CompactionFinished(pc *pickedCompaction) {
+	if pcp.shouldLimitConcurrency(pc) {
+		pcp.mu.Lock()
+		defer pcp.mu.Unlock()
+
+		pcp.compactingCount--
+		pcp.maybeScheduleWaitingCompactionLocked()
+		pcp.cond.Broadcast()
 	}
 }
 
+func (pcp *PrioritizingCompactionPool) AddWaitingDB(d *DB) {
+	pcp.mu.Lock()
+	defer pcp.mu.Unlock()
+	// Mark this DB as waiting, but also invalidate any existing
+	// pickedCompaction from this DB since a higher priority compaction
+	// could be picked.
+	pcp.waiting[d] = nil
+	pcp.maybeScheduleWaitingCompactionLocked()
+}
+
+// maybeScheduleCompaction schedules a compaction if necessary.
+//
+// Requires d.mu to be held.
+func (d *DB) maybeScheduleCompaction() {
+	env := d.makeCompactionEnv()
+	if env == nil {
+		return
+	}
+	// Delete-only compactions are expected to be cheap and reduce future
+	// compaction work, so schedule them directly instead of using
+	// d.compactionPool.
+	d.tryScheduleDeleteOnlyCompaction()
+	// Download compactions have their own concurrency limit.
+	d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads())
+	d.mu.versions.logUnlock()
+
+	// NB: we must release d.mu to avoid deadlock when calling
+	// AddWaitingDB below.
+	d.mu.Unlock()
+	d.compactionPool.AddWaitingDB(d)
+	d.mu.Lock()
+}
+
 // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
 // for all files that can be deleted as suggested by deletionHints.
 //
 // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
func (d *DB) tryScheduleDeleteOnlyCompaction() { + if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions || d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() || len(d.mu.compact.deletionHints) == 0 { + return + } + v := d.mu.versions.currentVersion() snapshots := d.mu.snapshots.toSlice() inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots) @@ -1770,56 +2005,29 @@ func (d *DB) tryScheduleDeleteOnlyCompaction() { } } -// tryScheduleManualCompaction tries to kick off the given manual compaction. -// -// Returns false if we are not able to run this compaction at this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool { +func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) { v := d.mu.versions.currentVersion() - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) - if pc == nil { - if !retryLater { - // Manual compaction is a no-op. Signal completion and exit. - manual.done <- nil - return true - } - // We are not able to run this manual compaction at this time. - return false - } - - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, manual.done) - return true -} + for len(d.mu.compact.manual) > 0 { + manual := d.mu.compact.manual[0] + if d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() { + manual.retries++ + return nil + } -// tryScheduleAutoCompaction tries to kick off an automatic compaction. -// -// Returns false if no automatic compactions are necessary or able to run at -// this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleAutoCompaction( - env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) bool { - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - env.readCompactionEnv = readCompactionEnv{ - readCompactions: &d.mu.compact.readCompactions, - flushing: d.mu.compact.flushing || d.passedFlushThreshold(), - rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, - } - pc := pickFunc(d.mu.versions.picker, env) - if pc == nil { - return false + pc, retryLater := newPickedManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) + if pc != nil { + return pc + } + if retryLater { + // We are not able to run this manual compaction at this time. + manual.retries++ + return nil + } + // Manual compaction is a no-op. Signal that it's complete. + manual.done <- nil + d.mu.compact.manual = d.mu.compact.manual[1:] } - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, nil) - return true + return nil } // deleteCompactionHintType indicates whether the deleteCompactionHint was diff --git a/compaction_picker.go b/compaction_picker.go index 79578d0749..baa239a79d 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/manifest" ) @@ -192,6 +191,8 @@ type pickedCompaction struct { score float64 // kind indicates the kind of compaction. 
kind compactionKind + // isManual indicates whether this compaction was manually triggered. + isManual bool // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel @@ -1156,16 +1157,13 @@ func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fi return uint64(totalGarbage) / uint64(useCount) } -// pickAuto picks the best compaction, if any. -// -// On each call, pickAuto computes per-level size adjustments based on -// in-progress compactions, and computes a per-level score. The levels are -// iterated over in decreasing score order trying to find a valid compaction -// anchored at that level. -// -// If a score-based compaction cannot be found, pickAuto falls back to looking -// for an elision-only compaction to remove obsolete keys. +// pickAuto picks the best compaction, if any. It first tries to find a +// score-based compaction; if one cannot be found, pickAuto falls back to +// various other compaction picking methods. func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) { + // We first check if this DB has reached its current max compaction + // concurrency. + // // Compaction concurrency is controlled by L0 read-amp. We allow one // additional compaction per L0CompactionConcurrency sublevels, as well as // one additional compaction per CompactionDebtConcurrency bytes of @@ -1184,107 +1182,9 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact } } - scores := p.calculateLevelScores(env.inProgressCompactions) - - // TODO(bananabrick): Either remove, or change this into an event sent to the - // EventListener. - logCompaction := func(pc *pickedCompaction) { - var buf bytes.Buffer - for i := 0; i < numLevels; i++ { - if i != 0 && i < p.baseLevel { - continue - } - - var info *candidateLevelInfo - for j := range scores { - if scores[j].level == i { - info = &scores[j] - break - } - } - - marker := " " - if pc.startLevel.level == info.level { - marker = "*" - } - fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %5.1f %8s %8s", - marker, info.level, info.compensatedScoreRatio, info.compensatedScore, - info.uncompensatedScoreRatio, info.uncompensatedScore, - humanize.Bytes.Int64(int64(totalCompensatedSize( - p.vers.Levels[info.level].Iter(), - ))), - humanize.Bytes.Int64(p.levelMaxBytes[info.level]), - ) - - count := 0 - for i := range env.inProgressCompactions { - c := &env.inProgressCompactions[i] - if c.inputs[0].level != info.level { - continue - } - count++ - if count == 1 { - fmt.Fprintf(&buf, " [") - } else { - fmt.Fprintf(&buf, " ") - } - fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel) - } - if count > 0 { - fmt.Fprintf(&buf, "]") - } - fmt.Fprintf(&buf, "\n") - } - p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s", - pc.startLevel.level, pc.outputLevel.level, buf.String()) - } - - // Check for a score-based compaction. candidateLevelInfos are first sorted - // by whether they should be compacted, so if we find a level which shouldn't - // be compacted, we can break early. - for i := range scores { - info := &scores[i] - if !info.shouldCompact() { - break - } - if info.level == numLevels-1 { - continue - } - - if info.level == 0 { - pc = pickL0(env, p.opts, p.vers, p.baseLevel) - // Fail-safe to protect against compacting the same sstable - // concurrently. 
- if pc != nil && !inputRangeAlreadyCompacting(env, pc) { - p.addScoresToPickedCompactionMetrics(pc, scores) - pc.score = info.compensatedScoreRatio - // TODO(bananabrick): Create an EventListener for logCompaction. - if false { - logCompaction(pc) - } - return pc - } - continue - } - - // info.level > 0 - var ok bool - info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum) - if !ok { - continue - } - - pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel) - // Fail-safe to protect against compacting the same sstable concurrently. - if pc != nil && !inputRangeAlreadyCompacting(env, pc) { - p.addScoresToPickedCompactionMetrics(pc, scores) - pc.score = info.compensatedScoreRatio - // TODO(bananabrick): Create an EventListener for logCompaction. - if false { - logCompaction(pc) - } - return pc - } + // Score-based compactions have the highest priority. + if pc := p.pickScoreBasedCompaction(env); pc != nil { + return pc } // Check for files which contain excessive point tombstones that could slow @@ -1337,8 +1237,61 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact // MarkedForCompaction field is persisted in the manifest. That's okay. We // previously would've ignored the designation, whereas now we'll re-compact // the file in place. - if p.vers.Stats.MarkedForCompaction > 0 { - if pc := p.pickRewriteCompaction(env); pc != nil { + if pc := p.pickRewriteCompaction(env); pc != nil { + return pc + } + + return nil +} + +// pickScoreBasedCompaction computes per-level size adjustments based on +// in-progress compactions, and computes a per-level score. The levels are +// iterated over in decreasing score order trying to find a valid compaction +// anchored at that level. +func (p *compactionPickerByScore) pickScoreBasedCompaction( + env compactionEnv, +) (pc *pickedCompaction) { + scores := p.calculateLevelScores(env.inProgressCompactions) + + // Check for a score-based compaction. candidateLevelInfos are first sorted + // by whether they should be compacted, so if we find a level which shouldn't + // be compacted, we can break early. + for i := range scores { + info := &scores[i] + if !info.shouldCompact() { + break + } + if info.level == numLevels-1 { + continue + } + + withScore := func(pc *pickedCompaction) *pickedCompaction { + // Fail-safe to protect against compacting the same sstable + // concurrently. + if pc == nil || inputRangeAlreadyCompacting(env, pc) { + return nil + } + p.addScoresToPickedCompactionMetrics(pc, scores) + pc.score = info.compensatedScoreRatio + pc.kind = compactionKindDefault + return pc + } + + if info.level == 0 { + if pc = withScore(pickL0(env, p.opts, p.vers, p.baseLevel)); pc != nil { + return pc + } + continue + } + + // info.level > 0 + var ok bool + info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum) + if !ok { + continue + } + + if pc = withScore(pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)); pc != nil { return pc } } @@ -1426,7 +1379,12 @@ var markedForCompactionAnnotator = &manifest.Annotator[fileMetadata]{ // with various checks to ensure that the file still exists in the expected level // and isn't already being compacted. 
func (p *compactionPickerByScore) pickedCompactionFromCandidateFile( - candidate *fileMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind, + candidate *fileMetadata, + env compactionEnv, + startLevel int, + outputLevel int, + kind compactionKind, + score float64, ) *pickedCompaction { if candidate == nil || candidate.IsCompacting() { return nil @@ -1453,6 +1411,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile( pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel) pc.kind = kind + pc.score = score pc.startLevel.files = inputs pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter()) @@ -1483,7 +1442,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum { return nil } - return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly) + score := float64(max(candidate.Stats.RangeDeletionsBytesEstimate/max(candidate.Size, 1), candidate.Stats.NumDeletions/max(candidate.Stats.NumEntries, 1))) + return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly, score) } // pickRewriteCompaction attempts to construct a compaction that @@ -1492,13 +1452,17 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( // necessary. A rewrite compaction outputs files to the same level as // the input level. func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { + if p.vers.Stats.MarkedForCompaction == 0 { + return nil + } + for l := numLevels - 1; l >= 0; l-- { candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l]) if candidate == nil { // Try the next level. continue } - pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite) + pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite, 0) if pc != nil { return pc } @@ -1551,7 +1515,11 @@ func (p *compactionPickerByScore) pickTombstoneDensityCompaction( } } - return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity) + if candidate == nil { + return nil + } + + return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity, candidate.Stats.TombstoneDenseBlocksRatio) } // pickAutoLPositive picks an automatic compaction for the candidate @@ -1755,7 +1723,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc return pc } -func pickManualCompaction( +func newPickedManualCompaction( vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction, ) (pc *pickedCompaction, retryLater bool) { outputLevel := manual.level + 1 @@ -1779,6 +1747,11 @@ func pickManualCompaction( return nil, true } pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel) + pc.kind = compactionKindDefault + pc.isManual = true + // NB: we set the score to math.MaxFloat64 so that manual compactions are + // always prioritized above automatic compactions. 
+ pc.score = math.MaxFloat64 manual.outputLevel = pc.outputLevel.level pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end)) if pc.startLevel.files.Empty() { @@ -1909,6 +1882,10 @@ func pickReadTriggeredCompactionHelper( return nil } + // Prioritize read compactions with a smaller initial file size, since + // they will be cheaper to perform. + pc.score = -float64(outputOverlaps.SizeSum() + overlapSlice.SizeSum()) + return pc } diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 481f4a6cd1..baa7e3d3b5 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -325,7 +325,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) { end: iEnd.UserKey, } - pc, retryLater := pickManualCompaction( + pc, retryLater := newPickedManualCompaction( pickerByScore.vers, opts, compactionEnv{ @@ -1413,17 +1413,15 @@ func TestCompactionPickerPickFile(t *testing.T) { d.mu.Lock() defer d.mu.Unlock() - // Use maybeScheduleCompactionPicker to take care of all of the - // initialization of the compaction-picking environment, but never - // pick a compaction; just call pickFile using the user-provided - // level. var lf manifest.LevelFile var ok bool - d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction { - p := untypedPicker.(*compactionPickerByScore) - lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum) - return nil - }) + env := d.makeCompactionEnv() + if env == nil { + return "unable to lock the DB for compaction picking" + } + p := d.mu.versions.picker.(*compactionPickerByScore) + lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum) + d.mu.versions.logUnlock() if !ok { return "(none)" } diff --git a/compaction_test.go b/compaction_test.go index edd6283c21..8b0aef2c0a 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2836,3 +2836,99 @@ func TestCompactionErrorStats(t *testing.T) { d.mu.Unlock() require.NoError(t, d.Close()) } + +// TestPrioritizingCompactionPool tests the behavior of a compaction pool +// with a maximum global concurrency of 1, ensuring that compactions are +// scheduled in a correctly prioritized order. +func TestPrioritizingCompactionPool(t *testing.T) { + var dbs []*DB + var buf bytes.Buffer + var compactInfo *CompactionInfo + var lastDB int + + compactionPool := NewPrioritizingCompactionPool(1) + datadriven.RunTest(t, "testdata/prioritizing_compaction_pool", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "define": + dbIndex := len(dbs) + opts := (&Options{ + FS: vfs.NewMem(), + DebugCheck: DebugCheckLevels, + FormatMajorVersion: internalFormatNewest, + DisableAutomaticCompactions: true, + CompactionPool: compactionPool, + EventListener: &EventListener{ + CompactionEnd: func(info CompactionInfo) { + // Fix the duration and output for determinism. 
+ info.TotalDuration = time.Millisecond + info.Output.Tables = nil + compactInfo = &info + lastDB = dbIndex + }, + }, + }).WithFSDefaults() + d, err := runDBDefineCmd(td, opts) + if err != nil { + return err.Error() + } + d.mu.Lock() + s := d.mu.versions.currentVersion().String() + d.mu.Unlock() + dbs = append(dbs, d) + return s + + case "allow-compactions": + for _, d := range dbs { + d.opts.DisableAutomaticCompactions = false + d.mu.Lock() + d.maybeScheduleCompaction() + d.mu.Unlock() + } + return "" + + case "compact": + parts := strings.Split(td.CmdArgs[0].Key, "-") + go dbs[0].Compact([]byte(parts[0]), []byte(parts[1]), false) + return "" + + case "ingest": + numTables := 12 + for i := range numTables { + key := i % 4 + path := fmt.Sprintf("ext%d", key) + f, err := dbs[1].opts.FS.Create(path, vfs.WriteCategoryUnspecified) + require.NoError(t, err) + w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ + TableFormat: dbs[1].FormatMajorVersion().MaxTableFormat(), + }) + require.NoError(t, w.Set([]byte(fmt.Sprint(key)), nil)) + require.NoError(t, w.Close()) + require.NoError(t, dbs[1].Ingest(context.Background(), []string{path})) + } + return "" + + case "wait-for-compactions": + buf.Reset() + compactionPool.mu.Lock() + defer compactionPool.mu.Unlock() + + for compactionPool.compactingCount > 0 { + compactionPool.cond.Wait() + fmt.Fprintf(&buf, "dbs[%d] finished a compaction: %v\n", lastDB, compactInfo) + fmt.Fprint(&buf, "in progress: ") + var numInProgress []int + for _, d := range dbs { + d.mu.Lock() + numInProgress = append(numInProgress, d.mu.compact.compactingCount) + d.mu.Unlock() + } + fmt.Fprintf(&buf, "%v\n", numInProgress) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/db.go b/db.go index 8c7b6f16e5..fb26f3a7f3 100644 --- a/db.go +++ b/db.go @@ -323,6 +323,20 @@ type DB struct { // compactionShedulers.Wait() should not be called while the DB.mu is held. compactionSchedulers sync.WaitGroup + // compactionPool is responsible for scheduling both automatic and manual + // compactions. + // + // The compactionPool may enforce a global max compaction concurrency in a + // multi-store configuration. Each Pebble store (i.e. an instance of *DB) + // has its own per-store compaction concurrency which is controlled by + // opts.MaxConcurrentCompactions. However, in a multi-store configuration, + // disk I/O is a per-store resource while CPU is shared across stores. + // A significant portion of compaction is CPU-intensive, and so + // CompactionPool may be used to ensure that excessive compactions don't + // interrupt foreground CPU tasks even if the disks are capable of handling + // the additional throughput from those compactions. + compactionPool CompactionPool + // The main mutex protecting internal DB state. This mutex encompasses many // fields because those fields need to be accessed and updated atomically. In // particular, the current version, log.*, mem.*, and snapshot list need to diff --git a/format_major_version.go b/format_major_version.go index 90ef9f0b6a..252d18c4c9 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -399,13 +399,13 @@ func (d *DB) writeFormatVersionMarker(formatVers FormatMajorVersion) error { // waiting for compactions to complete (or for slots to free up). 
func (d *DB) compactMarkedFilesLocked() error { curr := d.mu.versions.currentVersion() + if curr.Stats.MarkedForCompaction == 0 { + return nil + } + // Attempt to schedule a compaction to rewrite a file marked for + // compaction. + d.maybeScheduleCompaction() for curr.Stats.MarkedForCompaction > 0 { - // Attempt to schedule a compaction to rewrite a file marked for - // compaction. - d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickRewriteCompaction(env) - }) - // The above attempt might succeed and schedule a rewrite compaction. Or // there might not be available compaction concurrency to schedule the // compaction. Or compaction of the file might have already been in diff --git a/open.go b/open.go index 46ecaa49d4..987656a11b 100644 --- a/open.go +++ b/open.go @@ -530,6 +530,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) { } d.updateReadStateLocked(d.opts.DebugCheck) + d.compactionPool = opts.CompactionPool + if !d.opts.ReadOnly { // If the Options specify a format major version higher than the // loaded database's, upgrade it. If this is a new database, this diff --git a/options.go b/options.go index 21a7b92563..2c5493a7d2 100644 --- a/options.go +++ b/options.go @@ -975,6 +975,12 @@ type Options struct { // The default value is 1. MaxConcurrentCompactions func() int + // CompactionPool is responsible for scheduling both automatic and manual + // compactions. In the case of multiple DB instances (i.e. a multi-store + // configuration), a CompactionPool may be used to enforce a global maximum + // compaction concurrency. + CompactionPool CompactionPool + // MaxConcurrentDownloads specifies the maximum number of download // compactions. These are compactions that copy an external file to the local // store. @@ -1268,6 +1274,9 @@ func (o *Options) EnsureDefaults() *Options { if o.MaxConcurrentCompactions == nil { o.MaxConcurrentCompactions = func() int { return 1 } } + if o.CompactionPool == nil { + o.CompactionPool = defaultCompactionPool + } if o.MaxConcurrentDownloads == nil { o.MaxConcurrentDownloads = func() int { return 1 } } diff --git a/snapshot.go b/snapshot.go index 1be35931c4..6ebf88c090 100644 --- a/snapshot.go +++ b/snapshot.go @@ -117,7 +117,7 @@ func (s *Snapshot) closeLocked() error { // If s was the previous earliest snapshot, we might be able to reclaim // disk space by dropping obsolete records that were pinned by s. if e := s.db.mu.snapshots.earliest(); e > s.seqNum { - s.db.maybeScheduleCompactionPicker(pickElisionOnly) + s.db.maybeScheduleCompaction() } s.db = nil return nil diff --git a/testdata/prioritizing_compaction_pool b/testdata/prioritizing_compaction_pool new file mode 100644 index 0000000000..0a37d32bde --- /dev/null +++ b/testdata/prioritizing_compaction_pool @@ -0,0 +1,116 @@ +# Define 3 DBs and ensure compactions occur in the correctly prioritized order. +# - dbs[0] has overlapping sstables in L5 and L6 and will be manually compacted. +# - dbs[1] is empty initially but will have overlapping files ingested into it, +# triggering automatic compactions. +# - dbs[2] has multiple sstables in L6 which only contain DELs, which triggers +# an elision-only compaction. 
+ +# dbs[0] +define +L5 + a.SET.2: + b.SET.2: +L5 + c.SET.2: + d.SET.2: +L5 + e.SET.2: + f.SET.2: +L6 + a.SET.1: + b.SET.1: +L6 + c.SET.1: + d.SET.1: +L6 + e.SET.1: + f.SET.1: +---- +L5: + 000004:[a#2,SET-b#2,SET] + 000005:[c#2,SET-d#2,SET] + 000006:[e#2,SET-f#2,SET] +L6: + 000007:[a#1,SET-b#1,SET] + 000008:[c#1,SET-d#1,SET] + 000009:[e#1,SET-f#1,SET] + +# dbs[1] +define +---- + +ingest +---- + +# dbs[2] +define +L6 + a.DEL.1: +L6 + b.DEL.2: +L6 + c.DEL.3: +L6 + d.DEL.4: +L6 + e.DEL.5: +L6 + f.DEL.6: +---- +L6: + 000004:[a#1,DEL-a#1,DEL] + 000005:[b#2,DEL-b#2,DEL] + 000006:[c#3,DEL-c#3,DEL] + 000007:[d#4,DEL-d#4,DEL] + 000008:[e#5,DEL-e#5,DEL] + 000009:[f#6,DEL-f#6,DEL] + +# Enable automatic compactions on all DBs. +allow-compactions +---- + +# Asynchronously start manual compactions for dbs[0]. +compact a-b +---- + +compact c-d +---- + +compact e-f +---- + +# The compaction pool has a maximum concurrency of 1. +# +# Since automatic compactions were enabled first, a default compaction from +# dbs[1] should be first to complete. Second, all of the manual compactions from +# dbs[0] should be prioritized. Third, the remaining default compactions +# resulting from files ingested into dbs[1]. Finally, the elision-only compactions +# from dbs[2] should +wait-for-compactions +---- +dbs[1] finished a compaction: [JOB 15] compacted(default) L0 [000008 000012] (1.2KB) Score=100.00 + L6 [000004] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 4] compacted(default) L5 [000004] (594B) Score=0.00 + L6 [000007] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 5] compacted(default) L5 [000005] (594B) Score=0.00 + L6 [000008] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 6] compacted(default) L5 [000006] (594B) Score=0.00 + L6 [000009] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 16] compacted(default) L0 [000009 000013] (1.2KB) Score=100.00 + L6 [000005] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 17] compacted(default) L0 [000010 000014] (1.2KB) Score=100.00 + L6 [000006] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 18] compacted(default) L0 [000011 000015] (1.2KB) Score=100.00 + L6 [000007] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 5] compacted(elision-only) L6 [000004] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 6] compacted(elision-only) L6 [000005] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 7] compacted(elision-only) L6 [000006] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 8] compacted(elision-only) L6 [000007] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 9] compacted(elision-only) L6 
[000008] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 10] compacted(elision-only) L6 [000009] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 0]
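
Illustrative usage sketch (not part of the diff above): the snippet below shows how a multi-store deployment might share a single PrioritizingCompactionPool across several Pebble instances through the new Options.CompactionPool field, so that compactions across all stores are capped and prioritized globally while each store still honors its own per-store MaxConcurrentCompactions. The store directories and the global limit of 2 are hypothetical.

package main

import "github.com/cockroachdb/pebble"

func main() {
	// A single pool shared by both stores caps cross-store compaction
	// concurrency at 2 (move and delete-only compactions are not counted)
	// and prefers the store whose picked compaction has the higher priority,
	// ordered by compactionKind and then by score.
	pool := pebble.NewPrioritizingCompactionPool(2)

	openStore := func(dir string) *pebble.DB {
		db, err := pebble.Open(dir, &pebble.Options{
			CompactionPool: pool,
		})
		if err != nil {
			panic(err)
		}
		return db
	}

	store1 := openStore("/mnt/store1") // hypothetical path
	store2 := openStore("/mnt/store2") // hypothetical path
	defer store1.Close()
	defer store2.Close()

	// Whenever either store schedules compactions internally, it registers
	// itself with the shared pool via AddWaitingDB; the pool then picks and
	// runs the highest-priority waiting compaction across both stores.
}

If Options.CompactionPool is left unset, EnsureDefaults falls back to the UnlimitedCompactionPool, which schedules each store's compactions independently and enforces no cross-store limit.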