Skip to content

Commit

Permalink
db: remove compaction.atomicBytesIterated
Browse files Browse the repository at this point in the history
The atomic bytes-iterated value was used for compaction and flush pacing.
Remove it for now to resolve #1808.

Fix #1808.
  • Loading branch information
jbowens committed Jul 14, 2022
1 parent d3484a6 commit 37c3f24
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 95 deletions.
96 changes: 35 additions & 61 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,6 @@ type compaction struct {
bytesIterated uint64
// bytesWritten contains the number of bytes that have been written to outputs.
bytesWritten int64
// atomicBytesIterated points to the variable to increment during iteration.
// atomicBytesIterated must be read/written atomically. Flushing will increment
// the shared variable which compaction will read. This allows for the
// compaction routine to know how many bytes have been flushed before the flush
// is applied.
atomicBytesIterated *uint64

// The boundaries of the input data.
smallest InternalKey
Expand Down Expand Up @@ -512,22 +506,21 @@ func (c *compaction) makeInfo(jobID int) CompactionInfo {
return info
}

func newCompaction(pc *pickedCompaction, opts *Options, bytesCompacted *uint64) *compaction {
func newCompaction(pc *pickedCompaction, opts *Options) *compaction {
c := &compaction{
kind: compactionKindDefault,
cmp: pc.cmp,
equal: opts.equal(),
formatKey: opts.Comparer.FormatKey,
score: pc.score,
inputs: pc.inputs,
smallest: pc.smallest,
largest: pc.largest,
logger: opts.Logger,
version: pc.version,
maxOutputFileSize: pc.maxOutputFileSize,
maxOverlapBytes: pc.maxOverlapBytes,
atomicBytesIterated: bytesCompacted,
l0SublevelInfo: pc.l0SublevelInfo,
kind: compactionKindDefault,
cmp: pc.cmp,
equal: opts.equal(),
formatKey: opts.Comparer.FormatKey,
score: pc.score,
inputs: pc.inputs,
smallest: pc.smallest,
largest: pc.largest,
logger: opts.Logger,
version: pc.version,
maxOutputFileSize: pc.maxOutputFileSize,
maxOverlapBytes: pc.maxOverlapBytes,
l0SublevelInfo: pc.l0SublevelInfo,
}
c.startLevel = &c.inputs[0]
c.outputLevel = &c.inputs[1]
Expand Down Expand Up @@ -643,21 +636,18 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)
}
}

func newFlush(
opts *Options, cur *version, baseLevel int, flushing flushableList, bytesFlushed *uint64,
) *compaction {
func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList) *compaction {
c := &compaction{
kind: compactionKindFlush,
cmp: opts.Comparer.Compare,
equal: opts.equal(),
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
inputs: []compactionLevel{{level: -1}, {level: 0}},
maxOutputFileSize: math.MaxUint64,
maxOverlapBytes: math.MaxUint64,
flushing: flushing,
atomicBytesIterated: bytesFlushed,
kind: compactionKindFlush,
cmp: opts.Comparer.Compare,
equal: opts.equal(),
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
inputs: []compactionLevel{{level: -1}, {level: 0}},
maxOutputFileSize: math.MaxUint64,
maxOverlapBytes: math.MaxUint64,
flushing: flushing,
}
c.startLevel = &c.inputs[0]
c.outputLevel = &c.inputs[1]
Expand Down Expand Up @@ -1389,13 +1379,15 @@ func (d *DB) removeInProgressCompaction(c *compaction) {
}

func (d *DB) getCompactionPacerInfo() compactionPacerInfo {
bytesFlushed := atomic.LoadUint64(&d.atomic.bytesFlushed)

d.mu.Lock()

estimatedMaxWAmp := d.mu.versions.picker.getEstimatedMaxWAmp()
pacerInfo := compactionPacerInfo{
slowdownThreshold: uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize)),
totalCompactionDebt: d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed),
slowdownThreshold: uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize)),
// TODO(jackson): bytesFlushed is no longer maintained. To re-enable
// pacing, we'll need to restructure the code to produce a current
// `bytesFlushed` total.
//totalCompactionDebt: d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed),
}
for _, m := range d.mu.mem.queue {
pacerInfo.totalDirtyBytes += m.inuseBytes()
Expand Down Expand Up @@ -1595,7 +1587,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}

