diff --git a/compaction.go b/compaction.go index 8af9643277..5ac37226c5 100644 --- a/compaction.go +++ b/compaction.go @@ -53,6 +53,9 @@ func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64 // compactions to half of available disk space. Note that this will not // prevent compaction picking from pursuing compactions that are larger // than this threshold before expansion. + // + // NB: this heuristic is an approximation since we may run more compactions + // than MaxConcurrentCompactions. diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions()) if v > diskMax { v = diskMax @@ -267,7 +270,7 @@ type compaction struct { pickerMetrics compactionPickerMetrics - slot base.CompactionSlot + grantHandle CompactionGrantHandle } // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any @@ -336,7 +339,7 @@ func newCompaction( opts *Options, beganAt time.Time, provider objstorage.Provider, - slot base.CompactionSlot, + grantHandle CompactionGrantHandle, ) *compaction { c := &compaction{ kind: compactionKindDefault, @@ -353,17 +356,13 @@ func newCompaction( maxOutputFileSize: pc.maxOutputFileSize, maxOverlapBytes: pc.maxOverlapBytes, pickerMetrics: pc.pickerMetrics, - slot: slot, + grantHandle: grantHandle, } c.startLevel = &c.inputs[0] if pc.startLevel.l0SublevelInfo != nil { c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo } c.outputLevel = &c.inputs[1] - if c.slot == nil { - c.slot = opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO()) - c.slot.CompactionSelected(c.startLevel.level, c.outputLevel.level, c.startLevel.files.SizeSum()) - } if len(pc.extraLevels) > 0 { c.extraLevels = pc.extraLevels @@ -432,6 +431,7 @@ func newDeleteOnlyCompaction( inputs: inputs, deletionHints: hints, exciseEnabled: exciseEnabled, + grantHandle: singletonNoopGrantHandle, } // Set c.smallest, c.largest. @@ -526,22 +526,11 @@ func newFlush( maxOutputFileSize: math.MaxUint64, maxOverlapBytes: math.MaxUint64, flushing: flushing, + grantHandle: singletonNoopGrantHandle, } c.startLevel = &c.inputs[0] c.outputLevel = &c.inputs[1] - // Flush slots are always taken without permission. - // - // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented - // and passed-in as an option during Open. - slot := opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO()) - var flushingSize uint64 - for i := range flushing { - flushingSize += flushing[i].totalBytes() - } - slot.CompactionSelected(-1, 0, flushingSize) - c.slot = slot - if len(flushing) > 0 { if _, ok := flushing[0].flushable.(*ingestedFlushable); ok { if len(flushing) != 1 { @@ -613,15 +602,11 @@ func newFlush( updatePointBounds(f.newIter(nil)) if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil { if err := updateRangeBounds(rangeDelIter); err != nil { - c.slot.Release(0) - c.slot = nil return nil, err } } if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil { if err := updateRangeBounds(rangeKeyIter); err != nil { - c.slot.Release(0) - c.slot = nil return nil, err } } @@ -986,8 +971,9 @@ func (c *compaction) String() string { } type manualCompaction struct { - // Count of the retries either due to too many concurrent compactions, or a - // concurrent compaction to overlapping levels. + // id is for internal bookkeeping. + id uint64 + // Count of the retries due to concurrent compaction to overlapping levels. retries int level int outputLevel int @@ -1050,9 +1036,6 @@ func (d *DB) addInProgressCompaction(c *compaction) { // have completed by this point. func (d *DB) clearCompactingState(c *compaction, rollback bool) { c.versionEditApplied = true - if c.slot != nil { - panic("pebble: compaction slot should have been released before clearing compacting state") - } for _, cl := range c.inputs { iter := cl.files.Iter() for f := iter.First(); f != nil; f = iter.Next() { @@ -1088,7 +1071,14 @@ func (d *DB) clearCompactingState(c *compaction, rollback bool) { // all the indices in TableMetadata to be inaccurate. To ensure this, // grab the manifest lock. d.mu.versions.logLock() - defer d.mu.versions.logUnlock() + // It is a bit peculiar that we are fiddling with th current version state + // in a separate critical section from when this version was installed. + // But this fiddling is necessary if the compaction failed. When the + // compaction succeeded, we've already done this in logAndApply, so this + // seems redundant. Anyway, we clear the pickedCompactionCache since we + // may be able to pick a better compaction (though when this compaction + // succeeded we've also cleared the cache in logAndApply). + defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache() d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress) }() } @@ -1243,8 +1233,8 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { panic("pebble: ingestedFlushable must be flushed one at a time.") } defer func() { - c.slot.Release(0 /* totalBytesWritten */) - c.slot = nil + c.grantHandle.Done() + c.grantHandle = nil }() // Construct the VersionEdit, levelMetrics etc. @@ -1574,8 +1564,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { info.Err = err } } else { - // We won't be performing the logAndApply step because of the error, - // so logUnlock. + // We won't be performing the logAndApply step because of the error, so + // logUnlock. We don't need to invalidate the pickedCompactionCache since + // the flush failed and so the latest version has not changed. d.mu.versions.logUnlock() } @@ -1702,236 +1693,281 @@ func (d *DB) maybeScheduleCompactionAsync() { // maybeScheduleCompaction schedules a compaction if necessary. // -// d.mu must be held when calling this. +// Requires d.mu to be held. func (d *DB) maybeScheduleCompaction() { - d.maybeScheduleCompactionPicker(pickAuto) + d.mu.versions.logLock() + defer d.mu.versions.logUnlock() + env := d.makeCompactionEnvLocked() + if env == nil { + return + } + // env.inProgressCompactions will become stale once we pick a compaction, so + // it needs to be kept fresh. Also, the pickedCompaction in the + // pickedCompactionCache is not valid if we pick a compaction before using + // it, since those earlier compactions can mark the same file as compacting. + + // Delete-only compactions are expected to be cheap and reduce future + // compaction work, so schedule them directly instead of using the + // CompactionScheduler. + if d.tryScheduleDeleteOnlyCompaction() { + env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) + d.mu.versions.pickedCompactionCache.invalidate() + } + // Download compactions have their own concurrency and do not currently + // interact with CompactionScheduler. + // + // TODO(sumeer): integrate with CompactionScheduler, since these consume + // disk write bandwidth. + if d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads()) { + env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) + d.mu.versions.pickedCompactionCache.invalidate() + } + // The remaining compactions are scheduled by the CompactionScheduler. + if d.mu.versions.pickedCompactionCache.isWaiting() { + // CompactionScheduler already knows that the DB is waiting to run a + // compaction. + return + } + // INVARIANT: !pickedCompactionCache.isWaiting. The following loop will + // either exit after successfully starting all the compactions it can pick, + // or will exit with one pickedCompaction in the cache, and isWaiting=true. + for { + // Do not have a pickedCompaction in the cache. + pc := d.pickAnyCompaction(*env) + if pc == nil { + return + } + success, grantHandle := d.opts.Experimental.CompactionScheduler.TrySchedule() + if !success { + // Can't run now, but remember this pickedCompaction in the cache. + d.mu.versions.pickedCompactionCache.add(pc) + return + } + d.runPickedCompaction(pc, grantHandle) + env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) + } } -func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickAuto(env) +// makeCompactionEnv attempts to create a compactionEnv necessary during +// compaction picking. If the DB is closed or marked as read-only, +// makeCompactionEnv returns nil to indicate that compactions may not be +// performed. Else, a new compactionEnv is constructed using the current DB +// state. +// +// Compaction picking needs a coherent view of a Version. For example, we need +// to exclude concurrent ingestions from making a decision on which level to +// ingest into that conflicts with our compaction decision. +// +// A pickedCompaction constructed using a compactionEnv must only be used if +// the latest Version has not changed. +// +// REQUIRES: d.mu and d.mu.versions.logLock are held. +func (d *DB) makeCompactionEnvLocked() *compactionEnv { + if d.closed.Load() != nil || d.opts.ReadOnly { + return nil + } + return &compactionEnv{ + diskAvailBytes: d.diskAvailBytes.Load(), + earliestSnapshotSeqNum: d.mu.snapshots.earliest(), + earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(), + inProgressCompactions: d.getInProgressCompactionInfoLocked(nil), + readCompactionEnv: readCompactionEnv{ + readCompactions: &d.mu.compact.readCompactions, + flushing: d.mu.compact.flushing || d.passedFlushThreshold(), + rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, + }, + } } -func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickElisionOnlyCompaction(env) +// pickAnyCompaction tries to pick a manual or automatic compaction. +func (d *DB) pickAnyCompaction(env compactionEnv) (pc *pickedCompaction) { + pc = d.pickManualCompaction(env) + if pc == nil && !d.opts.DisableAutomaticCompactions { + pc = d.mu.versions.picker.pickAuto(env) + } + return pc } -// tryScheduleDownloadCompaction tries to start a download compaction. -// -// Returns true if we started a download compaction (or completed it -// immediately because it is a no-op or we hit an error). +// runPickedCompaction kicks off the provided pickedCompaction. In case the +// pickedCompaction is a manual compaction, the corresponding manualCompaction +// is removed from d.mu.compact.manual. // -// Requires d.mu to be held. Updates d.mu.compact.downloads. -func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool { - vers := d.mu.versions.currentVersion() - for i := 0; i < len(d.mu.compact.downloads); { - download := d.mu.compact.downloads[i] - switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) { - case launchedCompaction: - return true - case didNotLaunchCompaction: - // See if we can launch a compaction for another download task. - i++ - case downloadTaskCompleted: - // Task is completed and must be removed. - d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1) +// REQUIRES: d.mu and d.mu.versions.logLock is held. +func (d *DB) runPickedCompaction(pc *pickedCompaction, grantHandle CompactionGrantHandle) { + var doneChannel chan error + if pc.manualID > 0 { + for i := range d.mu.compact.manual { + if d.mu.compact.manual[i].id == pc.manualID { + doneChannel = d.mu.compact.manual[i].done + d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1) + d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual))) + break + } + } + if doneChannel == nil { + panic(errors.AssertionFailedf("did not find manual compaction with id %d", pc.manualID)) } } - return false + + d.mu.compact.compactingCount++ + compaction := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), grantHandle) + d.addInProgressCompaction(compaction) + go func() { + d.compact(compaction, doneChannel) + }() } -// maybeScheduleCompactionPicker schedules a compaction if necessary, -// calling `pickFunc` to pick automatic compactions. -// -// Requires d.mu to be held. -func (d *DB) maybeScheduleCompactionPicker( - pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) { - if d.closed.Load() != nil || d.opts.ReadOnly { - return +// Schedule implements DBForCompaction (it is called by the +// CompactionScheduler). +func (d *DB) Schedule(grantHandle CompactionGrantHandle) bool { + d.mu.Lock() + defer d.mu.Unlock() + d.mu.versions.logLock() + defer d.mu.versions.logUnlock() + isWaiting := d.mu.versions.pickedCompactionCache.isWaiting() + if !isWaiting { + return false } - maxCompactions := d.opts.MaxConcurrentCompactions() - maxDownloads := d.opts.MaxConcurrentDownloads() - - if d.mu.compact.compactingCount >= maxCompactions && - (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) { - if len(d.mu.compact.manual) > 0 { - // Inability to run head blocks later manual compactions. - d.mu.compact.manual[0].retries++ + pc := d.mu.versions.pickedCompactionCache.getForRunning() + if pc == nil { + env := d.makeCompactionEnvLocked() + if env != nil { + pc = d.pickAnyCompaction(*env) + } + if pc == nil { + d.mu.versions.pickedCompactionCache.setNotWaiting() + return false } - return } + // INVARIANT: pc != nil and is not in the cache. isWaiting is true, since + // there may be more compactions to run. + d.runPickedCompaction(pc, grantHandle) + return true +} - // Compaction picking needs a coherent view of a Version. In particular, we - // need to exclude concurrent ingestions from making a decision on which level - // to ingest into that conflicts with our compaction - // decision. versionSet.logLock provides the necessary mutual exclusion. +// GetWaitingCompaction implements DBForCompaction (it is called by the +// CompactionScheduler). +func (d *DB) GetWaitingCompaction() (bool, WaitingCompaction) { + d.mu.Lock() + defer d.mu.Unlock() d.mu.versions.logLock() defer d.mu.versions.logUnlock() - - // Check for the closed flag again, in case the DB was closed while we were - // waiting for logLock(). - if d.closed.Load() != nil { - return + isWaiting := d.mu.versions.pickedCompactionCache.isWaiting() + if !isWaiting { + return false, WaitingCompaction{} } - - env := compactionEnv{ - diskAvailBytes: d.diskAvailBytes.Load(), - earliestSnapshotSeqNum: d.mu.snapshots.earliest(), - earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(), + pc := d.mu.versions.pickedCompactionCache.peek() + if pc == nil { + // Need to pick a compaction. + env := d.makeCompactionEnvLocked() + if env != nil { + pc = d.pickAnyCompaction(*env) + } + if pc == nil { + // Call setNotWaiting so that next call to GetWaitingCompaction can + // return early. + d.mu.versions.pickedCompactionCache.setNotWaiting() + return false, WaitingCompaction{} + } else { + d.mu.versions.pickedCompactionCache.add(pc) + } } + // INVARIANT: pc != nil and is in the cache. + return true, makeWaitingCompaction(pc.manualID > 0, pc.kind, pc.score) +} - if d.mu.compact.compactingCount < maxCompactions { - // Check for delete-only compactions first, because they're expected to be - // cheap and reduce future compaction work. - if !d.opts.private.disableDeleteOnlyCompactions && - !d.opts.DisableAutomaticCompactions && - len(d.mu.compact.deletionHints) > 0 { - d.tryScheduleDeleteOnlyCompaction() - } +// GetAllowedWithoutPermission implements DBForCompaction (it is called by the +// CompactionScheduler). +func (d *DB) GetAllowedWithoutPermission() int { + allowedBasedOnBacklog := int(d.mu.versions.curCompactionConcurrency.Load()) + allowedBasedOnManual := 0 + manualBacklog := int(d.mu.compact.manualLen.Load()) + if manualBacklog > 0 { + maxAllowed := d.opts.MaxConcurrentCompactions() + allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog) + } + return max(allowedBasedOnBacklog, allowedBasedOnManual) +} - for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions { - if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) { - // Inability to run head blocks later manual compactions. - manual.retries++ - break - } - d.mu.compact.manual = d.mu.compact.manual[1:] +// tryScheduleDownloadCompactions tries to start download compactions. +// +// Requires d.mu to be held. Updates d.mu.compact.downloads. +// +// Returns true iff at least one compaction was started. +func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) bool { + started := false + vers := d.mu.versions.currentVersion() + for i := 0; i < len(d.mu.compact.downloads); { + if d.mu.compact.downloadingCount >= maxConcurrentDownloads { + break } - - for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions && - d.tryScheduleAutoCompaction(env, pickFunc) { + download := d.mu.compact.downloads[i] + switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) { + case launchedCompaction: + started = true + continue + case didNotLaunchCompaction: + // See if we can launch a compaction for another download task. + i++ + case downloadTaskCompleted: + // Task is completed and must be removed. + d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1) } } + return started +} - for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads && - d.tryScheduleDownloadCompaction(env, maxDownloads) { +func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) { + v := d.mu.versions.currentVersion() + for len(d.mu.compact.manual) > 0 { + manual := d.mu.compact.manual[0] + pc, retryLater := newPickedManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) + if pc != nil { + return pc + } + if retryLater { + // We are not able to run this manual compaction at this time. + // Inability to run the head blocks later manual compactions. + manual.retries++ + return nil + } + // Manual compaction is a no-op. Signal that it's complete. + manual.done <- nil + d.mu.compact.manual = d.mu.compact.manual[1:] + d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual))) } + return nil } // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction // for all files that can be deleted as suggested by deletionHints. // // Requires d.mu to be held. Updates d.mu.compact.deletionHints. -func (d *DB) tryScheduleDeleteOnlyCompaction() { +// +// Returns true iff a compaction was started. +func (d *DB) tryScheduleDeleteOnlyCompaction() bool { + if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions || + d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() || + len(d.mu.compact.deletionHints) == 0 { + return false + } v := d.mu.versions.currentVersion() snapshots := d.mu.snapshots.toSlice() // We need to save the value of exciseEnabled in the compaction itself, as // it can change dynamically between now and when the compaction runs. exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables && d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises() - // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented - // and passed-in as an option during Open. - limiter := d.opts.Experimental.CompactionLimiter - var slot base.CompactionSlot - // TODO(bilal): Should we always take a slot without permission? - if n := len(d.getInProgressCompactionInfoLocked(nil)); n == 0 { - // We are not running a compaction at the moment. We should take a compaction slot - // without permission. - slot = limiter.TookWithoutPermission(context.TODO()) - } else { - var err error - slot, err = limiter.RequestSlot(context.TODO()) - if err != nil { - d.opts.EventListener.BackgroundError(err) - return - } - if slot == nil { - // The limiter is denying us a compaction slot. Yield to other work. - return - } - } inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled) d.mu.compact.deletionHints = unresolvedHints if len(inputs) > 0 { c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled) - c.slot = slot d.mu.compact.compactingCount++ d.addInProgressCompaction(c) go d.compact(c, nil) - } else { - slot.Release(0 /* totalBytesWritten */) + return true } -} - -// tryScheduleManualCompaction tries to kick off the given manual compaction. -// -// Returns false if we are not able to run this compaction at this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool { - v := d.mu.versions.currentVersion() - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) - if pc == nil { - if !retryLater { - // Manual compaction is a no-op. Signal completion and exit. - manual.done <- nil - return true - } - // We are not able to run this manual compaction at this time. - return false - } - - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), nil /* compactionSlot */) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, manual.done) - return true -} - -// tryScheduleAutoCompaction tries to kick off an automatic compaction. -// -// Returns false if no automatic compactions are necessary or able to run at -// this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleAutoCompaction( - env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) bool { - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - env.readCompactionEnv = readCompactionEnv{ - readCompactions: &d.mu.compact.readCompactions, - flushing: d.mu.compact.flushing || d.passedFlushThreshold(), - rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, - } - // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented - // and passed-in as an option during Open. - limiter := d.opts.Experimental.CompactionLimiter - var slot base.CompactionSlot - if n := len(env.inProgressCompactions); n == 0 { - // We are not running a compaction at the moment. We should take a compaction slot - // without permission. - slot = limiter.TookWithoutPermission(context.TODO()) - } else { - var err error - slot, err = limiter.RequestSlot(context.TODO()) - if err != nil { - d.opts.EventListener.BackgroundError(err) - return false - } - if slot == nil { - // The limiter is denying us a compaction slot. Yield to other work. - return false - } - } - pc := pickFunc(d.mu.versions.picker, env) - if pc == nil { - slot.Release(0 /* bytesWritten */) - return false - } - var inputSize uint64 - for i := range pc.inputs { - inputSize += pc.inputs[i].files.SizeSum() - } - slot.CompactionSelected(pc.startLevel.level, pc.outputLevel.level, inputSize) - - // Responsibility for releasing slot passes over to the compaction. - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), slot) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, nil) - return true + return false } // deleteCompactionHintType indicates whether the deleteCompactionHint was @@ -2267,7 +2303,7 @@ func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet { func (d *DB) compact(c *compaction, errChannel chan error) { pprof.Do(context.Background(), d.compactionPprofLabels(c), func(context.Context) { d.mu.Lock() - defer d.mu.Unlock() + c.grantHandle.Started() if err := d.compact1(c, errChannel); err != nil { // TODO(peter): count consecutive compaction errors and backoff. d.opts.EventListener.BackgroundError(err) @@ -2283,11 +2319,28 @@ func (d *DB) compact(c *compaction, errChannel chan error) { // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not // miss or double count a completing compaction's duration. d.mu.compact.duration += d.timeNow().Sub(c.beganAt) - - // The previous compaction may have produced too many files in a - // level, so reschedule another compaction if needed. + d.mu.Unlock() + // Done must not be called while holding any lock that needs to be + // acquired by Schedule. Also, it must be called after new Version has + // been installed, and metadata related to compactingCount and inProgress + // compactions has been updated. This is because when we are running at + // the limit of permitted compactions, Done can cause the + // CompactionScheduler to schedule another compaction. Note that the only + // compactions that may be scheduled by Done are those integrated with the + // CompactionScheduler. + c.grantHandle.Done() + c.grantHandle = nil + // The previous compaction may have produced too many files in a level, so + // reschedule another compaction if needed. + // + // The preceding Done call will not necessarily cause a compaction to be + // scheduled, so we also need to call maybeScheduleCompaction. And + // maybeScheduleCompaction encompasses all compactions, and not only those + // scheduled via the CompactionScheduler. + d.mu.Lock() d.maybeScheduleCompaction() d.mu.compact.cond.Broadcast() + d.mu.Unlock() }) } @@ -2379,8 +2432,10 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { // created, as d.runCompaction did not do that. d.cleanupVersionEdit(ve) // logAndApply calls logUnlock. If we didn't call it, we need to call - // logUnlock ourselves. - d.mu.versions.logUnlock() + // logUnlock ourselves. We also invalidate the pickedCompactionCache + // since this failed compaction may be the highest priority to run + // next. + d.mu.versions.logUnlockAndInvalidatePickedCompactionCache() return err } return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { @@ -2867,10 +2922,6 @@ func (d *DB) runMoveCompaction( func (d *DB) runCompaction( jobID JobID, c *compaction, ) (ve *versionEdit, stats compact.Stats, retErr error) { - defer func() { - c.slot.Release(stats.CumulativeWrittenSize) - c.slot = nil - }() if c.cancel.Load() { return ve, stats, ErrCancelledCompaction } @@ -3039,8 +3090,7 @@ func (d *DB) compactAndWrite( Grandparents: c.grandparents, MaxGrandparentOverlapBytes: c.maxOverlapBytes, TargetOutputFileSize: c.maxOutputFileSize, - Slot: c.slot, - IteratorStats: &c.stats, + GrantHandle: c.grantHandle, } runner := compact.NewRunner(runnerCfg, iter) for runner.MoreDataToWrite() { diff --git a/compaction_picker.go b/compaction_picker.go index 2464b2a3ce..0b03105fbf 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -192,6 +192,9 @@ type pickedCompaction struct { score float64 // kind indicates the kind of compaction. kind compactionKind + // manualID > 0 iff this is a manual compaction. It exists solely for + // internal bookkeeping. + manualID uint64 // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel @@ -1153,16 +1156,8 @@ func responsibleForGarbageBytes( return uint64(totalGarbage) / uint64(useCount) } -// pickAuto picks the best compaction, if any. -// -// On each call, pickAuto computes per-level size adjustments based on -// in-progress compactions, and computes a per-level score. The levels are -// iterated over in decreasing score order trying to find a valid compaction -// anchored at that level. -// -// If a score-based compaction cannot be found, pickAuto falls back to looking -// for an elision-only compaction to remove obsolete keys. -func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) { +func (p *compactionPickerByScore) getCompactionConcurrency() int { + maxConcurrentCompactions := p.opts.MaxConcurrentCompactions() // Compaction concurrency is controlled by L0 read-amp. We allow one // additional compaction per L0CompactionConcurrency sublevels, as well as // one additional compaction per CompactionDebtConcurrency bytes of @@ -1171,16 +1166,44 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact // debt as a second signal to prevent compaction concurrency from dropping // significantly right after a base compaction finishes, and before those // bytes have been compacted further down the LSM. - if n := len(env.inProgressCompactions); n > 0 { + // + // Let n be the number of in-progress compactions. + // + // l0ReadAmp >= ccSignal1 then can run another compaction, where + // ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency + // Rearranging, + // n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency. + // So we can run up to + // l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions + l0ReadAmpCompactions := 1 + if p.opts.Experimental.L0CompactionConcurrency > 0 { l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions() + l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1 + } + // compactionDebt >= ccSignal2 then can run another compaction, where + // ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency + // Rearranging, + // n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + // So we can run up to + // compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions. + compactionDebtCompactions := 1 + if p.opts.Experimental.CompactionDebtConcurrency > 0 { compactionDebt := p.estimatedCompactionDebt(0) - ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency - ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency - if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 { - return nil - } + compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1 } + return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1) +} +// pickAuto picks the best compaction, if any. +// +// On each call, pickAuto computes per-level size adjustments based on +// in-progress compactions, and computes a per-level score. The levels are +// iterated over in decreasing score order trying to find a valid compaction +// anchored at that level. +// +// If a score-based compaction cannot be found, pickAuto falls back to looking +// for an elision-only compaction to remove obsolete keys. +func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) { scores := p.calculateLevelScores(env.inProgressCompactions) // TODO(bananabrick): Either remove, or change this into an event sent to the @@ -1489,6 +1512,9 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( // necessary. A rewrite compaction outputs files to the same level as // the input level. func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { + if p.vers.Stats.MarkedForCompaction == 0 { + return nil + } for l := numLevels - 1; l >= 0; l-- { candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l]) if candidate == nil { @@ -1766,7 +1792,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc return pc } -func pickManualCompaction( +func newPickedManualCompaction( vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction, ) (pc *pickedCompaction, retryLater bool) { outputLevel := manual.level + 1 @@ -1790,6 +1816,7 @@ func pickManualCompaction( return nil, true } pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel) + pc.manualID = manual.id manual.outputLevel = pc.outputLevel.level pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end)) if pc.startLevel.files.Empty() { diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 9667aa4ba9..6a008b94c3 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -188,6 +188,15 @@ func TestCompactionPickerTargetLevel(t *testing.T) { } } + pickAuto := func(env compactionEnv, pickerByScore *compactionPickerByScore) *pickedCompaction { + inProgressCompactions := len(env.inProgressCompactions) + allowedCompactions := pickerByScore.getCompactionConcurrency() + if inProgressCompactions >= allowedCompactions { + return nil + } + return pickerByScore.pickAuto(env) + } + datadriven.RunTest(t, "testdata/compaction_picker_target_level", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -203,6 +212,14 @@ func TestCompactionPickerTargetLevel(t *testing.T) { // : [compensation] var errMsg string vers, opts, errMsg = loadVersion(t, d) + opts.MaxConcurrentCompactions = func() int { + // This test only limits the count based on the L0 read amp and + // compaction debt, so we would like to return math.MaxInt. But we + // don't since it is also used in expandedCompactionByteSizeLimit, + // and causes the expanded bytes to reduce. The test cases never + // pick more than 4 compactions, so we use 4. + return 4 + } if errMsg != "" { return errMsg } @@ -231,7 +248,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) { earliestUnflushedSeqNum: base.SeqNumMax, inProgressCompactions: inProgress, } - pc := pickerByScore.pickAuto(env) + pc := pickAuto(env, pickerByScore) if pc == nil { break } @@ -299,10 +316,10 @@ func TestCompactionPickerTargetLevel(t *testing.T) { var b strings.Builder fmt.Fprintf(&b, "Initial state before pick:\n%s", runVersionFileSizes(vers)) - pc := pickerByScore.pickAuto(compactionEnv{ + pc := pickAuto(compactionEnv{ earliestUnflushedSeqNum: base.SeqNumMax, inProgressCompactions: inProgress, - }) + }, pickerByScore) if pc != nil { fmt.Fprintf(&b, "Picked: L%d->L%d: %0.1f\n", pc.startLevel.level, pc.outputLevel.level, pc.score) } @@ -326,7 +343,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) { end: iEnd.UserKey, } - pc, retryLater := pickManualCompaction( + pc, retryLater := newPickedManualCompaction( pickerByScore.vers, opts, compactionEnv{ @@ -540,7 +557,7 @@ func TestCompactionPickerL0(t *testing.T) { var result strings.Builder if pc != nil { checkClone(t, pc) - c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */) + c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { @@ -587,6 +604,7 @@ func TestCompactionPickerL0(t *testing.T) { func TestCompactionPickerConcurrency(t *testing.T) { opts := DefaultOptions() opts.Experimental.L0CompactionConcurrency = 1 + opts.MaxConcurrentCompactions = func() int { return 4 } parseMeta := func(s string) (*tableMetadata, error) { parts := strings.Split(s, ":") @@ -755,13 +773,20 @@ func TestCompactionPickerConcurrency(t *testing.T) { td.MaybeScanArgs(t, "l0_compaction_concurrency", &opts.Experimental.L0CompactionConcurrency) td.MaybeScanArgs(t, "compaction_debt_concurrency", &opts.Experimental.CompactionDebtConcurrency) - pc := picker.pickAuto(compactionEnv{ + env := compactionEnv{ earliestUnflushedSeqNum: math.MaxUint64, inProgressCompactions: inProgressCompactions, - }) + } + inProgressCount := len(env.inProgressCompactions) + allowedCompactions := picker.getCompactionConcurrency() + var pc *pickedCompaction + if inProgressCount < allowedCompactions { + pc = picker.pickAuto(env) + } var result strings.Builder + fmt.Fprintf(&result, "picker.getCompactionConcurrency: %d\n", allowedCompactions) if pc != nil { - c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */) + c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { @@ -771,7 +796,7 @@ func TestCompactionPickerConcurrency(t *testing.T) { fmt.Fprintf(&result, "grandparents: %s\n", fileNums(c.grandparents)) } } else { - return "nil" + fmt.Fprintf(&result, "nil") } return result.String() } @@ -1353,6 +1378,7 @@ func TestCompactionPickerPickFile(t *testing.T) { FS: fs, } opts.Experimental.EnableColumnarBlocks = func() bool { return true } + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() d, err := Open("", opts) require.NoError(t, err) @@ -1415,11 +1441,16 @@ func TestCompactionPickerPickFile(t *testing.T) { // level. var lf manifest.LevelFile var ok bool - d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction { - p := untypedPicker.(*compactionPickerByScore) + func() { + d.mu.versions.logLock() + defer d.mu.versions.logUnlock() + env := d.makeCompactionEnvLocked() + if env == nil { + return + } + p := d.mu.versions.picker.(*compactionPickerByScore) lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum) - return nil - }) + }() if !ok { return "(none)" } @@ -1472,6 +1503,7 @@ func TestCompactionPickerScores(t *testing.T) { FS: fs, } opts.Experimental.EnableColumnarBlocks = func() bool { return true } + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() d, err := Open("", opts) require.NoError(t, err) @@ -1493,7 +1525,7 @@ func TestCompactionPickerScores(t *testing.T) { if td.HasArg("pause-cleaning") { cleaner.pause() } - + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() d, err = runDBDefineCmd(td, opts) if err != nil { return err.Error() diff --git a/compaction_scheduler.go b/compaction_scheduler.go new file mode 100644 index 0000000000..a9e6dcc7e8 --- /dev/null +++ b/compaction_scheduler.go @@ -0,0 +1,398 @@ +// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package pebble + +import ( + "sync" + "time" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" +) + +type CompactionGrantHandle = base.CompactionGrantHandle +type CompactionGrantHandleStats = base.CompactionGrantHandleStats + +// NB: This interface is experimental and subject to change. +// +// For instance, we may incorporate more information in TrySchedule and in the +// return value of Schedule to tell CompactionScheduler of the sub-category of +// compaction so that the scheduler can have more granular estimates. For +// example, the input or output level could affect the write bandwidth if the +// inputs are better cached (say at higher levels). + +// CompactionScheduler is responsible for scheduling both automatic and manual +// compactions. In the case of multiple DB instances on a node (i.e. a +// multi-store configuration), implementations of CompactionScheduler may +// enforce a global maximum compaction concurrency. Additionally, +// implementations of CompactionScheduler may be resource aware and permit +// more than the compactions that are "allowed without permission" if +// resources are available. +// +// Locking: CompactionScheduler's mutexes are ordered after DBForCompaction +// mutexes. We need to specify some lock ordering since CompactionScheduler +// and DBForCompaction call into each other. This ordering choice is made to +// simplify the implementation of DBForCompaction. There are three exceptions +// to this DBForCompaction.GetAllowedWithoutPermission, +// CompactionScheduler.Unregister, CompactionGrantHandle.Done -- see those +// declarations for details. +type CompactionScheduler interface { + // Register is called to register this DB and to specify the number of + // goroutines that consume CPU in each compaction (see the CPU reporting + // interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once + // by this DB if it successfully opens. + Register(numGoroutinesPerCompaction int, db DBForCompaction) + // Unregister is used to unregister the DB. Must be called once when the DB + // is being closed. Unregister waits until all ongoing calls to + // DBForCompaction are finished, so Unregister must not be called while + // holding locks that DBForCompaction acquires in those calls. + Unregister() + // TrySchedule is called by DB when it wants to run a compaction. The bool + // is true iff permission is granted, and in that case the + // CompactionGrantHandle needs to be exercised by the DB. + TrySchedule() (bool, CompactionGrantHandle) +} + +// DBForCompaction is the interface implemented by the DB to interact with the +// CompactionScheduler. +type DBForCompaction interface { + // GetAllowedWithoutPermission returns what is permitted at the DB-level + // (there may be further restrictions at the node level, when there are + // multiple DBs at a node, which is not captured by this number). This can + // vary based on compaction backlog or other factors. This method must not + // acquire any mutex in DBForCompaction that is covered by the general mutex + // ordering rule stated earlier. + GetAllowedWithoutPermission() int + // GetWaitingCompaction returns true iff the DB can run a compaction. The + // true return is accompanied by a populated WaitingForCompaction, that the + // scheduler can use to pick across DBs or other work in the system. This + // method should typically be efficient, in that the DB should try to cache + // some state if its previous call to TrySchedule resulted in a failure to + // get permission. It is ok if it is sometimes slow since all work scheduled + // by CompactionScheduler is long-lived (often executing for multiple + // seconds). + GetWaitingCompaction() (bool, WaitingCompaction) + // Schedule grants the DB permission to run a compaction. The DB returns + // true iff it accepts the grant, in which case it must exercise the + // CompactionGrantHandle. + Schedule(CompactionGrantHandle) bool +} + +// WaitingCompaction captures state for a compaction that can be used to +// prioritize wrt compactions in other DBs or other long-lived work in the +// system. +type WaitingCompaction struct { + // Optional is true for a compaction that isn't necessary for maintaining an + // overall healthy LSM. This value can be compared across compactions and + // other long-lived work. + Optional bool + // Priority is the priority of a compaction. It is only compared across + // compactions, and when the Optional value is the same. + Priority int + // Score is only compared across compactions. It is only compared across + // compactions, and when the Optional and Priority are the same. + Score float64 +} + +// Ordering is by priority and if the optional value is different, false is +// more important than true. +// +// The ordering here must be consistent with the order in which compactions +// are picked in compactionPickerByScore.pickAuto. +type compactionOptionalAndPriority struct { + optional bool + priority int +} + +var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority +var manualCompactionPriority int + +func init() { + // Manual compactions have the highest priority since DB.pickAnyCompaction + // first picks a manual compaction, before calling + // compactionPickerByScore.pickAuto. + manualCompactionPriority = 100 + scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{} + // We don't actually know if it is a compactionKindMove or + // compactionKindCopy until a compactionKindDefault is turned from a + // pickedCompaction into a compaction struct. So we will never see those + // values here, but for completeness we include them. + scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 90} + scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 80} + scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 70} + scheduledCompactionMap[compactionKindTombstoneDensity] = + compactionOptionalAndPriority{optional: true, priority: 60} + scheduledCompactionMap[compactionKindElisionOnly] = + compactionOptionalAndPriority{optional: true, priority: 50} + scheduledCompactionMap[compactionKindRead] = + compactionOptionalAndPriority{optional: true, priority: 40} + scheduledCompactionMap[compactionKindRewrite] = + compactionOptionalAndPriority{optional: true, priority: 30} +} + +func makeWaitingCompaction(manual bool, kind compactionKind, score float64) WaitingCompaction { + if manual { + return WaitingCompaction{Priority: manualCompactionPriority, Score: score} + } + entry, ok := scheduledCompactionMap[kind] + if !ok { + panic(errors.AssertionFailedf("unexpected compactionKind %s", kind)) + } + return WaitingCompaction{Optional: entry.optional, Priority: entry.priority, Score: score} +} + +// noopGrantHandle is used in cases that don't interact with a CompactionScheduler. +type noopGrantHandle struct{} + +var singletonNoopGrantHandle = &noopGrantHandle{} +var _ CompactionGrantHandle = singletonNoopGrantHandle + +func (h *noopGrantHandle) Started() {} +func (h *noopGrantHandle) MeasureCPU(g int) {} +func (h *noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {} +func (h *noopGrantHandle) Done() {} + +// pickedCompactionCache is used to avoid the work of repeatedly picking a +// compaction that then fails to run immediately because TrySchedule returns +// false. +// +// The high-level approach is to construct a pickedCompaction in +// DB.maybeScheduleCompaction if there isn't one in the cache, and if +// TrySchedule returns false, to remember it. Ignoring flushes, the worst-case +// behavior is 1 of 2 pickedCompactions gets to run (so half the picking work +// is wasted). This worst-case happens when the system is running at the limit +// of the long-lived work (including compactions) it can support. In this +// setting, each started compaction invalidates the pickedCompaction in the +// cache when it completes, and the reason the cache has a pickedCompaction +// (that got invalidated) is that the CompactionScheduler called +// GetWaitingCompaction and decided not to run the pickedCompaction (some +// other work won). We consider the CPU overhead of this waste acceptable. +// +// For the default case of a ConcurrencyLimitScheduler, which only considers a +// single DB, the aforementioned worst-case is avoided by not constructing a +// new pickedCompaction in DB.maybeScheduleCompaction when +// pickedCompactionCache.isWaiting is already true (which became true once, +// when a backlog developed). Whenever a compaction completes and a new +// compaction can be started, the call to DBForCompaction.GetWaitingCompaction +// constructs a new pickedCompaction and caches it, and then this immediately +// gets to run when DBForCompaction.Schedule is called. +type pickedCompactionCache struct { + // pc != nil => waiting + waiting bool + pc *pickedCompaction +} + +// invalidate the cache because a new Version is installed. The value of +// waiting is not changed. +func (c *pickedCompactionCache) invalidate() { + c.pc = nil +} + +// isWaiting returns the value of waiting. +func (c *pickedCompactionCache) isWaiting() bool { + return c.waiting +} + +// getForRunning returns a pickedCompaction if in the cache. The cache is +// cleared. It may return nil. +func (c *pickedCompactionCache) getForRunning() *pickedCompaction { + // NB: This does not set c.waiting = false, since there may be more + // compactions to run. + pc := c.pc + c.pc = nil + return pc +} + +// setNotWaiting sets waiting to false. +func (c *pickedCompactionCache) setNotWaiting() { + c.waiting = false + c.pc = nil +} + +// peek return the pickedCompaction, if any, in the cache. +func (c *pickedCompactionCache) peek() *pickedCompaction { + return c.pc +} + +// add adds a pickedCompaction to the cache and sets waiting to true. +func (c *pickedCompactionCache) add(pc *pickedCompaction) { + c.waiting = true + c.pc = pc +} + +// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It +// simply uses the concurrency limit retrieved from +// DBForCompaction.GetAllowedWithoutPermission to decide the number of +// compactions to schedule. +// +// Since the GetAllowedWithoutPermission value changes over time, the +// scheduler needs to be quite current in its sampling, especially if the +// value is increasing, to prevent lag in scheduling compactions. Calls to +// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule +// are obvious places this value is sampled. However, since +// ConcurrencyLimitScheduler does not observe flushes (which can increase the +// value), and there can be situations where compactions last 10+ seconds, +// this sampling is not considered sufficient. Note that calls to +// ConcurrencyLimitScheduler.TrySchedule are dampened in +// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent +// wasted computation of pickedCompaction). If DB.maybeScheduleCompaction +// always called ConcurrencyLimitScheduler.TrySchedule we would have no lag as +// DB.maybeScheduleCompaction is called on flush completion. Hence, we resort +// to having a background thread in ConcurrencyLimitScheduler sample the value +// every 250ms. +type ConcurrencyLimitScheduler struct { + // db is set in Register, but not protected by mu since it is strictly + // before any calls to the other methods. + db DBForCompaction + mu struct { + sync.Mutex + runningCompactions int + // unregistered transitions once from false => true. + unregistered bool + // isGranting is used to (a) serialize granting from Done and + // periodicGranter, (b) ensure that granting is stopped before returning + // from Unregister. + isGranting bool + isGrantingCond *sync.Cond + } + stopPeriodicGranterCh chan struct{} +} + +var _ CompactionScheduler = &ConcurrencyLimitScheduler{} + +func newConcurrencyLimitScheduler() *ConcurrencyLimitScheduler { + s := &ConcurrencyLimitScheduler{ + stopPeriodicGranterCh: make(chan struct{}), + } + s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex) + return s +} + +func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler { + s := &ConcurrencyLimitScheduler{} + s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex) + return s +} + +func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) { + s.db = db + if s.stopPeriodicGranterCh != nil { + go s.periodicGranter() + } +} + +func (s *ConcurrencyLimitScheduler) Unregister() { + if s.stopPeriodicGranterCh != nil { + s.stopPeriodicGranterCh <- struct{}{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.mu.unregistered = true + // Wait until isGranting becomes false. Since unregistered has been set to + // true, once isGranting becomes false, no more granting will happen. + for s.mu.isGranting { + s.mu.isGrantingCond.Wait() + } +} + +func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mu.unregistered { + return false, nil + } + allowed := s.db.GetAllowedWithoutPermission() + if allowed > s.mu.runningCompactions { + s.mu.runningCompactions++ + return true, s + } + return false, nil +} + +func (s *ConcurrencyLimitScheduler) Started() {} + +func (s *ConcurrencyLimitScheduler) MeasureCPU(g int) {} + +func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {} + +func (s *ConcurrencyLimitScheduler) Done() { + s.mu.Lock() + s.mu.runningCompactions-- + s.tryGrantLockedAndUnlock() +} + +func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() { + defer s.mu.Unlock() + if s.mu.unregistered { + return + } + // Wait for turn to grant. + for s.mu.isGranting { + s.mu.isGrantingCond.Wait() + } + // INVARIANT: !isGranting. + if s.mu.unregistered { + return + } + toGrant := s.db.GetAllowedWithoutPermission() - s.mu.runningCompactions + if toGrant > 0 { + s.mu.isGranting = true + } else { + return + } + s.mu.Unlock() + // We call GetWaitingCompaction iff we can successfully grant, so that there + // is no wasted pickedCompaction. + // + // INVARIANT: loop exits with s.mu unlocked. + for toGrant > 0 { + waiting, _ := s.db.GetWaitingCompaction() + if !waiting { + break + } + accepted := s.db.Schedule(s) + if !accepted { + break + } + s.mu.Lock() + s.mu.runningCompactions++ + toGrant-- + s.mu.Unlock() + } + // Will be unlocked by the defer statement. + s.mu.Lock() + s.mu.isGranting = false + s.mu.isGrantingCond.Broadcast() +} + +func (s *ConcurrencyLimitScheduler) periodicGranter() { + ticker := time.NewTicker(250 * time.Millisecond) + for { + select { + case <-ticker.C: + s.mu.Lock() + s.tryGrantLockedAndUnlock() + case <-s.stopPeriodicGranterCh: + ticker.Stop() + return + } + } +} + +func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) { + s.mu.Lock() + s.mu.runningCompactions += delta + if delta < 0 { + s.tryGrantLockedAndUnlock() + } else { + s.mu.Unlock() + } +} + +func (s *ConcurrencyLimitScheduler) isUnregisteredForTesting() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.unregistered +} diff --git a/compaction_test.go b/compaction_test.go index b490cbf4e6..1651f5bb36 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -6,6 +6,7 @@ package pebble import ( "bytes" + "cmp" "context" crand "crypto/rand" "fmt" @@ -13,6 +14,7 @@ import ( "math/rand/v2" "path/filepath" "regexp" + "slices" "sort" "strconv" "strings" @@ -517,7 +519,7 @@ func TestPickCompaction(t *testing.T) { vs.picker = &tc.picker pc, got := vs.picker.pickAuto(compactionEnv{diskAvailBytes: math.MaxUint64}), "" if pc != nil { - c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */) + c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle) gotStart := fileNums(c.startLevel.files) gotML := "" @@ -955,6 +957,7 @@ func TestManualCompaction(t *testing.T) { } opts.WithFSDefaults() opts.Experimental.EnableColumnarBlocks = func() bool { return true } + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() var err error d, err = Open("", opts) @@ -1060,6 +1063,7 @@ func TestManualCompaction(t *testing.T) { } opts.WithFSDefaults() opts.Experimental.EnableColumnarBlocks = func() bool { return true } + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() var err error if d, err = runDBDefineCmd(td, opts); err != nil { @@ -1158,10 +1162,6 @@ func TestManualCompaction(t *testing.T) { if len(d.mu.compact.manual) == 0 { return errors.New("no manual compaction queued") } - manual := d.mu.compact.manual[0] - if manual.retries == 0 { - return errors.New("manual compaction has not been retried") - } return nil }) if err != nil { @@ -1175,6 +1175,13 @@ func TestManualCompaction(t *testing.T) { d.mu.Lock() deleteOngoingCompaction(ongoingCompaction) ongoingCompaction = nil + d.mu.Unlock() + d.opts.Experimental.CompactionScheduler.(*ConcurrencyLimitScheduler). + adjustRunningCompactionsForTesting(-1) + // If the ongoing compaction conflicted with the manual compaction, + // the CompactionScheduler may believe there is no waiting compaction. + // So explicitly call maybeScheduleCompaction. + d.mu.Lock() d.maybeScheduleCompaction() d.mu.Unlock() if err := <-ch; err != nil { @@ -1194,6 +1201,8 @@ func TestManualCompaction(t *testing.T) { d.mu.Lock() ongoingCompaction = createOngoingCompaction([]byte(start), []byte(end), startLevel, outputLevel) d.mu.Unlock() + d.opts.Experimental.CompactionScheduler.(*ConcurrencyLimitScheduler). + adjustRunningCompactionsForTesting(+1) return "" case "remove-ongoing-compaction": @@ -1201,6 +1210,9 @@ func TestManualCompaction(t *testing.T) { deleteOngoingCompaction(ongoingCompaction) ongoingCompaction = nil d.mu.Unlock() + d.opts.Experimental.CompactionScheduler.(*ConcurrencyLimitScheduler). + adjustRunningCompactionsForTesting(-1) + return "" case "set-concurrent-compactions": @@ -1311,7 +1323,7 @@ func TestCompactionOutputLevel(t *testing.T) { d.ScanArgs(t, "start", &start) d.ScanArgs(t, "base", &base) pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base) - c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* slot */) + c := newCompaction(pc, opts, time.Now(), nil /* provider */, singletonNoopGrantHandle) return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n", c.outputLevel.level, c.maxOutputFileSize) @@ -1584,23 +1596,45 @@ func TestCompactionTombstones(t *testing.T) { } }() - var compactInfo *CompactionInfo // protected by d.mu + var compactInfo []*CompactionInfo // protected by d.mu compactionString := func() string { for d.mu.compact.compactingCount > 0 { d.mu.compact.cond.Wait() } - s := "(none)" - if compactInfo != nil { + if len(compactInfo) == 0 { + return "(none)" + } + for _, c := range compactInfo { // Fix the job ID and durations for determinism. - compactInfo.JobID = 100 - compactInfo.Duration = time.Second - compactInfo.TotalDuration = 2 * time.Second - s = compactInfo.String() - compactInfo = nil + c.JobID = 0 + c.Duration = time.Second + c.TotalDuration = 2 * time.Second + if len(compactInfo) > 1 { + // The output table numbering is non-deterministic when there are + // multiple concurrent compactions. + for i := range c.Output.Tables { + c.Output.Tables[i].FileNum = 0 + } + } } - return s + // Sort for determinism. We use the negative value of cmp.Compare to sort + // delete-only before default since it is more natural in the output, as + // delete-only is selected first. + slices.SortFunc(compactInfo, func(a, b *CompactionInfo) int { + return -cmp.Compare(a.String(), b.String()) + }) + var b strings.Builder + jobID := 100 + for _, c := range compactInfo { + // Fix the job ID. + c.JobID = jobID + jobID++ + fmt.Fprintf(&b, "%s\n", c.String()) + } + compactInfo = nil + return b.String() } datadriven.RunTest(t, "testdata/compaction_tombstones", @@ -1608,18 +1642,18 @@ func TestCompactionTombstones(t *testing.T) { switch td.Cmd { case "define": if d != nil { - compactInfo = nil require.NoError(t, closeAllSnapshots(d)) if err := d.Close(); err != nil { return err.Error() } + compactInfo = nil } opts := &Options{ FS: vfs.NewMem(), DebugCheck: DebugCheckLevels, EventListener: &EventListener{ CompactionEnd: func(info CompactionInfo) { - compactInfo = &info + compactInfo = append(compactInfo, &info) }, }, FormatMajorVersion: internalFormatNewest, @@ -1627,6 +1661,7 @@ func TestCompactionTombstones(t *testing.T) { opts.WithFSDefaults() opts.Experimental.EnableDeleteOnlyCompactionExcises = func() bool { return true } opts.Experimental.EnableColumnarBlocks = func() bool { return true } + opts.Experimental.CompactionScheduler = NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() var err error d, err = runDBDefineCmd(td, opts) if err != nil { diff --git a/data_test.go b/data_test.go index 87b6ec70db..f5886f540a 100644 --- a/data_test.go +++ b/data_test.go @@ -795,6 +795,15 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { // runDBDefineCmdReuseFS is like runDBDefineCmd, but does not set opts.FS, expecting // the caller to have set an appropriate FS already. func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error) { + // Some tests expect that opts is an in-out parameter in that the changes to + // opts made here are used later by the caller. But the + // ConcurrencyLimitScheduler cannot be reused after the DB is closed. So we + // replace it here. + scheduler, ok := opts.Experimental.CompactionScheduler.(*ConcurrencyLimitScheduler) + if ok && scheduler.isUnregisteredForTesting() { + opts.Experimental.CompactionScheduler = + NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() + } opts.EnsureDefaults() if err := parseDBOptionsArgs(opts, td.CmdArgs); err != nil { return nil, err diff --git a/db.go b/db.go index 6f260ae78d..d56020a201 100644 --- a/db.go +++ b/db.go @@ -453,7 +453,10 @@ type DB struct { deletionHints []deleteCompactionHint // The list of manual compactions. The next manual compaction to perform // is at the start of the list. New entries are added to the end. - manual []*manualCompaction + manual []*manualCompaction + manualLen atomic.Int32 + // manualID is used to identify manualCompactions in the manual slice. + manualID uint64 // downloads is the list of pending download tasks. The next download to // perform is at the start of the list. New entries are added to the end. downloads []*downloadSpanTask @@ -1656,6 +1659,14 @@ func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFile // or to call Close concurrently with any other DB method. It is not valid // to call any of a DB's methods after the DB has been closed. func (d *DB) Close() error { + if err := d.closed.Load(); err != nil { + panic(err) + } + // Compactions can be asynchronously started by the CompactionScheduler + // calling d.Schedule. When this Unregister returns, we know that the + // CompactionScheduler will never again call a method on the DB. Note that + // this must be called without holding d.mu. + d.opts.Experimental.CompactionScheduler.Unregister() // Lock the commit pipeline for the duration of Close. This prevents a race // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires // dropping d.mu several times for I/O. If Close only holds d.mu, an @@ -1670,9 +1681,6 @@ func (d *DB) Close() error { defer d.commit.mu.Unlock() d.mu.Lock() defer d.mu.Unlock() - if err := d.closed.Load(); err != nil { - panic(err) - } // Clear the finalizer that is used to check that an unreferenced DB has been // closed. We're closing the DB here, so the check performed by that @@ -1906,7 +1914,12 @@ func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error end: end, }) } + for i := range compactions { + d.mu.compact.manualID++ + compactions[i].id = d.mu.compact.manualID + } d.mu.compact.manual = append(d.mu.compact.manual, compactions...) + d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual))) d.maybeScheduleCompaction() d.mu.Unlock() diff --git a/download.go b/download.go index 22e1d284ba..43cd992261 100644 --- a/download.go +++ b/download.go @@ -433,7 +433,7 @@ func (d *DB) tryLaunchDownloadForFile( download.numLaunchedDownloads++ doneCh = make(chan error, 1) - c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, nil /* slot */) + c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, singletonNoopGrantHandle) c.isDownload = true d.mu.compact.downloadingCount++ d.addInProgressCompaction(c) diff --git a/format_major_version.go b/format_major_version.go index dd8ff5cec3..64788ab462 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -444,19 +444,23 @@ func (d *DB) writeFormatVersionMarker(formatVers FormatMajorVersion) error { // waiting for compactions to complete (or for slots to free up). func (d *DB) compactMarkedFilesLocked() error { curr := d.mu.versions.currentVersion() + if curr.Stats.MarkedForCompaction == 0 { + return nil + } + // Attempt to schedule a compaction to rewrite a file marked for compaction. + // We simply call maybeScheduleCompaction since it also picks rewrite + // compactions. Note that we don't need to call this repeatedly in the for + // loop below since the completion of a compaction either starts a new one + // or ensures a compaction is queued for scheduling. By calling + // maybeScheduleCompaction here we are simply kicking off this behavior. + d.maybeScheduleCompaction() + + // The above attempt might succeed and schedule a rewrite compaction. Or + // there might not be available compaction concurrency to schedule the + // compaction. Or compaction of the file might have already been in + // progress. In any scenario, wait until there's some change in the + // state of active compactions. for curr.Stats.MarkedForCompaction > 0 { - // Attempt to schedule a compaction to rewrite a file marked for - // compaction. - d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickRewriteCompaction(env) - }) - - // The above attempt might succeed and schedule a rewrite compaction. Or - // there might not be available compaction concurrency to schedule the - // compaction. Or compaction of the file might have already been in - // progress. In any scenario, wait until there's some change in the - // state of active compactions. - // Before waiting, check that the database hasn't been closed. Trying to // schedule the compaction may have dropped d.mu while waiting for a // manifest write to complete. In that dropped interim, the database may @@ -473,9 +477,10 @@ func (d *DB) compactMarkedFilesLocked() error { // Only wait on compactions if there are files still marked for compaction. // NB: Waiting on this condition variable drops d.mu while blocked. if curr.Stats.MarkedForCompaction > 0 { - if d.mu.compact.compactingCount == 0 { - panic("expected a compaction of marked files in progress") - } + // NB: we cannot assert that d.mu.compact.compactingCount > 0, since + // with a CompactionScheduler a DB may not have even one ongoing + // compaction (if other competing activities are being preferred by the + // scheduler). d.mu.compact.cond.Wait() // Refresh the current version again. curr = d.mu.versions.currentVersion() diff --git a/ingest.go b/ingest.go index ccdab8c5bb..d2eab86a0b 100644 --- a/ingest.go +++ b/ingest.go @@ -2341,6 +2341,8 @@ func (d *DB) ingestApply( } } if err != nil { + // No need to invalidate the pickedCompactionCache since the latest + // version has not changed. d.mu.versions.logUnlock() return nil, err } diff --git a/internal/base/compaction.go b/internal/base/compaction.go deleted file mode 100644 index 417ee218b8..0000000000 --- a/internal/base/compaction.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -package base - -import "context" - -type CompactionSlot interface { - // CompactionSelected is called when a compaction is selected for execution in - // this compaction slot. The firstInputLevel is the highest input level participating - // in this compaction, and the outputLevel is the level that outputs are being - // written to from this compaction. inputSize is the cumulative size of inputs participating - // in this compaction, including the size of the memtable for flushes. - CompactionSelected(firstInputLevel, outputLevel int, inputSize uint64) - // UpdateMetrics is called periodically to update the number of disk bytes read and written - // for this compaction slot. The metrics passed in are cumulative, not incremental - // since the last UpdateMetrics call. The implementation of this method could - // calculate deltas as necessary. For flushes, bytesRead will be bytes read from - // the memtable. - UpdateMetrics(bytesRead, bytesWritten uint64) - // Release returns a compaction slot. Must be called once a compaction is - // complete. After this method, no more calls must be made to other interface - // methods on CompactionSlot. - Release(totalBytesWritten uint64) -} - -type CompactionLimiter interface { - // TookWithoutPermission is called when a compaction is performed without - // asking for permission. Pebble calls into this method for flushes as well - // as for the first compaction in an instance, as those slots will always be - // granted even in the case of slot exhaustion or overload. - TookWithoutPermission(ctx context.Context) CompactionSlot - // RequestSlot is called when a compaction is about to be scheduled. If the - // compaction is allowed, the method returns a non-nil slot that must be released - // after the compaction is complete. If a nil value is returned, the compaction is - // disallowed due to possible overload. - RequestSlot(ctx context.Context) (CompactionSlot, error) -} - -type DefaultCompactionSlot struct{} - -func (d *DefaultCompactionSlot) CompactionSelected( - firstInputLevel, outputLevel int, inputSize uint64, -) { -} - -func (d *DefaultCompactionSlot) UpdateMetrics(bytesRead, bytesWritten uint64) { -} - -func (d *DefaultCompactionSlot) Release(totalBytesWritten uint64) { -} - -type DefaultCompactionLimiter struct{} - -func (d *DefaultCompactionLimiter) TookWithoutPermission(ctx context.Context) CompactionSlot { - return &DefaultCompactionSlot{} -} - -func (d *DefaultCompactionLimiter) RequestSlot(ctx context.Context) (CompactionSlot, error) { - return &DefaultCompactionSlot{}, nil -} diff --git a/internal/base/compaction_grant_handle.go b/internal/base/compaction_grant_handle.go new file mode 100644 index 0000000000..3c8b40ac6f --- /dev/null +++ b/internal/base/compaction_grant_handle.go @@ -0,0 +1,41 @@ +// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package base + +// CompactionGrantHandleStats contains stats provided to a CompactionGrantHandle. +type CompactionGrantHandleStats struct { + // CumWriteBytes is the cumulative bytes written to disk. + CumWriteBytes uint64 + // TODO(sumeer): add more stats like: + // cumReadBytes uint64 + // cumReadBytesInCache uint64 +} + +// CompactionGrantHandle is used to frequently update the CompactionScheduler +// about resource consumption. The MeasureCPU and CumulativeStats methods must +// be called frequently. +type CompactionGrantHandle interface { + // Started is called once and must precede calls to MeasureCPU and + // CumulativeStats. + Started() + // MeasureCPU is used to measure the CPU consumption of a goroutine involved + // in a compaction. It must be called from each of the two goroutines that + // consume CPU during a compaction, and the first call must be before any + // significant work is done, since the first call is used to initialize the + // measurer for the goroutine. The parameter g must be 0 or 1, to + // differentiate between the goroutines. If a compaction is only using one + // goroutine, then it can skip calling MeasureCPU(1). + MeasureCPU(g int) + // CumulativeStats reports the current cumulative stats. This method may + // block if the scheduler wants to pace the compaction (say to moderate its + // consumption of disk write bandwidth). + CumulativeStats(stats CompactionGrantHandleStats) + // Done must be called when the compaction completes (whether success or + // failure). It may synchronously result in a call to + // DBForCompaction.Schedule so this must be called without holding any + // locks, *and* after the new version (if the compaction was successful) has + // been installed. + Done() +} diff --git a/internal/compact/run.go b/internal/compact/run.go index 15a02e69a3..d685949b3e 100644 --- a/internal/compact/run.go +++ b/internal/compact/run.go @@ -80,12 +80,9 @@ type RunnerConfig struct { // value. TargetOutputFileSize uint64 - // Slot is the compaction slot taken up by this compaction. Used to perform - // pacing or account for concurrency limits. - Slot base.CompactionSlot - - // IteratorStats is the stats collected by the compaction iterator. - IteratorStats *base.InternalIteratorStats + // GrantHandle is used to perform accounting of resource consumption by the + // CompactionScheduler. + GrantHandle base.CompactionGrantHandle } // Runner is a helper for running the "data" part of a compaction (where we use @@ -168,7 +165,7 @@ func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw sstable.RawWri } func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ error) { - const updateSlotEveryNKeys = 1024 + const updateGrantHandleEveryNKeys = 128 firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan)) if r.kv != nil && firstKey == nil { firstKey = r.kv.K.UserKey @@ -188,8 +185,13 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro kv := r.kv for ; kv != nil; kv = r.iter.Next() { iteratedKeys++ - if iteratedKeys%updateSlotEveryNKeys == 0 { - r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize()) + if iteratedKeys%updateGrantHandleEveryNKeys == 0 { + r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{ + CumWriteBytes: r.stats.CumulativeWrittenSize + tw.EstimatedSize(), + }) + // TODO(sumeer): give the GrantHandle to the writer so it can account on + // all its goroutines. + r.cfg.GrantHandle.MeasureCPU(0) } if splitter.ShouldSplitBefore(kv.K.UserKey, tw.EstimatedSize(), equalPrev) { break @@ -242,7 +244,10 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize) r.stats.CumulativePinnedKeys += pinnedCount r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize - r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize()) + r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{ + CumWriteBytes: r.stats.CumulativeWrittenSize + tw.EstimatedSize(), + }) + r.cfg.GrantHandle.MeasureCPU(0) return splitKey, nil } diff --git a/metamorphic/options_test.go b/metamorphic/options_test.go index 9b016cd4c2..f9114b50d4 100644 --- a/metamorphic/options_test.go +++ b/metamorphic/options_test.go @@ -67,6 +67,7 @@ func TestOptionsRoundtrip(t *testing.T) { "FS:", "KeySchemas[", "FileCache:", + "Experimental.CompactionScheduler", // Function pointers "BlockPropertyCollectors:", "EventListener:", diff --git a/metamorphic/test.go b/metamorphic/test.go index 8091b8170a..2bd6ac011b 100644 --- a/metamorphic/test.go +++ b/metamorphic/test.go @@ -192,7 +192,11 @@ func (t *Test) init( dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1)) } err = t.withRetries(func() error { - db, err = pebble.Open(dir, t.opts) + // Give each DB its own CompactionScheduler. + o := *t.opts + o.Experimental.CompactionScheduler = + pebble.NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() + db, err = pebble.Open(dir, &o) return err }) if err != nil { @@ -322,7 +326,11 @@ func (t *Test) restartDB(dbID objID) error { if len(t.dbs) > 1 { dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot())) } - t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts) + // Give each DB its own CompactionScheduler. + o := *t.opts + o.Experimental.CompactionScheduler = + pebble.NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() + t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o) if err != nil { return err } diff --git a/open.go b/open.go index b4cb1e6b83..4385063902 100644 --- a/open.go +++ b/open.go @@ -521,6 +521,10 @@ func Open(dirname string, opts *Options) (db *DB, err error) { } d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) + // Register with the CompactionScheduler before calling + // d.maybeScheduleFlush, since completion of the flush can trigger + // compactions. + d.opts.Experimental.CompactionScheduler.Register(2, d) if !d.opts.ReadOnly { d.maybeScheduleFlush() for d.mu.compact.flushing { diff --git a/open_test.go b/open_test.go index 59acc66941..79e643223c 100644 --- a/open_test.go +++ b/open_test.go @@ -133,6 +133,8 @@ func TestErrorIfNotPristine(t *testing.T) { ErrorIfNotPristine: true, }) defer ensureFilesClosed(t, opts)() + // Ensures each DB will create its own CompactionScheduler in Open. + opts.Experimental.CompactionScheduler = nil d0, err := Open("", opts) require.NoError(t, err) diff --git a/options.go b/options.go index 959f1099ea..afbd646e38 100644 --- a/options.go +++ b/options.go @@ -78,12 +78,6 @@ type ShortAttributeExtractor = base.ShortAttributeExtractor // UserKeyPrefixBound exports the sstable.UserKeyPrefixBound type. type UserKeyPrefixBound = sstable.UserKeyPrefixBound -// CompactionLimiter exports the base.CompactionLimiter type. -type CompactionLimiter = base.CompactionLimiter - -// CompactionSlot exports the base.CompactionSlot type. -type CompactionSlot = base.CompactionSlot - // IterKeyType configures which types of keys an iterator should surface. type IterKeyType int8 @@ -757,11 +751,10 @@ type Options struct { // the excise phase of IngestAndExcise. EnableDeleteOnlyCompactionExcises func() bool - // CompactionLimiter, if set, is used to limit concurrent compactions as well - // as to pace compactions and flushing compactions already chosen. If nil, - // no limiting or pacing happens other than that controlled by other options - // like L0CompactionConcurrency and CompactionDebtConcurrency. - CompactionLimiter CompactionLimiter + // CompactionScheduler, if set, is used to limit concurrent compactions as + // well as to pace compactions already chosen. If nil, a default scheduler + // is created and used. + CompactionScheduler CompactionScheduler UserKeyCategories UserKeyCategories } @@ -916,13 +909,36 @@ type Options struct { // The default merger concatenates values. Merger *Merger - // MaxConcurrentCompactions specifies the maximum number of concurrent - // compactions (not including download compactions). + // MaxConcurrentCompactions roughly specifies the maximum number of + // concurrent compactions, not including download compactions (which have a + // separate limit specified by MaxConcurrentDownloads). + // + // This is a rough bound since delete-only compactions (a) do not use the + // CompactionScheduler, and (b) the CompactionScheduler may use other + // criteria to decide on how many compactions to permit. + // + // Elaborating on (b), MaxConcurrentCompactions is one of the inputs used to + // compute the value returned from DB.GetAllowedWithoutPermission to the + // CompactionScheduler. When the ConcurrencyLimitScheduler is being used the + // value returned by DB.GetAllowedWithoutPermission fully controls how many + // compactions get to run. Other CompactionSchedulers may use additional + // criteria, like resource availability. // - // Concurrent compactions are performed: + // Elaborating on (a), we don't use the CompactionScheduler to schedule + // delete-only compactions since they are expected to be almost free from a + // CPU and disk usage perspective. Since the CompactionScheduler does not + // know about their existence, the total running count can exceed this + // value. For example, consider MaxConcurrentCompactions returns 3, and the + // current value returned from DB.GetAllowedWithoutPermission is also 3. Say + // 3 delete-only compactions are also running. Then the + // ConcurrencyLimitScheduler can also start 3 other compactions, for a total + // of 6. + // + // DB.GetAllowedWithoutPermission will return a value in the interval [1, + // MaxConcurrentCompactions]. A value > 1 is returned: // - when L0 read-amplification passes the L0CompactionConcurrency threshold; - // - for automatic background compactions; - // - when a manual compaction for a level is split and parallelized. + // - when compaction debt passes the CompactionDebtConcurrency threshold; + // - when there are multiple manual compactions waiting to run. // // MaxConcurrentCompactions() must be greater than 0. // @@ -1146,9 +1162,6 @@ func (o *Options) EnsureDefaults() { if o.Experimental.CompactionDebtConcurrency <= 0 { o.Experimental.CompactionDebtConcurrency = 1 << 30 // 1 GB } - if o.Experimental.CompactionLimiter == nil { - o.Experimental.CompactionLimiter = &base.DefaultCompactionLimiter{} - } if o.KeySchema == "" && len(o.KeySchemas) == 0 { ks := colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */) o.KeySchema = ks.Name @@ -1280,6 +1293,9 @@ func (o *Options) EnsureDefaults() { if o.Experimental.MultiLevelCompactionHeuristic == nil { o.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{} } + if o.Experimental.CompactionScheduler == nil { + o.Experimental.CompactionScheduler = newConcurrencyLimitScheduler() + } o.initMaps() } diff --git a/snapshot.go b/snapshot.go index 88bf76a640..d608abb943 100644 --- a/snapshot.go +++ b/snapshot.go @@ -117,7 +117,8 @@ func (s *Snapshot) closeLocked() error { // If s was the previous earliest snapshot, we might be able to reclaim // disk space by dropping obsolete records that were pinned by s. if e := s.db.mu.snapshots.earliest(); e > s.seqNum { - s.db.maybeScheduleCompactionPicker(pickElisionOnly) + // NB: maybeScheduleCompaction also picks elision-only compactions. + s.db.maybeScheduleCompaction() } s.db = nil return nil diff --git a/testdata/compaction_picker_concurrency b/testdata/compaction_picker_concurrency index 6e6707895f..d99961980c 100644 --- a/testdata/compaction_picker_concurrency +++ b/testdata/compaction_picker_concurrency @@ -48,6 +48,7 @@ compactions pick-auto l0_compaction_threshold=10 ---- +picker.getCompactionConcurrency: 4 nil # Test that lowering L0CompactionConcurrency opens up more compaction slots. @@ -93,10 +94,12 @@ compactions pick-auto l0_compaction_concurrency=10 ---- +picker.getCompactionConcurrency: 1 nil pick-auto l0_compaction_concurrency=5 ---- +picker.getCompactionConcurrency: 2 L0 -> L1 L0: 000301,000302,000303,000304,000305 L1: 000201 @@ -104,6 +107,7 @@ grandparents: 000101 pick-auto l0_compaction_concurrency=1 ---- +picker.getCompactionConcurrency: 4 L0 -> L1 L0: 000301,000302,000303,000304,000305 L1: 000201 @@ -155,10 +159,12 @@ compactions pick-auto l0_compaction_concurrency=10 compaction_debt_concurrency=5120000 ---- +picker.getCompactionConcurrency: 1 nil pick-auto l0_compaction_concurrency=10 compaction_debt_concurrency=512000 ---- +picker.getCompactionConcurrency: 4 L0 -> L1 L0: 000301,000302,000303,000304,000305 L1: 000201 @@ -166,6 +172,7 @@ grandparents: 000101 pick-auto l0_compaction_concurrency=5 compaction_debt_concurrency=5120000 ---- +picker.getCompactionConcurrency: 2 L0 -> L1 L0: 000301,000302,000303,000304,000305 L1: 000201 diff --git a/testdata/compaction_tombstones b/testdata/compaction_tombstones index c5257259e0..352d66334d 100644 --- a/testdata/compaction_tombstones +++ b/testdata/compaction_tombstones @@ -371,9 +371,12 @@ num-range-key-sets: 0 point-deletions-bytes-estimate: 0 range-deletions-bytes-estimate: 8380 +# With multiple compactions, there is non-determinism in the output table +# numbers, so the test overwrites them to 0. maybe-compact ---- -[JOB 100] compacted(delete-only) L6 [000007] (13KB) Score=0.00 -> L6 [000008] (8.2KB), in 1.0s (2.0s total), output rate 8.2KB/s +[JOB 100] compacted(delete-only) L6 [000007] (13KB) Score=0.00 -> L6 [000000] (8.2KB), in 1.0s (2.0s total), output rate 8.2KB/s +[JOB 101] compacted(default) L5 [000004] (770B) Score=24.33 + L6 [000006] (13KB) Score=0.52 -> L6 [000000] (4.7KB), in 1.0s (2.0s total), output rate 4.7KB/s # The same LSM as above. However, this time, with point tombstone weighting at # 2x, the table with the point tombstone (000004) will be selected as the @@ -414,6 +417,9 @@ num-range-key-sets: 0 point-deletions-bytes-estimate: 0 range-deletions-bytes-estimate: 8380 +# With multiple compactions, there is non-determinism in the output table +# numbers, so the test overwrites them to 0. maybe-compact ---- -[JOB 100] compacted(delete-only) L6 [000007] (13KB) Score=0.00 -> L6 [000008] (8.2KB), in 1.0s (2.0s total), output rate 8.2KB/s +[JOB 100] compacted(delete-only) L6 [000007] (13KB) Score=0.00 -> L6 [000000] (8.2KB), in 1.0s (2.0s total), output rate 8.2KB/s +[JOB 101] compacted(default) L5 [000004] (770B) Score=24.33 + L6 [000006] (13KB) Score=0.52 -> L6 [000000] (4.7KB), in 1.0s (2.0s total), output rate 4.7KB/s diff --git a/version_set.go b/version_set.go index cfcd6d8fb0..65e47e13cc 100644 --- a/version_set.go +++ b/version_set.go @@ -71,6 +71,9 @@ type versionSet struct { // Mutable fields. versions versionList picker compactionPicker + // curCompactionConcurrency is updated whenever picker is updated. + // INVARIANT: >= 1. + curCompactionConcurrency atomic.Int32 // Not all metrics are kept here. See DB.Metrics(). metrics Metrics @@ -128,6 +131,8 @@ type versionSet struct { writerCond sync.Cond // State for deciding when to write a snapshot. Protected by mu. rotationHelper record.RotationHelper + + pickedCompactionCache pickedCompactionCache } // objectInfo describes an object in object storage (either a sstable or a blob @@ -180,7 +185,8 @@ func (vs *versionSet) create( } vs.append(newVersion) - vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil) + vs.setCompactionPicker( + newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)) // Note that a "snapshot" version edit is written to the manifest when it is // created. vs.manifestFileNum = vs.getNextDiskFileNum() @@ -364,7 +370,8 @@ func (vs *versionSet) load( vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize) }) - vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil) + vs.setCompactionPicker( + newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)) return nil } @@ -408,6 +415,12 @@ func (vs *versionSet) logUnlock() { vs.writerCond.Signal() } +func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() { + // vs.opts.Logger.Infof("logUnlockAnd") + vs.pickedCompactionCache.invalidate() + vs.logUnlock() +} + // logAndApply logs the version edit to the manifest, applies the version edit // to the current version, and installs the new version. // @@ -440,7 +453,7 @@ func (vs *versionSet) logAndApply( if !vs.writing { vs.opts.Logger.Fatalf("MANIFEST not locked for writing") } - defer vs.logUnlock() + defer vs.logUnlockAndInvalidatePickedCompactionCache() if ve.MinUnflushedLogNum != 0 { if ve.MinUnflushedLogNum < vs.minUnflushedLogNum || @@ -711,13 +724,19 @@ func (vs *versionSet) logAndApply( vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles)) vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localLiveSizeDelta) - vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress) + vs.setCompactionPicker( + newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress)) if !vs.dynamicBaseLevel { vs.picker.forceBaseLevel1() } return nil } +func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) { + vs.picker = picker + vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency())) +} + type fileBackingInfo struct { backing *fileBacking isLocal bool