diff --git a/CHANGELOG.md b/CHANGELOG.md
index 01497faeb6ba..8dac3d1cf472 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 - `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165)
 - `datadogexporter`: Some config validation and unmarshaling steps are now done on `Validate` and `Unmarshal` instead of `Sanitize` (#8829)
 - `examples`: Add an example for scraping Couchbase metrics (#10894)
+- `filestorageextension`: Add background compaction capability (#9327)
 - `googlecloudpubsubreceiver`: Added new `Endpoint` and `Insecure` connection configuration options. (#10845)
 
 ### 🧰 Bug fixes 🧰
diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md
index 8650f963542a..571912d1ca33 100644
--- a/extension/storage/filestorage/README.md
+++ b/extension/storage/filestorage/README.md
@@ -12,14 +12,49 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va
 `timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
 The default timeout is `1s`.
 
-`compaction` defines how and when files should be compacted.
-For now only compaction on start of the collector is supported, and can be enabled by `compaction.on_start` option.
+## Compaction
+`compaction` defines how and when files should be compacted. There are two modes of compaction available, both of which can be enabled at the same time:
+- `compaction.on_start` (default: false), which runs when the collector starts
+- `compaction.on_rebound` (default: false), which runs online when certain criteria are met; it is discussed in more detail below
 
-`compaction.directory` is the directory used for compaction (as midstep).
+`compaction.directory` specifies the directory used for compaction (as a midstep).
 
-`compaction.max_transaction_size` defines maximum size of the compaction transaction.
+`compaction.max_transaction_size` (default: 65536) defines the maximum size of a compaction transaction. A value of zero will ignore transaction sizes.
+
+### Rebound (online) compaction
+
+For rebound compaction, the following additional parameters are available:
+- `compaction.rebound_needed_threshold_mib` (default: 100) - when the total allocated space exceeds this amount, the "compaction needed" flag is set
+- `compaction.rebound_trigger_threshold_mib` (default: 10) - if the "compaction needed" flag is set and the amount of used data drops below this value, compaction begins and the flag is cleared
+- `compaction.check_interval` (default: 5s) - specifies how frequently the conditions for compaction are checked
+
+The idea behind rebound compaction is that in certain workloads (e.g. the [persistent queue](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#persistent-queue)) the storage might grow significantly (e.g. when the exporter is unable to send data due to a network problem) and is then emptied once the underlying issue is resolved (e.g. network connectivity is restored). This leaves a significant amount of claimed space that needs to be reclaimed (the space is also reported in memory usage, since mmap() is used underneath). The best moment to do this online is after the storage has been largely drained, which is controlled by `rebound_trigger_threshold_mib`. To make sure compaction is not triggered too eagerly, `rebound_needed_threshold_mib` additionally specifies the total claimed space that must have been exceeded for online compaction to even be considered.
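As an illustration, enabling rebound compaction in the extension configuration might look like the following sketch (the directory paths are placeholders, and the threshold and interval values shown are simply the documented defaults):

```yaml
extensions:
  file_storage:
    directory: /var/lib/otelcol/storage          # placeholder data directory
    compaction:
      directory: /var/lib/otelcol/compaction     # placeholder midstep directory for the compacted copy
      on_rebound: true
      rebound_needed_threshold_mib: 100          # total allocated space that marks compaction as needed
      rebound_trigger_threshold_mib: 10          # used-data size below which compaction actually runs
      check_interval: 5s                         # how often both conditions are evaluated
```

`on_start: true` can be set alongside `on_rebound` when compaction at collector startup is also desired.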
Consider following diagram for an example of meeting the rebound (online) compaction conditions. + +``` + ▲ + │ + │ XX............. +m │ XXXX............ +e ├───────────XXXXXXX..........──────────── rebound_needed_threshold_mib +m │ XXXXXXXXX.......... +o │ XXXXXXXXXXX......... +r │ XXXXXXXXXXXXXXXXX.... +y ├─────XXXXXXXXXXXXXXXXXXXXX..──────────── rebound_trigger_threshold_mib + │ XXXXXXXXXXXXXXXXXXXXXXXXXX......... + │ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + └──────────────── time ─────────────────► + │ | | + issue draining compaction happens + starts begins and reclaims space + + X - actually used space + . - claimed but no longer used space +``` + + +## Example + ``` extensions: file_storage: diff --git a/extension/storage/filestorage/client.go b/extension/storage/filestorage/client.go index d788a02e4400..a9e9e0b2f39f 100644 --- a/extension/storage/filestorage/client.go +++ b/extension/storage/filestorage/client.go @@ -19,16 +19,32 @@ import ( "errors" "io/ioutil" "os" + "sync" "time" "go.etcd.io/bbolt" "go.opentelemetry.io/collector/extension/experimental/storage" + "go.uber.org/zap" ) var defaultBucket = []byte(`default`) +const ( + elapsedKey = "elapsed" + directoryKey = "directory" + tempDirectoryKey = "tempDirectory" + + oneMiB = 1048576 +) + type fileStorageClient struct { - db *bbolt.DB + logger *zap.Logger + compactionMutex sync.RWMutex + db *bbolt.DB + compactionCfg *CompactionConfig + openTimeout time.Duration + cancel context.CancelFunc + closed bool } func bboltOptions(timeout time.Duration) *bbolt.Options { @@ -40,7 +56,7 @@ func bboltOptions(timeout time.Duration) *bbolt.Options { } } -func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) { +func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) { options := bboltOptions(timeout) db, err := bbolt.Open(filePath, 0600, options) if err != nil { @@ -56,7 +72,12 @@ func newClient(filePath string, timeout time.Duration) (*fileStorageClient, erro return nil, err } - return &fileStorageClient{db}, nil + client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout} + if compactionCfg.OnRebound { + client.startCompactionLoop(context.Background()) + } + + return client, nil } // Get will retrieve data from storage that corresponds to the specified key @@ -81,7 +102,7 @@ func (c *fileStorageClient) Delete(ctx context.Context, key string) error { } // Batch executes the specified operations in order. Get operation results are updated in place -func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) error { +func (c *fileStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error { batch := func(tx *bbolt.Tx) error { bucket := tx.Bucket(defaultBucket) if bucket == nil { @@ -109,53 +130,169 @@ func (c *fileStorageClient) Batch(_ context.Context, ops ...storage.Operation) e return nil } + c.compactionMutex.RLock() + defer c.compactionMutex.RUnlock() return c.db.Update(batch) } // Close will close the database func (c *fileStorageClient) Close(_ context.Context) error { + c.compactionMutex.Lock() + defer c.compactionMutex.Unlock() + + if c.cancel != nil { + c.cancel() + } + c.closed = true return c.db.Close() } // Compact database. 
Use temporary file as helper as we cannot replace database in-place -func (c *fileStorageClient) Compact(ctx context.Context, compactionDirectory string, timeout time.Duration, maxTransactionSize int64) (*fileStorageClient, error) { +func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Duration, maxTransactionSize int64) error { + var err error + var file *os.File + var compactedDb *bbolt.DB + // create temporary file in compactionDirectory - file, err := ioutil.TempFile(compactionDirectory, "tempdb") + file, err = ioutil.TempFile(compactionDirectory, "tempdb") if err != nil { - return nil, err + return err } err = file.Close() if err != nil { - return nil, err + return err } + defer func() { + _, statErr := os.Stat(file.Name()) + if statErr == nil { + // File still exists and needs to be removed + if removeErr := os.Remove(file.Name()); removeErr != nil { + c.logger.Error("removing temporary compaction file failed", zap.Error(removeErr)) + } + } + }() + // use temporary file as compaction target options := bboltOptions(timeout) + c.compactionMutex.Lock() + defer c.compactionMutex.Unlock() + if c.closed { + c.logger.Debug("skipping compaction since database is already closed") + return nil + } + + c.logger.Debug("starting compaction", + zap.String(directoryKey, c.db.Path()), + zap.String(tempDirectoryKey, file.Name())) + // cannot reuse newClient as db shouldn't contain any bucket - db, err := bbolt.Open(file.Name(), 0600, options) + compactedDb, err = bbolt.Open(file.Name(), 0600, options) if err != nil { - return nil, err + return err } - if err := bbolt.Compact(db, c.db, maxTransactionSize); err != nil { - return nil, err + compactionStart := time.Now() + + if err = bbolt.Compact(compactedDb, c.db, maxTransactionSize); err != nil { + return err } dbPath := c.db.Path() - tempDBPath := db.Path() + compactedDbPath := compactedDb.Path() - db.Close() - c.Close(ctx) + c.db.Close() + compactedDb.Close() // replace current db file with compacted db file - if err := os.Remove(dbPath); err != nil { - return nil, err + if err = os.Remove(dbPath); err != nil { + return err } - if err := os.Rename(tempDBPath, dbPath); err != nil { - return nil, err + if err = os.Rename(compactedDbPath, dbPath); err != nil { + return err + } + + c.db, err = bbolt.Open(dbPath, 0600, options) + + c.logger.Info("finished compaction", + zap.String(directoryKey, dbPath), + zap.Duration(elapsedKey, time.Since(compactionStart))) + + return err +} + +// startCompactionLoop provides asynchronous compaction function +func (c *fileStorageClient) startCompactionLoop(ctx context.Context) { + ctx, c.cancel = context.WithCancel(ctx) + + go func() { + c.logger.Debug("starting compaction loop", + zap.Duration("compaction_check_interval", c.compactionCfg.CheckInterval)) + + compactionTicker := time.NewTicker(c.compactionCfg.CheckInterval) + defer compactionTicker.Stop() + + for { + select { + case <-compactionTicker.C: + if c.shouldCompact() { + err := c.Compact(c.compactionCfg.Directory, c.openTimeout, c.compactionCfg.MaxTransactionSize) + if err != nil { + c.logger.Error("compaction failure", + zap.String(directoryKey, c.compactionCfg.Directory), + zap.Error(err)) + } + } + case <-ctx.Done(): + c.logger.Debug("shutting down compaction loop") + return + } + } + }() +} + +// shouldCompact checks whether the conditions for online compaction are met +func (c *fileStorageClient) shouldCompact() bool { + if !c.compactionCfg.OnRebound { + return false + } + + totalSizeBytes, dataSizeBytes, err := c.getDbSize() + if 
err != nil {
+		c.logger.Error("failed to get db size", zap.Error(err))
+		return false
+	}
+
+	c.logger.Debug("shouldCompact check",
+		zap.Int64("totalSizeBytes", totalSizeBytes),
+		zap.Int64("dataSizeBytes", dataSizeBytes))
+
+	if dataSizeBytes > c.compactionCfg.ReboundTriggerThresholdMiB*oneMiB ||
+		totalSizeBytes < c.compactionCfg.ReboundNeededThresholdMiB*oneMiB {
+		return false
+	}
+
+	c.logger.Debug("shouldCompact returns true",
+		zap.Int64("totalSizeBytes", totalSizeBytes),
+		zap.Int64("dataSizeBytes", dataSizeBytes))
+
+	return true
+}
+
+func (c *fileStorageClient) getDbSize() (totalSizeResult int64, dataSizeResult int64, errResult error) {
+	var totalSize int64
+
+	err := c.db.View(func(tx *bbolt.Tx) error {
+		totalSize = tx.Size()
+		return nil
+	})
+	if err != nil {
+		return 0, 0, err
 	}
 
-	return newClient(dbPath, timeout)
+	dbStats := c.db.Stats()
+	dataSize := totalSize - int64(dbStats.FreeAlloc)
+	return totalSize, dataSize, nil
 }
diff --git a/extension/storage/filestorage/client_test.go b/extension/storage/filestorage/client_test.go
index 06b086f7fa4a..fa8e295e4eae 100644
--- a/extension/storage/filestorage/client_test.go
+++ b/extension/storage/filestorage/client_test.go
@@ -17,19 +17,23 @@ package filestorage
 import (
 	"context"
 	"fmt"
+	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
 )
 
 func TestClientOperations(t *testing.T) {
 	dbFile := filepath.Join(t.TempDir(), "my_db")
 
-	client, err := newClient(dbFile, time.Second)
+	client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, client.Close(context.TODO()))
@@ -67,7 +71,7 @@ func TestClientBatchOperations(t *testing.T) {
 	tempDir := t.TempDir()
 	dbFile := filepath.Join(tempDir, "my_db")
 
-	client, err := newClient(dbFile, time.Second)
+	client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, client.Close(context.TODO()))
@@ -188,8 +192,11 @@ func TestNewClientTransactionErrors(t *testing.T) {
 			tempDir := t.TempDir()
 			dbFile := filepath.Join(tempDir, "my_db")
 
-			client, err := newClient(dbFile, timeout)
+			client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{})
 			require.NoError(t, err)
+			t.Cleanup(func() {
+				require.NoError(t, client.Close(context.TODO()))
+			})
 
 			// Create a problem
 			require.NoError(t, client.db.Update(tc.setup))
@@ -209,19 +216,152 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
 	tempDir := t.TempDir()
 	dbFile := filepath.Join(tempDir, "my_db")
 
-	client, err := newClient(dbFile, time.Second)
+	client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
 	require.Error(t, err)
 	require.Nil(t, client)
 
 	defaultBucket = temp
 }
 
+func TestClientReboundCompaction(t *testing.T) {
+	tempDir := t.TempDir()
+	dbFile := filepath.Join(tempDir, "my_db")
+
+	checkInterval := time.Second
+
+	logger, _ := zap.NewDevelopment()
+	client, err := newClient(logger, dbFile, time.Second, &CompactionConfig{
+		OnRebound:                  true,
+		CheckInterval:              checkInterval,
+		ReboundNeededThresholdMiB:  1,
+		ReboundTriggerThresholdMiB: 4,
+	})
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, client.Close(context.TODO()))
+	})
+
+	// 1.
Fill up the database + position := int64(0) + ctx := context.Background() + + entrySize := int64(1048576) + + for ; position < 5; position++ { + batchWrite := []storage.Operation{ + storage.SetOperation(fmt.Sprintf("foo-%d", position), make([]byte, entrySize)), + storage.SetOperation(fmt.Sprintf("bar-%d", position), []byte("testValueBar")), + } + err = client.Batch(ctx, batchWrite...) + require.NoError(t, err) + } + + require.Eventually(t, + func() bool { + totalSize, realSize, dbErr := client.getDbSize() + require.NoError(t, dbErr) + return totalSize > position*entrySize && realSize > position*entrySize + }, + 10*time.Second, 5*time.Millisecond, "database allocated space for data", + ) + + // 2. Remove the large entries + for i := 0; i < int(position); i++ { + err = client.Batch(ctx, storage.DeleteOperation(fmt.Sprintf("foo-%d", i))) + require.NoError(t, err) + } + + require.Eventually(t, + func() bool { + // The check is performed while the database might be compacted, hence we're reusing the mutex here + // (getDbSize is not called from outside the compaction loop otherwise) + client.compactionMutex.Lock() + defer client.compactionMutex.Unlock() + + totalSize, realSize, dbErr := client.getDbSize() + require.NoError(t, dbErr) + return totalSize < entrySize && realSize < entrySize + }, + 10*time.Second, 5*time.Millisecond, "database cleaned up not used space", + ) +} + +func TestClientConcurrentCompaction(t *testing.T) { + logCore, logObserver := observer.New(zap.DebugLevel) + logger := zap.New(logCore) + + tempDir := t.TempDir() + dbFile := filepath.Join(tempDir, "my_db") + + checkInterval := time.Millisecond + + client, err := newClient(logger, dbFile, time.Second, &CompactionConfig{ + OnRebound: true, + CheckInterval: checkInterval, + ReboundNeededThresholdMiB: 1, + ReboundTriggerThresholdMiB: 5, + }) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, client.Close(context.TODO())) + }) + + var wg sync.WaitGroup + repeats := 5 + ctx := context.Background() + + clientOperationsThread := func(id int) { + for i := 0; i < repeats; i++ { + batchWrite := []storage.Operation{ + storage.SetOperation(fmt.Sprintf("foo-%d-%d", id, i), make([]byte, 1000000)), + storage.SetOperation(fmt.Sprintf("bar-%d-%d", id, i), []byte("testValueBar")), + } + err := client.Batch(ctx, batchWrite...) 
+ require.NoError(t, err) + + err = client.Batch(ctx, storage.DeleteOperation(fmt.Sprintf("foo-%d-%d", id, i))) + require.NoError(t, err) + + // Make sure the requests are somewhat spaced + time.Sleep(checkInterval * 2) + + result, err := client.Get(ctx, fmt.Sprintf("foo-%d-%d", id, i)) + require.NoError(t, err) + require.Equal(t, []byte(nil), result) + + result, err = client.Get(ctx, fmt.Sprintf("bar-%d-%d", id, i)) + require.NoError(t, err) + require.Equal(t, []byte("testValueBar"), result) + + // Make sure the requests are somewhat spaced + time.Sleep(checkInterval) + } + + wg.Done() + } + + // Start a couple of concurrent threads and see how they add/remove data as needed without failures + for i := 0; i < 10; i++ { + wg.Add(1) + go clientOperationsThread(i) + } + + wg.Wait() + + // The actual number might vary a bit depending on the actual intervals + require.GreaterOrEqual(t, len(logObserver.FilterMessage("finished compaction").All()), 3) +} + func BenchmarkClientGet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() testKey := "testKey" @@ -237,8 +377,11 @@ func BenchmarkClientGet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() @@ -257,8 +400,11 @@ func BenchmarkClientSet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() testKey := "testKey" @@ -274,9 +420,11 @@ func BenchmarkClientSet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) - + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() testEntries := make([]storage.Operation, 100) @@ -294,8 +442,11 @@ func BenchmarkClientDelete(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() testKey := "testKey" @@ -317,8 +468,11 @@ func BenchmarkClientSetLargeDB(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() @@ -351,8 +505,11 @@ func BenchmarkClientInitLargeDB(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(dbFile, 
time.Second) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) ctx := context.Background() @@ -367,7 +524,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) { var tempClient *fileStorageClient b.ResetTimer() for n := 0; n < b.N; n++ { - tempClient, err = newClient(dbFile, time.Second) + tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) require.NoError(b, err) b.StopTimer() err = tempClient.Close(ctx) @@ -375,3 +532,91 @@ func BenchmarkClientInitLargeDB(b *testing.B) { b.StartTimer() } } + +func BenchmarkClientCompactLargeDBFile(b *testing.B) { + entrySizeInBytes := 1024 * 1024 + entryCount := 2000 + entry := make([]byte, entrySizeInBytes) + var testKey string + + tempDir := b.TempDir() + dbFile := filepath.Join(tempDir, "my_db") + + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) + + ctx := context.Background() + + for n := 0; n < entryCount; n++ { + testKey = fmt.Sprintf("testKey-%d", n) + require.NoError(b, client.Set(ctx, testKey, entry)) + } + + // Leave one key in the db + for n := 0; n < entryCount-1; n++ { + testKey = fmt.Sprintf("testKey-%d", n) + require.NoError(b, client.Delete(ctx, testKey)) + } + + require.NoError(b, client.Close(ctx)) + + b.ResetTimer() + b.StopTimer() + for n := 0; n < b.N; n++ { + testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) + err = os.Link(dbFile, testDbFile) + require.NoError(b, err) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + require.NoError(b, err) + b.StartTimer() + require.NoError(b, client.Compact(tempDir, time.Second, 65536)) + b.StopTimer() + } +} + +func BenchmarkClientCompactDb(b *testing.B) { + entrySizeInBytes := 1024 * 128 + entryCount := 160 + entry := make([]byte, entrySizeInBytes) + var testKey string + + tempDir := b.TempDir() + dbFile := filepath.Join(tempDir, "my_db") + + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, client.Close(context.TODO())) + }) + + ctx := context.Background() + + for n := 0; n < entryCount; n++ { + testKey = fmt.Sprintf("testKey-%d", n) + require.NoError(b, client.Set(ctx, testKey, entry)) + } + + // Leave half the keys in the DB + for n := 0; n < entryCount/2; n++ { + testKey = fmt.Sprintf("testKey-%d", n) + require.NoError(b, client.Delete(ctx, testKey)) + } + + require.NoError(b, client.Close(ctx)) + + b.ResetTimer() + b.StopTimer() + for n := 0; n < b.N; n++ { + testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) + err = os.Link(dbFile, testDbFile) + require.NoError(b, err) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + require.NoError(b, err) + b.StartTimer() + require.NoError(b, client.Compact(tempDir, time.Second, 65536)) + b.StopTimer() + } +} diff --git a/extension/storage/filestorage/config.go b/extension/storage/filestorage/config.go index 00f9543e9e35..922e90b41879 100644 --- a/extension/storage/filestorage/config.go +++ b/extension/storage/filestorage/config.go @@ -24,7 +24,7 @@ import ( "go.opentelemetry.io/collector/config" ) -// Config defines configuration for http forwarder extension. +// Config defines configuration for file storage extension. 
type Config struct { config.ExtensionSettings `mapstructure:",squash"` @@ -34,10 +34,29 @@ type Config struct { Compaction *CompactionConfig `mapstructure:"compaction,omitempty"` } +// CompactionConfig defines configuration for optional file storage compaction. type CompactionConfig struct { - OnStart bool `mapstructure:"on_start,omitempty"` - Directory string `mapstructure:"directory,omitempty"` - MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"` + // OnStart specifies that compaction is attempted each time on start + OnStart bool `mapstructure:"on_start,omitempty"` + // OnRebound specifies that compaction is attempted online, when rebound conditions are met. + // This typically happens when storage usage has increased, which caused increase in space allocation + // and afterwards it had most items removed. We want to run the compaction online only when there are + // not too many elements still being stored (which is an indication that "heavy usage" period is over) + // so compaction should be relatively fast and at the same time there is relatively large volume of space + // that might be reclaimed. + OnRebound bool `mapstructure:"on_rebound,omitempty"` + // Directory specifies where the temporary files for compaction will be stored + Directory string `mapstructure:"directory,omitempty"` + // ReboundNeededThresholdMiB specifies the minimum total allocated size (both used and empty) + // to mark the need for online compaction + ReboundNeededThresholdMiB int64 `mapstructure:"rebound_needed_threshold_mib"` + // ReboundTriggerThresholdMiB is used when compaction is marked as needed. When allocated data size drops + // below the specified value, the compactions starts and the flag marking need for compaction is cleared + ReboundTriggerThresholdMiB int64 `mapstructure:"rebound_trigger_threshold_mib"` + // MaxTransactionSize specifies the maximum number of items that might be present in single compaction iteration + MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"` + // CheckInterval specifies frequency of compaction check + CheckInterval time.Duration `mapstructure:"check_interval,omitempty"` } func (cfg *Config) Validate() error { @@ -68,5 +87,9 @@ func (cfg *Config) Validate() error { return errors.New("max transaction size for compaction cannot be less than 0") } + if cfg.Compaction.OnRebound && cfg.Compaction.CheckInterval <= 0 { + return errors.New("compaction check interval must be positive when rebound compaction is set") + } + return nil } diff --git a/extension/storage/filestorage/config_test.go b/extension/storage/filestorage/config_test.go index bfbddde36c6c..c824404c79cd 100644 --- a/extension/storage/filestorage/config_test.go +++ b/extension/storage/filestorage/config_test.go @@ -57,9 +57,13 @@ func TestLoadConfig(t *testing.T) { ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "all_settings")), Directory: ".", Compaction: &CompactionConfig{ - Directory: ".", - OnStart: true, - MaxTransactionSize: 2048, + Directory: ".", + OnStart: true, + OnRebound: true, + MaxTransactionSize: 2048, + ReboundTriggerThresholdMiB: 16, + ReboundNeededThresholdMiB: 128, + CheckInterval: time.Second * 5, }, Timeout: 2 * time.Second, }, diff --git a/extension/storage/filestorage/extension.go b/extension/storage/filestorage/extension.go index 4cc311191099..c45998c817f8 100644 --- a/extension/storage/filestorage/extension.go +++ b/extension/storage/filestorage/extension.go @@ -18,7 +18,6 @@ import ( "context" "fmt" 
"path/filepath" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -27,12 +26,8 @@ import ( ) type localFileStorage struct { - directory string - timeout time.Duration - logger *zap.Logger - compactionDirectory string - compactOnStart bool - maxCompactionSize int64 + cfg *Config + logger *zap.Logger } // Ensure this storage extension implements the appropriate interface @@ -40,11 +35,8 @@ var _ storage.Extension = (*localFileStorage)(nil) func newLocalFileStorage(logger *zap.Logger, config *Config) (component.Extension, error) { return &localFileStorage{ - directory: filepath.Clean(config.Directory), - compactionDirectory: filepath.Clean(config.Compaction.Directory), - compactOnStart: config.Compaction.OnStart, - timeout: config.Timeout, - logger: logger, + cfg: config, + logger: logger, }, nil } @@ -69,16 +61,22 @@ func (lfs *localFileStorage) GetClient(ctx context.Context, kind component.Kind, rawName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name) } // TODO sanitize rawName - absoluteName := filepath.Join(lfs.directory, rawName) - client, err := newClient(absoluteName, lfs.timeout) + absoluteName := filepath.Join(lfs.cfg.Directory, rawName) + client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction) + + if err != nil { + return nil, err + } // return if compaction is not required - if err != nil || !lfs.compactOnStart { - return client, err + if lfs.cfg.Compaction.OnStart { + compactionErr := client.Compact(lfs.cfg.Compaction.Directory, lfs.cfg.Timeout, lfs.cfg.Compaction.MaxTransactionSize) + if compactionErr != nil { + lfs.logger.Error("compaction on start failed", zap.Error(compactionErr)) + } } - // perform compaction and returns client - return client.Compact(ctx, lfs.compactionDirectory, lfs.timeout, lfs.maxCompactionSize) + return client, nil } func kindString(k component.Kind) string { diff --git a/extension/storage/filestorage/extension_test.go b/extension/storage/filestorage/extension_test.go index 6fea6d008678..db917ff65dad 100644 --- a/extension/storage/filestorage/extension_test.go +++ b/extension/storage/filestorage/extension_test.go @@ -300,10 +300,10 @@ func TestCompaction(t *testing.T) { // compact the db c, ok := client.(*fileStorageClient) require.True(t, ok) - fsClient1, err := c.Compact(ctx, tempDir, cfg.Timeout, 1) + err = c.Compact(tempDir, cfg.Timeout, 1) require.NoError(t, err) t.Cleanup(func() { - require.NoError(t, fsClient1.Close(ctx)) + require.NoError(t, client.Close(ctx)) }) // check size after compaction @@ -314,15 +314,17 @@ func TestCompaction(t *testing.T) { // remove data from database for i = 0; i < numEntries; i++ { key = fmt.Sprintf("key_%d", i) - err = fsClient1.Delete(ctx, key) + err = c.Delete(ctx, key) require.NoError(t, err) } // compact after data removal - fsClient2, err := fsClient1.Compact(ctx, tempDir, cfg.Timeout, 1) + c, ok = client.(*fileStorageClient) + require.True(t, ok) + err = c.Compact(tempDir, cfg.Timeout, 1) require.NoError(t, err) t.Cleanup(func() { - require.NoError(t, fsClient2.Close(ctx)) + require.NoError(t, client.Close(ctx)) }) // check size @@ -369,10 +371,10 @@ func TestCompactionRemoveTemp(t *testing.T) { // perform compaction in the same directory c, ok := client.(*fileStorageClient) require.True(t, ok) - fsClient1, err := c.Compact(ctx, tempDir, cfg.Timeout, 1) + err = c.Compact(tempDir, cfg.Timeout, 1) require.NoError(t, err) t.Cleanup(func() { - require.NoError(t, fsClient1.Close(ctx)) + require.NoError(t, 
client.Close(ctx))
 	})
 
 	// check if only db exists in tempDir
@@ -384,10 +386,12 @@ func TestCompactionRemoveTemp(t *testing.T) {
 
 	// perform compaction in different directory
 	emptyTempDir := t.TempDir()
-	fsClient2, err := fsClient1.Compact(ctx, emptyTempDir, cfg.Timeout, 1)
+	c, ok = client.(*fileStorageClient)
+	require.True(t, ok)
+	err = c.Compact(emptyTempDir, cfg.Timeout, 1)
 	require.NoError(t, err)
 	t.Cleanup(func() {
-		require.NoError(t, fsClient2.Close(ctx))
+		require.NoError(t, client.Close(ctx))
 	})
 
 	// check if emptyTempDir is empty after compaction
diff --git a/extension/storage/filestorage/factory.go b/extension/storage/filestorage/factory.go
index a523d5d2d564..9785a7f5140e 100644
--- a/extension/storage/filestorage/factory.go
+++ b/extension/storage/filestorage/factory.go
@@ -25,6 +25,15 @@ import (
 // The value of extension "type" in configuration.
 const typeStr config.Type = "file_storage"
 
+const (
+	// use default bbolt value
+	// https://github.com/etcd-io/bbolt/blob/d5db64bdbfdee1cb410894605f42ffef898f395d/cmd/bbolt/main.go#L1955
+	defaultMaxTransactionSize         = 65536
+	defaultReboundTriggerThresholdMib = 10
+	defaultReboundNeededThresholdMib  = 100
+	defaultCompactionInterval         = time.Second * 5
+)
+
 // NewFactory creates a factory for HostObserver extension.
 func NewFactory() component.ExtensionFactory {
 	return component.NewExtensionFactory(
@@ -38,11 +47,13 @@ func createDefaultConfig() config.Extension {
 		ExtensionSettings: config.NewExtensionSettings(config.NewComponentID(typeStr)),
 		Directory:         getDefaultDirectory(),
 		Compaction: &CompactionConfig{
-			Directory: getDefaultDirectory(),
-			OnStart:   false,
-			// use default bbolt value
-			// https://github.com/etcd-io/bbolt/blob/d5db64bdbfdee1cb410894605f42ffef898f395d/cmd/bbolt/main.go#L1955
-			MaxTransactionSize: 65536,
+			Directory:                  getDefaultDirectory(),
+			OnStart:                    false,
+			OnRebound:                  false,
+			MaxTransactionSize:         defaultMaxTransactionSize,
+			ReboundNeededThresholdMiB:  defaultReboundNeededThresholdMib,
+			ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib,
+			CheckInterval:              defaultCompactionInterval,
 		},
 		Timeout: time.Second,
 	}
diff --git a/extension/storage/filestorage/testdata/config.yaml b/extension/storage/filestorage/testdata/config.yaml
index 1bc59b790ccc..06269ee7f990 100644
--- a/extension/storage/filestorage/testdata/config.yaml
+++ b/extension/storage/filestorage/testdata/config.yaml
@@ -9,6 +9,9 @@ extensions:
     compaction:
       directory: .
       on_start: true
+      on_rebound: true
+      rebound_trigger_threshold_mib: 16
+      rebound_needed_threshold_mib: 128
       max_transaction_size: 2048
     timeout: 2s
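For reference, the `file_storage/all_settings` entry exercised by `TestLoadConfig` above should unmarshal into the equivalent of the following sketch, with keys absent from the YAML (such as `check_interval`) falling back to the factory defaults:

```yaml
extensions:
  file_storage/all_settings:
    directory: .
    timeout: 2s
    compaction:
      directory: .
      on_start: true
      on_rebound: true
      rebound_needed_threshold_mib: 128
      rebound_trigger_threshold_mib: 16
      max_transaction_size: 2048
      check_interval: 5s   # not set in testdata/config.yaml; taken from the factory default
```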