Skip to content

Commit

Permalink
fix(tsm1): "snapshot in progress" error during backup
Browse files Browse the repository at this point in the history
When an InfluxDB database is very busy writing new points, the backup
process can fail because it cannot write a new snapshot.
The error is: operation timed out with error: create snapshot: snapshot in progress.
This happens because InfluxDB is taking snapshots of the cache almost
continuously, due to the high number of points being ingested.
The earlier fix for this was #16627,
but it covered OSS only and was not in the code path for backups
in clusters.
This fix adds a skipCacheOk flag to tsdb.Engine.CreateSnapshot().
A value of true allows the backup to proceed even if a cache snapshot
cannot be taken.
This flag is set to true in tsm1.Engine.Backup(), the OSS backup code path
and in tsdb.Shard.CreateSnapshot(), the cluster backup code path.
This flag is set to false in tsm1.Engine.Export(), so an export still fails if a cache snapshot is already in progress.

influxdata/plutonium#3227
(cherry picked from commit 23be20b)
(cherry picked from commit 0b1ee04)
  • Loading branch information
davidby-influx committed Nov 14, 2020
1 parent 6a1299e commit fc0bbac
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 22 deletions.
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Engine interface {

LoadMetadataIndex(shardID uint64, index Index) error

CreateSnapshot() (string, error)
CreateSnapshot(skipCacheOk bool) (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Expand Down
34 changes: 14 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,26 +909,16 @@ func (e *Engine) Free() error {
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still acively getting writes, this
// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to backup, increasing memory usage and eventually rejecting writes.
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
var err error
var path string
for i := 0; i < 3; i++ {
path, err = e.CreateSnapshot()
if err != nil {
switch err {
case ErrSnapshotInProgress:
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
default:
return err
}
}
}
if err == ErrSnapshotInProgress {
e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
path, err = e.CreateSnapshot(true)
if err != nil {
return err
}

// Remove the temporary snapshot dir
defer func() {
if err := os.RemoveAll(path); err != nil {
Expand Down Expand Up @@ -995,7 +985,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}

func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
path, err := e.CreateSnapshot()
path, err := e.CreateSnapshot(false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1918,9 +1908,13 @@ func (e *Engine) WriteSnapshot() (err error) {
}

// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files.
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
if err := e.WriteSnapshot(); (err == ErrSnapshotInProgress) && skipCacheOk {
e.logger.Warn("Snapshotter busy: Backup or export proceeding without cache snapshot contents.")
} else if err != nil {
return "", err
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ func (s *Shard) CreateSnapshot() (string, error) {
if err != nil {
return "", err
}
return engine.CreateSnapshot()
return engine.CreateSnapshot(true)
}

// ForEachMeasurementName iterates over each measurement in the shard.
Expand Down

0 comments on commit fc0bbac

Please sign in to comment.