Skip to content

Commit

Permalink
[extension/filestorage] Online file storage compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Świątek authored and pmm-sumo committed Apr 22, 2022
1 parent 19ba19f commit 5fad271
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 65 deletions.
40 changes: 37 additions & 3 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,48 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va
`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
The default timeout is `1s`.

`compaction` defines how and when files should be compacted.
For now only compaction on start of the collector is supported, and can be enabled by `compaction.on_start` option.
## Compaction
`compaction` defines how and when files should be compacted. There are two modes of compaction available (both of which can be set concurrently):
- `compaction.on_start`, which happens when collector starts
- `compaction.on_rebound`, which happens online when certain criteria are met; it's discussed in more detail below

`compaction.directory` is the directory used for compaction (as midstep).
`compaction.directory` specifies the directory used for compaction (as a midstep).

`compaction.max_transaction_size` defines the maximum size of the compaction transaction.
A value of zero will ignore transaction sizes.

### Rebound (online) compaction

For rebound compaction, there are two additional parameters available:
- `compaction.rebound_size_below_mib` - specifies the maximum size of actually allocated data for compaction to happen
- `compaction.rebound_total_size_above_mib` - specifies the minimum overall size of the allocated space (both actually used and free pages)

The idea behind rebound compaction is that in certain workloads (e.g. [persistent queue](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#persistent-queue)) the storage might grow significantly (e.g. when the exporter is unable to send the data due to a network problem), after which it is emptied as the underlying issue is gone (e.g. network connectivity is back). This leaves a significant amount of space that needs to be reclaimed (also, this space is reported in memory usage as mmap() is used underneath). The optimal condition for this to happen online is after the storage is largely drained, which is controlled by `rebound_size_below_mib`. To make sure this is not too sensitive, there's also `rebound_total_size_above_mib`, which specifies the total claimed space size that must be met for online compaction to even be considered. Consider the following diagram for an example of meeting the rebound (online) compaction conditions.

```
│ XX.............
m │ XXXX............
e ├───────────XXXXXXX..........──────────── rebound_total_size_above_mib
m │ XXXXXXXXX..........
o │ XXXXXXXXXXX.........
r │ XXXXXXXXXXXXXXXXX....
y ├─────XXXXXXXXXXXXXXXXXXXXX..──────────── rebound_size_below_mib
│ XXXXXXXXXXXXXXXXXXXXXXXXXX.........
│ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
└──────────────── time ─────────────────►
│ | |
issue draining compaction happens
starts begins and reclaims space
X - actually used space
. - claimed but no longer used space
```


## Example

```
extensions:
file_storage:
Expand Down
112 changes: 92 additions & 20 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@ import (
"errors"
"io/ioutil"
"os"
"sync"
"time"

"go.etcd.io/bbolt"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"
)

var defaultBucket = []byte(`default`)

const (
elapsedKey = "elapsed"
directoryKey = "directory"
tempDirectoryKey = "tempDirectory"
)

type fileStorageClient struct {
db *bbolt.DB
logger *zap.Logger
compactionMutex sync.RWMutex
db *bbolt.DB
compactionCfg *CompactionConfig
openTimeout time.Duration
}

func bboltOptions(timeout time.Duration) *bbolt.Options {
Expand All @@ -40,7 +52,7 @@ func bboltOptions(timeout time.Duration) *bbolt.Options {
}
}

func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
options := bboltOptions(timeout)
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
Expand All @@ -55,7 +67,7 @@ func newClient(filePath string, timeout time.Duration) (*fileStorageClient, erro
return nil, err
}

return &fileStorageClient{db}, nil
return &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
Expand All @@ -80,7 +92,7 @@ func (c *fileStorageClient) Delete(ctx context.Context, key string) error {
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
func (c *fileStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
batch := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
Expand Down Expand Up @@ -108,7 +120,20 @@ func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
return nil
}

return c.db.Update(batch)
c.compactionMutex.RLock()
defer c.compactionMutex.RUnlock()
err := c.db.Update(batch)
if c.shouldCompactOnWrite() {
go func() {
onWriteErr := c.Compact(ctx, c.compactionCfg.Directory, c.openTimeout, c.compactionCfg.MaxTransactionSize)
if onWriteErr != nil {
c.logger.Error("compaction failure",
zap.String(directoryKey, c.compactionCfg.Directory),
zap.Error(onWriteErr))
}
}()
}
return err
}

