From 37c3f2470449778c1d9e0d51a39d1b17c79cdc33 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 14 Jul 2022 17:56:31 -0400 Subject: [PATCH] db: remove compaction.atomicBytesIterated The atomic bytes-iterated value was used for compaction and flush pacing. Remove it for now to resolve #1808. Fix #1808. --- compaction.go | 96 ++++++++++++++------------------------- compaction_picker.go | 1 - compaction_picker_test.go | 8 +--- compaction_test.go | 18 ++------ data_test.go | 2 +- db.go | 10 ---- open.go | 2 +- 7 files changed, 42 insertions(+), 95 deletions(-) diff --git a/compaction.go b/compaction.go index d1cf40d652..0add9d0127 100644 --- a/compaction.go +++ b/compaction.go @@ -419,12 +419,6 @@ type compaction struct { bytesIterated uint64 // bytesWritten contains the number of bytes that have been written to outputs. bytesWritten int64 - // atomicBytesIterated points to the variable to increment during iteration. - // atomicBytesIterated must be read/written atomically. Flushing will increment - // the shared variable which compaction will read. This allows for the - // compaction routine to know how many bytes have been flushed before the flush - // is applied. - atomicBytesIterated *uint64 // The boundaries of the input data. smallest InternalKey @@ -512,22 +506,21 @@ func (c *compaction) makeInfo(jobID int) CompactionInfo { return info } -func newCompaction(pc *pickedCompaction, opts *Options, bytesCompacted *uint64) *compaction { +func newCompaction(pc *pickedCompaction, opts *Options) *compaction { c := &compaction{ - kind: compactionKindDefault, - cmp: pc.cmp, - equal: opts.equal(), - formatKey: opts.Comparer.FormatKey, - score: pc.score, - inputs: pc.inputs, - smallest: pc.smallest, - largest: pc.largest, - logger: opts.Logger, - version: pc.version, - maxOutputFileSize: pc.maxOutputFileSize, - maxOverlapBytes: pc.maxOverlapBytes, - atomicBytesIterated: bytesCompacted, - l0SublevelInfo: pc.l0SublevelInfo, + kind: compactionKindDefault, + cmp: pc.cmp, + equal: opts.equal(), + formatKey: opts.Comparer.FormatKey, + score: pc.score, + inputs: pc.inputs, + smallest: pc.smallest, + largest: pc.largest, + logger: opts.Logger, + version: pc.version, + maxOutputFileSize: pc.maxOutputFileSize, + maxOverlapBytes: pc.maxOverlapBytes, + l0SublevelInfo: pc.l0SublevelInfo, } c.startLevel = &c.inputs[0] c.outputLevel = &c.inputs[1] @@ -643,21 +636,18 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) } } -func newFlush( - opts *Options, cur *version, baseLevel int, flushing flushableList, bytesFlushed *uint64, -) *compaction { +func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList) *compaction { c := &compaction{ - kind: compactionKindFlush, - cmp: opts.Comparer.Compare, - equal: opts.equal(), - formatKey: opts.Comparer.FormatKey, - logger: opts.Logger, - version: cur, - inputs: []compactionLevel{{level: -1}, {level: 0}}, - maxOutputFileSize: math.MaxUint64, - maxOverlapBytes: math.MaxUint64, - flushing: flushing, - atomicBytesIterated: bytesFlushed, + kind: compactionKindFlush, + cmp: opts.Comparer.Compare, + equal: opts.equal(), + formatKey: opts.Comparer.FormatKey, + logger: opts.Logger, + version: cur, + inputs: []compactionLevel{{level: -1}, {level: 0}}, + maxOutputFileSize: math.MaxUint64, + maxOverlapBytes: math.MaxUint64, + flushing: flushing, } c.startLevel = &c.inputs[0] c.outputLevel = &c.inputs[1] @@ -1389,13 +1379,15 @@ func (d *DB) removeInProgressCompaction(c *compaction) { } func (d *DB) getCompactionPacerInfo() compactionPacerInfo { - bytesFlushed := atomic.LoadUint64(&d.atomic.bytesFlushed) - d.mu.Lock() + estimatedMaxWAmp := d.mu.versions.picker.getEstimatedMaxWAmp() pacerInfo := compactionPacerInfo{ - slowdownThreshold: uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize)), - totalCompactionDebt: d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed), + slowdownThreshold: uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize)), + // TODO(jackson): bytesFlushed is no longer maintained. To re-enable + // pacing, we'll need to restructure the code to produce a current + // `bytesFlushed` total. + //totalCompactionDebt: d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed), } for _, m := range d.mu.mem.queue { pacerInfo.totalDirtyBytes += m.inuseBytes() @@ -1595,7 +1587,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { } c := newFlush(d.opts, d.mu.versions.currentVersion(), - d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], &d.atomic.bytesFlushed) + d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n]) d.addInProgressCompaction(c) jobID := d.mu.nextJobID @@ -1654,15 +1646,12 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { } } + bytesFlushed = c.bytesIterated d.maybeUpdateDeleteCompactionHints(c) d.removeInProgressCompaction(c) d.mu.versions.incrementCompactions(c.kind, c.extraLevels) d.mu.versions.incrementCompactionBytes(-c.bytesWritten) - // Refresh bytes flushed count. - bytesFlushed = atomic.LoadUint64(&d.atomic.bytesFlushed) - atomic.StoreUint64(&d.atomic.bytesFlushed, 0) - var flushed flushableList if err == nil { flushed = d.mu.mem.queue[:n] @@ -1755,7 +1744,6 @@ func (d *DB) maybeScheduleCompactionPicker( } env := compactionEnv{ - bytesCompacted: &d.atomic.bytesCompacted, earliestSnapshotSeqNum: d.mu.snapshots.earliest(), earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(), } @@ -1783,7 +1771,7 @@ func (d *DB) maybeScheduleCompactionPicker( env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) pc, retryLater := d.mu.versions.picker.pickManual(env, manual) if pc != nil { - c := newCompaction(pc, d.opts, env.bytesCompacted) + c := newCompaction(pc, d.opts) d.mu.compact.manual = d.mu.compact.manual[1:] d.mu.compact.compactingCount++ d.addInProgressCompaction(c) @@ -1810,7 +1798,7 @@ func (d *DB) maybeScheduleCompactionPicker( if pc == nil { break } - c := newCompaction(pc, d.opts, env.bytesCompacted) + c := newCompaction(pc, d.opts) d.mu.compact.compactingCount++ d.addInProgressCompaction(c) go d.compact(c, nil) @@ -2632,14 +2620,6 @@ func (d *DB) runCompaction( } splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters} - // NB: we avoid calling maybeThrottle on a nilPacer because the cost of - // dynamic dispatch in the hot loop below is pronounced in CPU profiles (see - // #1030). Additionally, even the cost of this interface comparison is - // pronounced in CPU profiles, so we hoist the entire thing out of the hot - // loop. This allows the branch predictor to do its job and make the pacer - // interactions ~free when a nilPacer is used. - isNilPacer := pacer == nilPacer - // Each outer loop iteration produces one output file. An iteration that // produces a file containing point keys (and optionally range tombstones) // guarantees that the input iterator advanced. An iteration that produces @@ -2657,12 +2637,6 @@ func (d *DB) runCompaction( break } - atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated) - if !isNilPacer { - if err := pacer.maybeThrottle(c.bytesIterated); err != nil { - return nil, pendingOutputs, err - } - } switch key.Kind() { case InternalKeyKindRangeDelete: // Range tombstones are handled specially. They are fragmented, diff --git a/compaction_picker.go b/compaction_picker.go index f9b54dc1aa..b38e1a8e22 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -22,7 +22,6 @@ const minIntraL0Count = 4 const levelMultiplier = 10 type compactionEnv struct { - bytesCompacted *uint64 earliestUnflushedSeqNum uint64 earliestSnapshotSeqNum uint64 inProgressCompactions []compactionInfo diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 4703e6380d..d815db8503 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -522,13 +522,12 @@ func TestCompactionPickerL0(t *testing.T) { } pc = picker.pickAuto(compactionEnv{ - bytesCompacted: new(uint64), earliestUnflushedSeqNum: math.MaxUint64, inProgressCompactions: inProgressCompactions, }) var result strings.Builder if pc != nil { - c := newCompaction(pc, opts, new(uint64)) + c := newCompaction(pc, opts) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { @@ -764,13 +763,12 @@ func TestCompactionPickerConcurrency(t *testing.T) { } pc := picker.pickAuto(compactionEnv{ - bytesCompacted: new(uint64), earliestUnflushedSeqNum: math.MaxUint64, inProgressCompactions: inProgressCompactions, }) var result strings.Builder if pc != nil { - c := newCompaction(pc, opts, new(uint64)) + c := newCompaction(pc, opts) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { @@ -915,7 +913,6 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) { case "pick-auto": pc := picker.pickAuto(compactionEnv{ - bytesCompacted: new(uint64), earliestUnflushedSeqNum: math.MaxUint64, readCompactionEnv: readCompactionEnv{ readCompactions: &rcList, @@ -1233,7 +1230,6 @@ func TestCompactionOutputFileSize(t *testing.T) { case "pick-auto": pc := picker.pickAuto(compactionEnv{ - bytesCompacted: new(uint64), earliestUnflushedSeqNum: math.MaxUint64, earliestSnapshotSeqNum: math.MaxUint64, }) diff --git a/compaction_test.go b/compaction_test.go index 6ae851c7f6..83b8d8b437 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -518,11 +518,9 @@ func TestPickCompaction(t *testing.T) { tc.picker.opts = opts tc.picker.vers = tc.version vs.picker = &tc.picker - env := compactionEnv{bytesCompacted: new(uint64)} - - pc, got := vs.picker.pickAuto(env), "" + pc, got := vs.picker.pickAuto(compactionEnv{}), "" if pc != nil { - c := newCompaction(pc, opts, env.bytesCompacted) + c := newCompaction(pc, opts) got0 := fileNums(c.startLevel.files) got1 := fileNums(c.outputLevel.files) got2 := fileNums(c.grandparents) @@ -944,8 +942,6 @@ func TestCompaction(t *testing.T) { if err != nil { t.Fatalf("Open: %v", err) } - mockLimiter := mockCountLimiter{burst: int(math.MaxInt32)} - d.compactionLimiter = &mockLimiter get1 := func(iter internalIterator) (ret string) { b := &bytes.Buffer{} @@ -1047,17 +1043,9 @@ func TestCompaction(t *testing.T) { t.Errorf("%q: %v", tc.key, err) } } - if err := d.Close(); err != nil { t.Fatalf("db Close: %v", err) } - - if mockLimiter.allowCount != 0 { - t.Errorf("limiter allow: got %d, want %d", mockLimiter.allowCount, 0) - } - if mockLimiter.waitCount == 0 { - t.Errorf("limiter wait: got %d, want >%d", mockLimiter.waitCount, 0) - } } func TestValidateVersionEdit(t *testing.T) { @@ -1760,7 +1748,7 @@ func TestCompactionOutputLevel(t *testing.T) { d.ScanArgs(t, "start", &start) d.ScanArgs(t, "base", &base) pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base) - c := newCompaction(pc, opts, new(uint64)) + c := newCompaction(pc, opts) return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n", c.outputLevel.level, c.maxOutputFileSize) diff --git a/data_test.go b/data_test.go index 53049384fb..72bd48a030 100644 --- a/data_test.go +++ b/data_test.go @@ -717,7 +717,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { flushed: make(chan struct{}), }} c := newFlush(d.opts, d.mu.versions.currentVersion(), - d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed) + d.mu.versions.picker.getBaseLevel(), toFlush) c.disableSpanElision = true // NB: define allows the test to exactly specify which keys go // into which sstables. If the test has a small target file diff --git a/db.go b/db.go index 1ce35c8402..5721d7c9dc 100644 --- a/db.go +++ b/db.go @@ -220,16 +220,6 @@ type DB struct { memTableCount int64 memTableReserved int64 // number of bytes reserved in the cache for memtables - // bytesFlushed is the number of bytes flushed in the current flush. This - // must be read/written atomically since it is accessed by both the flush - // and compaction routines. - bytesFlushed uint64 - - // bytesCompacted is the number of bytes compacted in the current compaction. - // This is used as a dummy variable to increment during compaction, and the - // value is not used anywhere. - bytesCompacted uint64 - // The size of the current log file (i.e. db.mu.log.queue[len(queue)-1]. logSize uint64 diff --git a/open.go b/open.go index 4010a55f9a..add8598e0a 100644 --- a/open.go +++ b/open.go @@ -706,7 +706,7 @@ func (d *DB) replayWAL( // mem is nil here. if !d.opts.ReadOnly { c := newFlush(d.opts, d.mu.versions.currentVersion(), - 1 /* base level */, toFlush, &d.atomic.bytesFlushed) + 1 /* base level */, toFlush) newVE, _, err := d.runCompaction(jobID, c, nilPacer) if err != nil { return 0, err