Skip to content

Commit

Permalink
*: Don't block flushes on cleaning turns
Browse files Browse the repository at this point in the history
A flush isn't marked as complete (and flushing isn't set to
false) until deleteObsoleteFiles returns. Currently,
deleteObsoleteFiles waits for other cleaning turns to complete
before doing its own cleaning. This could make flushes wait
for cleanup after large compactions, causing write stalls.

This change makes flushes move along and mark themselves as
completed if a cleaner job is already running, instead of waiting
for it to finish. This lowers the impact on user-observed
write latency.
  • Loading branch information
itsbilal committed May 3, 2021
1 parent 0f79851 commit 23421fe
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 10 deletions.
14 changes: 8 additions & 6 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ func (d *DB) flush1() error {
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
}
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, false /* waitForOngoing */)

// Mark all the memtables we flushed as flushed. Note that we do this last so
// that a synchronous call to DB.Flush() will not return until the deletion
Expand Down Expand Up @@ -1888,7 +1888,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
}
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)

return err
}
Expand Down Expand Up @@ -2559,7 +2559,7 @@ func (d *DB) enableFileDeletions() {
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
}

// d.mu must be held when calling this.
Expand All @@ -2586,12 +2586,14 @@ func (d *DB) releaseCleaningTurn() {
d.mu.cleaner.cond.Broadcast()
}

// deleteObsoleteFiles deletes those files that are no longer needed.
// deleteObsoleteFiles deletes those files that are no longer needed. If
// waitForOngoing is true, it waits for any ongoing cleaning turns to complete,
// and if false, it returns right away if a cleaning turn is ongoing.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) deleteObsoleteFiles(jobID int) {
if !d.acquireCleaningTurn(true) {
func (d *DB) deleteObsoleteFiles(jobID int, waitForOngoing bool) {
if !d.acquireCleaningTurn(waitForOngoing) {
return
}
d.doDeleteObsoleteFiles(jobID)
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ func (d *DB) Close() error {
// prevented a new cleaning job when a readState was unrefed. If needed,
// synchronously delete obsolete files.
if len(d.mu.versions.obsoleteTables) > 0 {
d.deleteObsoleteFiles(d.mu.nextJobID)
d.deleteObsoleteFiles(d.mu.nextJobID, true /* waitForOngoing */)
}
// Wait for all the deletion goroutines spawned by cleaning jobs to finish.
d.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion flush_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe
}
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
d.maybeScheduleCompaction()
d.mu.Unlock()
return nil
Expand Down
12 changes: 12 additions & 0 deletions flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ func TestManualFlush(t *testing.T) {
d.mu.Unlock()
return s

case "acquire-cleaning-turn":
d.mu.Lock()
d.acquireCleaningTurn(false)
d.mu.Unlock()
return ""

case "release-cleaning-turn":
d.mu.Lock()
d.releaseCleaningTurn()
d.mu.Unlock()
return ""

case "reset":
if err := d.Close(); err != nil {
return err.Error()
Expand Down
2 changes: 1 addition & 1 deletion ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
}
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, false /* waitForOngoing */)
// The ingestion may have pushed a level over the threshold for compaction,
// so check to see if one is necessary and schedule it.
d.maybeScheduleCompaction()
Expand Down
2 changes: 1 addition & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {

if !d.opts.ReadOnly {
d.scanObsoleteFiles(ls)
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
} else {
// All the log files are obsolete.
d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
Expand Down
20 changes: 20 additions & 0 deletions testdata/manual_flush
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,23 @@ async-flush
----
0.0:
000005:[a#1,SET-b#2,SET]

# Test that synchronous flushes can happen even when a cleaning turn is held.
reset
----

acquire-cleaning-turn
----

batch
set a 1
set b 2
----

flush
----
0.0:
000005:[a#1,SET-b#2,SET]

release-cleaning-turn
----

0 comments on commit 23421fe

Please sign in to comment.