c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], &d.atomic.bytesFlushed)
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n])
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand Down Expand Up @@ -1654,15 +1646,12 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}
}

bytesFlushed = c.bytesIterated
d.maybeUpdateDeleteCompactionHints(c)
d.removeInProgressCompaction(c)
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
d.mu.versions.incrementCompactionBytes(-c.bytesWritten)

// Refresh bytes flushed count.
bytesFlushed = atomic.LoadUint64(&d.atomic.bytesFlushed)
atomic.StoreUint64(&d.atomic.bytesFlushed, 0)

var flushed flushableList
if err == nil {
flushed = d.mu.mem.queue[:n]
Expand Down Expand Up @@ -1755,7 +1744,6 @@ func (d *DB) maybeScheduleCompactionPicker(
}

env := compactionEnv{
bytesCompacted: &d.atomic.bytesCompacted,
earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
}
Expand Down Expand Up @@ -1783,7 +1771,7 @@ func (d *DB) maybeScheduleCompactionPicker(
env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
pc, retryLater := d.mu.versions.picker.pickManual(env, manual)
if pc != nil {
c := newCompaction(pc, d.opts, env.bytesCompacted)
c := newCompaction(pc, d.opts)
d.mu.compact.manual = d.mu.compact.manual[1:]
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
Expand All @@ -1810,7 +1798,7 @@ func (d *DB) maybeScheduleCompactionPicker(
if pc == nil {
break
}
c := newCompaction(pc, d.opts, env.bytesCompacted)
c := newCompaction(pc, d.opts)
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, nil)
Expand Down Expand Up @@ -2632,14 +2620,6 @@ func (d *DB) runCompaction(
}
splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}

// NB: we avoid calling maybeThrottle on a nilPacer because the cost of
// dynamic dispatch in the hot loop below is pronounced in CPU profiles (see
// #1030). Additionally, even the cost of this interface comparison is
// pronounced in CPU profiles, so we hoist the entire thing out of the hot
// loop. This allows the branch predictor to do its job and make the pacer
// interactions ~free when a nilPacer is used.
isNilPacer := pacer == nilPacer

// Each outer loop iteration produces one output file. An iteration that
// produces a file containing point keys (and optionally range tombstones)
// guarantees that the input iterator advanced. An iteration that produces
Expand All @@ -2657,12 +2637,6 @@ func (d *DB) runCompaction(
break
}

atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated)
if !isNilPacer {
if err := pacer.maybeThrottle(c.bytesIterated); err != nil {
return nil, pendingOutputs, err
}
}
switch key.Kind() {
case InternalKeyKindRangeDelete:
// Range tombstones are handled specially. They are fragmented,
Expand Down
1 change: 0 additions & 1 deletion compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const minIntraL0Count = 4
const levelMultiplier = 10

type compactionEnv struct {
bytesCompacted *uint64
earliestUnflushedSeqNum uint64
earliestSnapshotSeqNum uint64
inProgressCompactions []compactionInfo
Expand Down
8 changes: 2 additions & 6 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,12 @@ func TestCompactionPickerL0(t *testing.T) {
}

pc = picker.pickAuto(compactionEnv{
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
})
var result strings.Builder
if pc != nil {
c := newCompaction(pc, opts, new(uint64))
c := newCompaction(pc, opts)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down Expand Up @@ -764,13 +763,12 @@ func TestCompactionPickerConcurrency(t *testing.T) {
}

pc := picker.pickAuto(compactionEnv{
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
})
var result strings.Builder
if pc != nil {
c := newCompaction(pc, opts, new(uint64))
c := newCompaction(pc, opts)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down Expand Up @@ -915,7 +913,6 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {

case "pick-auto":
pc := picker.pickAuto(compactionEnv{
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
readCompactionEnv: readCompactionEnv{
readCompactions: &rcList,
Expand Down Expand Up @@ -1233,7 +1230,6 @@ func TestCompactionOutputFileSize(t *testing.T) {

case "pick-auto":
pc := picker.pickAuto(compactionEnv{
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
earliestSnapshotSeqNum: math.MaxUint64,
})
Expand Down
18 changes: 3 additions & 15 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,9 @@ func TestPickCompaction(t *testing.T) {
tc.picker.opts = opts
tc.picker.vers = tc.version
vs.picker = &tc.picker
env := compactionEnv{bytesCompacted: new(uint64)}

pc, got := vs.picker.pickAuto(env), ""
pc, got := vs.picker.pickAuto(compactionEnv{}), ""
if pc != nil {
c := newCompaction(pc, opts, env.bytesCompacted)
c := newCompaction(pc, opts)
got0 := fileNums(c.startLevel.files)
got1 := fileNums(c.outputLevel.files)
got2 := fileNums(c.grandparents)
Expand Down Expand Up @@ -944,8 +942,6 @@ func TestCompaction(t *testing.T) {
if err != nil {
t.Fatalf("Open: %v", err)
}
mockLimiter := mockCountLimiter{burst: int(math.MaxInt32)}
d.compactionLimiter = &mockLimiter

get1 := func(iter internalIterator) (ret string) {
b := &bytes.Buffer{}
Expand Down Expand Up @@ -1047,17 +1043,9 @@ func TestCompaction(t *testing.T) {
t.Errorf("%q: %v", tc.key, err)
}
}

if err := d.Close(); err != nil {
t.Fatalf("db Close: %v", err)
}

if mockLimiter.allowCount != 0 {
t.Errorf("limiter allow: got %d, want %d", mockLimiter.allowCount, 0)
}
if mockLimiter.waitCount == 0 {
t.Errorf("limiter wait: got %d, want >%d", mockLimiter.waitCount, 0)
}
}

func TestValidateVersionEdit(t *testing.T) {
Expand Down Expand Up @@ -1760,7 +1748,7 @@ func TestCompactionOutputLevel(t *testing.T) {
d.ScanArgs(t, "start", &start)
d.ScanArgs(t, "base", &base)
pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
c := newCompaction(pc, opts, new(uint64))
c := newCompaction(pc, opts)
return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
c.outputLevel.level, c.maxOutputFileSize)

Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
flushed: make(chan struct{}),
}}
c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed)
d.mu.versions.picker.getBaseLevel(), toFlush)
c.disableSpanElision = true
// NB: define allows the test to exactly specify which keys go
// into which sstables. If the test has a small target file
Expand Down
10 changes: 0 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,6 @@ type DB struct {
memTableCount int64
memTableReserved int64 // number of bytes reserved in the cache for memtables

// bytesFlushed is the number of bytes flushed in the current flush. This
// must be read/written atomically since it is accessed by both the flush
// and compaction routines.
bytesFlushed uint64

// bytesCompacted is the number of bytes compacted in the current compaction.
// This is used as a dummy variable to increment during compaction, and the
// value is not used anywhere.
bytesCompacted uint64

// The size of the current log file (i.e. db.mu.log.queue[len(queue)-1].
logSize uint64

Expand Down
2 changes: 1 addition & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (d *DB) replayWAL(
// mem is nil here.
if !d.opts.ReadOnly {
c := newFlush(d.opts, d.mu.versions.currentVersion(),
1 /* base level */, toFlush, &d.atomic.bytesFlushed)
1 /* base level */, toFlush)
newVE, _, err := d.runCompaction(jobID, c, nilPacer)
if err != nil {
return 0, err
Expand Down

0 comments on commit 37c3f24

Please sign in to comment.