// Close will close the database
Expand All @@ -117,40 +142,87 @@ func (c *fileStorageClient) Close(_ context.Context) error {
}

// Compact database. Use temporary file as helper as we cannot replace database in-place
func (c *fileStorageClient) Compact(ctx context.Context, compactionDirectory string, timeout time.Duration, maxTransactionSize int64) (*fileStorageClient, error) {
func (c *fileStorageClient) Compact(ctx context.Context, compactionDirectory string, timeout time.Duration, maxTransactionSize int64) error {
var err error
var file *os.File
var compactedDb *bbolt.DB

// create temporary file in compactionDirectory
file, err := ioutil.TempFile(compactionDirectory, "tempdb")
file, err = ioutil.TempFile(compactionDirectory, "tempdb")
if err != nil {
return nil, err
return err
}

defer func() {
_, statErr := file.Stat()
if statErr == nil {
// File still exists and needs to be removed
if removeErr := os.Remove(file.Name()); removeErr != nil {
c.logger.Error("removing temporary compaction file failed", zap.Error(removeErr))
}
}
}()

// use temporary file as compaction target
options := bboltOptions(timeout)

c.logger.Debug("starting compaction",
zap.String(directoryKey, c.db.Path()),
zap.String(tempDirectoryKey, file.Name()))

// cannot reuse newClient as db shouldn't contain any bucket
db, err := bbolt.Open(file.Name(), 0600, options)
compactedDb, err = bbolt.Open(file.Name(), 0600, options)
if err != nil {
return nil, err
return err
}

if err := bbolt.Compact(db, c.db, maxTransactionSize); err != nil {
return nil, err
c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()
compactionStart := time.Now()

if err = bbolt.Compact(compactedDb, c.db, maxTransactionSize); err != nil {
return err
}

dbPath := c.db.Path()
tempDBPath := db.Path()
compactedDbPath := compactedDb.Path()

db.Close()
c.Close(ctx)
c.db.Close()
compactedDb.Close()

// replace current db file with compacted db file
if err := os.Remove(dbPath); err != nil {
return nil, err
if err = os.Remove(dbPath); err != nil {
return err
}

if err := os.Rename(tempDBPath, dbPath); err != nil {
return nil, err
if err = os.Rename(compactedDbPath, dbPath); err != nil {
return err
}

c.db, err = bbolt.Open(dbPath, 0600, options)

c.logger.Info("finished compaction",
zap.String(directoryKey, dbPath),
zap.Duration(elapsedKey, time.Since(compactionStart)))

return err
}

// shouldCompactOnWrite checks whether the conditions for online (rebound)
// compaction are currently met. It is consulted after each Batch write; when
// it returns true, the caller kicks off Compact in a background goroutine.
func (c *fileStorageClient) shouldCompactOnWrite() bool {
	// Rebound compaction must be explicitly enabled via compaction.on_rebound.
	if !c.compactionCfg.OnRebound {
		return false
	}

	// Compact only when BOTH hold:
	//  - the actually used space (FreelistInuse) has dropped below the
	//    lower threshold, and
	//  - the total claimed space (free allocations + in-use freelist)
	//    exceeds the upper threshold.
	// NOTE(review): bbolt stats are reported in bytes, while the config
	// fields are named *MiB — confirm a MiB→bytes conversion is not
	// missing in these comparisons.
	dbStats := c.db.Stats()
	if dbStats.FreelistInuse > int(c.compactionCfg.ReboundSizeBelowMiB) ||
		dbStats.FreeAlloc+dbStats.FreelistInuse < int(c.compactionCfg.ReboundTotalSizeAboveMiB) {
		return false
	}

	c.logger.Debug("shouldCompactOnWrite returns true",
		zap.Int("FreelistInuse", dbStats.FreelistInuse),
		zap.Int("FreeAlloc", dbStats.FreeAlloc))

	return true
}
Loading

0 comments on commit 5fad271

Please sign in to comment.