[extension/filestorage] Online file storage compaction #9327

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
- `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165)
- `datadogexporter`: Some config validation and unmarshaling steps are now done on `Validate` and `Unmarshal` instead of `Sanitize` (#8829)
- `examples`: Add an example for scraping Couchbase metrics (#10894)
- `filestorageextension`: Add background compaction capability (#9327)
- `googlecloudpubsubreceiver`: Added new `Endpoint` and `Insecure` connection configuration options. (#10845)

### 🧰 Bug fixes 🧰
43 changes: 39 additions & 4 deletions extension/storage/filestorage/README.md
@@ -12,14 +12,49 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va
`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
The default timeout is `1s`.

`compaction` defines how and when files should be compacted.
For now only compaction on start of the collector is supported, and can be enabled by `compaction.on_start` option.
## Compaction
`compaction` defines how and when files should be compacted. There are two modes of compaction available (both can be enabled at the same time):
- `compaction.on_start` (default: false), which happens when the collector starts
- `compaction.on_rebound` (default: false), which happens online when certain criteria are met; it is discussed in more detail below

`compaction.directory` is the directory used for compaction (as midstep).
`compaction.directory` specifies the directory used for compaction (as a midstep).

`compaction.max_transaction_size` defines maximum size of the compaction transaction.
`compaction.max_transaction_size` (default: 65536): defines the maximum size of the compaction transaction.
A value of zero means the transaction size is unlimited.
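
As an illustration, a minimal configuration sketch enabling compaction when the collector starts could look as follows (the directory paths are placeholders, and the `max_transaction_size` value is simply the default mentioned above):

```
extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage
    compaction:
      on_start: true
      # temporary files created during compaction are placed here
      directory: /var/lib/otelcol/file_storage/compaction
      max_transaction_size: 65536
```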

### Rebound (online) compaction

For rebound compaction, the following additional parameters are available:
- `compaction.rebound_needed_threshold_mib` (default: 100) - when allocated data exceeds this amount, the "compaction needed" flag will be set
- `compaction.rebound_trigger_threshold_mib` (default: 10) - if the "compaction needed" flag is set and allocated data drops below this amount, compaction will begin and the "compaction needed" flag will be cleared
- `compaction.check_interval` (default: 5s) - specifies how frequently the conditions for compaction are checked

The idea behind rebound compaction is that in certain workloads (e.g. a [persistent queue](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#persistent-queue)) the storage might grow significantly (e.g. when the exporter is unable to send data due to a network problem) and is then emptied once the underlying issue is resolved (e.g. network connectivity is restored). This leaves a significant amount of claimed space that needs to be reclaimed (this space is also reported in memory usage, as mmap() is used underneath). The optimal moment to do this online is after the storage has been largely drained, which is controlled by `rebound_trigger_threshold_mib`. To make sure compaction is not triggered too eagerly, `rebound_needed_threshold_mib` specifies the total claimed space that must have been reached before online compaction is even considered. Consider the following diagram for an example of meeting the rebound (online) compaction conditions.

```
  │              XX.............
m │            XXXX............
e ├───────────XXXXXXX..........──────────── rebound_needed_threshold_mib
m │         XXXXXXXXX..........
o │        XXXXXXXXXXX.........
r │       XXXXXXXXXXXXXXXXX....
y ├─────XXXXXXXXXXXXXXXXXXXXX..──────────── rebound_trigger_threshold_mib
  │  XXXXXXXXXXXXXXXXXXXXXXXXXX.........
  │ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  └──────────────── time ─────────────────►
       │            |           |
     issue      draining     compaction happens
     starts      begins      and reclaims space

X - actually used space
. - claimed but no longer used space
```
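
As a rough sketch tying the parameters to the diagram above (the threshold and interval values are simply the documented defaults, and the paths are placeholders), a rebound compaction configuration could look like this:

```
extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage
    compaction:
      on_rebound: true
      directory: /tmp
      # set the "compaction needed" flag once allocated data exceeds 100 MiB ...
      rebound_needed_threshold_mib: 100
      # ... and start compacting once allocated data drops below 10 MiB
      rebound_trigger_threshold_mib: 10
      # how often the two conditions above are checked
      check_interval: 5s
```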


## Example

```
extensions:
file_storage:
177 changes: 157 additions & 20 deletions extension/storage/filestorage/client.go
@@ -19,16 +19,32 @@ import (
"errors"
"io/ioutil"
"os"
"sync"
"time"

"go.etcd.io/bbolt"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"
)

var defaultBucket = []byte(`default`)

const (
elapsedKey = "elapsed"
directoryKey = "directory"
tempDirectoryKey = "tempDirectory"

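// oneMiB is used to convert the MiB-based rebound thresholds from the configuration into bytes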
oneMiB = 1048576
)

type fileStorageClient struct {
db *bbolt.DB
logger *zap.Logger
compactionMutex sync.RWMutex
db *bbolt.DB
compactionCfg *CompactionConfig
openTimeout time.Duration
cancel context.CancelFunc
closed bool
}

func bboltOptions(timeout time.Duration) *bbolt.Options {
@@ -40,7 +56,7 @@ func bboltOptions(timeout time.Duration) *bbolt.Options {
}
}

func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
options := bboltOptions(timeout)
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
@@ -56,7 +72,12 @@ func newClient(filePath string, timeout time.Duration) (*fileStorageClient, erro
return nil, err
}

return &fileStorageClient{db}, nil
client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout}
if compactionCfg.OnRebound {
client.startCompactionLoop(context.Background())
}

return client, nil
}

// Get will retrieve data from storage that corresponds to the specified key
@@ -81,7 +102,7 @@ func (c *fileStorageClient) Delete(ctx context.Context, key string) error {
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
func (c *fileStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
batch := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
@@ -109,53 +130,169 @@ func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
return nil
}

c.compactionMutex.RLock()
defer c.compactionMutex.RUnlock()
return c.db.Update(batch)
}

// Close will close the database
func (c *fileStorageClient) Close(_ context.Context) error {
c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()

if c.cancel != nil {
c.cancel()
}
c.closed = true
return c.db.Close()
}

// Compact compacts the database. A temporary file is used as a helper, as we cannot replace the database in place
func (c *fileStorageClient) Compact(ctx context.Context, compactionDirectory string, timeout time.Duration, maxTransactionSize int64) (*fileStorageClient, error) {
func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Duration, maxTransactionSize int64) error {
var err error
var file *os.File
var compactedDb *bbolt.DB

// create temporary file in compactionDirectory
file, err := ioutil.TempFile(compactionDirectory, "tempdb")
file, err = ioutil.TempFile(compactionDirectory, "tempdb")
if err != nil {
return nil, err
return err
}
err = file.Close()
if err != nil {
return nil, err
return err
}

defer func() {
_, statErr := os.Stat(file.Name())
if statErr == nil {
// File still exists and needs to be removed
if removeErr := os.Remove(file.Name()); removeErr != nil {
c.logger.Error("removing temporary compaction file failed", zap.Error(removeErr))
}
}
}()

// use temporary file as compaction target
options := bboltOptions(timeout)

c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()
if c.closed {
c.logger.Debug("skipping compaction since database is already closed")
return nil
}

c.logger.Debug("starting compaction",
zap.String(directoryKey, c.db.Path()),
zap.String(tempDirectoryKey, file.Name()))

// cannot reuse newClient as db shouldn't contain any bucket
db, err := bbolt.Open(file.Name(), 0600, options)
compactedDb, err = bbolt.Open(file.Name(), 0600, options)
if err != nil {
return nil, err
return err
}

if err := bbolt.Compact(db, c.db, maxTransactionSize); err != nil {
return nil, err
compactionStart := time.Now()

if err = bbolt.Compact(compactedDb, c.db, maxTransactionSize); err != nil {
return err
}

dbPath := c.db.Path()
tempDBPath := db.Path()
compactedDbPath := compactedDb.Path()

db.Close()
c.Close(ctx)
c.db.Close()
compactedDb.Close()

// replace current db file with compacted db file
if err := os.Remove(dbPath); err != nil {
return nil, err
if err = os.Remove(dbPath); err != nil {
return err
}

if err := os.Rename(tempDBPath, dbPath); err != nil {
return nil, err
if err = os.Rename(compactedDbPath, dbPath); err != nil {
return err
}

c.db, err = bbolt.Open(dbPath, 0600, options)

c.logger.Info("finished compaction",
zap.String(directoryKey, dbPath),
zap.Duration(elapsedKey, time.Since(compactionStart)))

return err
}

// startCompactionLoop runs a background loop that periodically checks whether compaction should be performed
func (c *fileStorageClient) startCompactionLoop(ctx context.Context) {
ctx, c.cancel = context.WithCancel(ctx)

go func() {
c.logger.Debug("starting compaction loop",
zap.Duration("compaction_check_interval", c.compactionCfg.CheckInterval))

compactionTicker := time.NewTicker(c.compactionCfg.CheckInterval)
defer compactionTicker.Stop()

for {
select {
case <-compactionTicker.C:
if c.shouldCompact() {
err := c.Compact(c.compactionCfg.Directory, c.openTimeout, c.compactionCfg.MaxTransactionSize)
if err != nil {
c.logger.Error("compaction failure",
zap.String(directoryKey, c.compactionCfg.Directory),
zap.Error(err))
}
}
case <-ctx.Done():
c.logger.Debug("shutting down compaction loop")
return
}
}
}()
}

// shouldCompact checks whether the conditions for online compaction are met
func (c *fileStorageClient) shouldCompact() bool {
if !c.compactionCfg.OnRebound {
return false
}

totalSizeBytes, dataSizeBytes, err := c.getDbSize()
if err != nil {
c.logger.Error("failed to get db size", zap.Error(err))
return false
}

c.logger.Debug("shouldCompact check",
zap.Int64("totalSizeBytes", totalSizeBytes),
zap.Int64("dataSizeBytes", dataSizeBytes))

if dataSizeBytes > c.compactionCfg.ReboundNeededThresholdMiB*oneMiB ||
totalSizeBytes < c.compactionCfg.ReboundTriggerThresholdMiB*oneMiB {
return false
}

c.logger.Debug("shouldCompact returns true",
zap.Int64("totalSizeBytes", totalSizeBytes),
zap.Int64("dataSizeBytes", dataSizeBytes))

return true
}

func (c *fileStorageClient) getDbSize() (totalSizeResult int64, dataSizeResult int64, errResult error) {
var totalSize int64

err := c.db.View(func(tx *bbolt.Tx) error {
totalSize = tx.Size()
return nil
})
if err != nil {
return 0, 0, err
}

return newClient(dbPath, timeout)
dbStats := c.db.Stats()
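// approximate the used data size: total file size minus the bytes bbolt reports as allocated in free pages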
dataSize := totalSize - int64(dbStats.FreeAlloc)
return totalSize, dataSize, nil
}