From 687564e62e402457ebcf44de02c9ad10d6a01b65 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Fri, 5 Feb 2021 10:52:48 -0500 Subject: [PATCH 1/3] Add per-tenant block retention --- CHANGELOG.md | 1 + cmd/tempo/app/modules.go | 4 +-- modules/compactor/compactor.go | 24 +++++++++---- modules/overrides/limits.go | 3 ++ modules/overrides/overrides.go | 5 +++ tempodb/compactor_bookmark_test.go | 2 +- tempodb/compactor_test.go | 18 +++++++--- tempodb/retention.go | 9 ++++- tempodb/retention_test.go | 54 +++++++++++++++++++++++++++++- tempodb/tempodb.go | 10 ++++-- tempodb/tempodb_test.go | 4 +-- 11 files changed, 113 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2af41ef8479..dd31404fcd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475) * [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431) * [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489) +* [ENHANCEMENT] Add per-tenant block retention [#77](https://github.com/grafana/tempo/issues/77) * [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442) * [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. 
[#481](https://github.com/grafana/tempo/pull/481) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 53c7bbff920..643d76b6a8c 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -188,7 +188,7 @@ func (t *App) initQueryFrontend() (services.Service, error) { } func (t *App) initCompactor() (services.Service, error) { - compactor, err := compactor.New(t.cfg.Compactor, t.store) + compactor, err := compactor.New(t.cfg.Compactor, t.store, t.overrides) if err != nil { return nil, fmt.Errorf("failed to create compactor %w", err) } @@ -260,7 +260,7 @@ func (t *App) setupModuleManager() error { Distributor: {Ring, Server, Overrides}, Ingester: {Store, Server, Overrides, MemberlistKV}, Querier: {Store, Ring}, - Compactor: {Store, Server, MemberlistKV}, + Compactor: {Store, Server, Overrides, MemberlistKV}, All: {Compactor, QueryFrontend, Querier, Ingester, Distributor}, } diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index daffd127eba..576ddaa4aee 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -10,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" "github.com/go-kit/kit/log/level" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/storage" tempo_util "github.com/grafana/tempo/pkg/util" "github.com/pkg/errors" @@ -23,8 +24,9 @@ const ( type Compactor struct { services.Service - cfg *Config - store storage.Store + cfg *Config + store storage.Store + overrides *overrides.Overrides // Ring used for sharding compactions. ringLifecycler *ring.Lifecycler @@ -34,11 +36,12 @@ type Compactor struct { subservicesWatcher *services.FailureWatcher } -// New makes a new Querier. -func New(cfg Config, store storage.Store) (*Compactor, error) { +// New makes a new Compactor. 
+func New(cfg Config, store storage.Store, overrides *overrides.Overrides) (*Compactor, error) { c := &Compactor{ - cfg: &cfg, - store: store, + cfg: &cfg, + store: store, + overrides: overrides, } subservices := []services.Service(nil) @@ -95,7 +98,7 @@ func (c *Compactor) running(ctx context.Context) error { level.Info(util.Logger).Log("msg", "waiting for compaction ring to settle", "waitDuration", waitOnStartup) time.Sleep(waitOnStartup) level.Info(util.Logger).Log("msg", "enabling compaction") - c.store.EnableCompaction(&c.cfg.Compactor, c) + c.store.EnableCompaction(&c.cfg.Compactor, c, c) }() if c.subservices != nil { @@ -121,6 +124,7 @@ func (c *Compactor) stopping(_ error) error { return nil } +// Owns implements CompactorSharder func (c *Compactor) Owns(hash string) bool { if !c.isSharded() { return true @@ -148,6 +152,7 @@ func (c *Compactor) Owns(hash string) bool { return rs.Ingesters[0].Addr == c.ringLifecycler.Addr } +// Combine implements CompactorSharder func (c *Compactor) Combine(objA []byte, objB []byte) []byte { combinedTrace, err := tempo_util.CombineTraces(objA, objB) if err != nil { @@ -156,6 +161,11 @@ func (c *Compactor) Combine(objA []byte, objB []byte) []byte { return combinedTrace } +// BlockRentionForTenant implements CompactorOverrides +func (c *Compactor) BlockRentionForTenant(tenantID string) time.Duration { + return c.overrides.BlockRetention(tenantID) +} + func (c *Compactor) waitRingActive(ctx context.Context) error { for { // Check if the ingester is ACTIVE in the ring and our ring client diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go index 1de96899ff2..44fe212f532 100644 --- a/modules/overrides/limits.go +++ b/modules/overrides/limits.go @@ -26,6 +26,9 @@ type Limits struct { MaxGlobalTracesPerUser int `yaml:"max_global_traces_per_user"` MaxSpansPerTrace int `yaml:"max_spans_per_trace"` + // Compactor enforced limits. 
+ BlockRetention time.Duration `yaml:"block_retention"` + // Config for overrides, convenient if it goes here. PerTenantOverrideConfig string `yaml:"per_tenant_override_config"` PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"` diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go index 991725dd6c4..62f0116f676 100644 --- a/modules/overrides/overrides.go +++ b/modules/overrides/overrides.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/services" @@ -154,6 +155,10 @@ func (o *Overrides) IngestionBurstSize(userID string) int { return o.getOverridesForUser(userID).IngestionBurstSize } +func (o *Overrides) BlockRetention(userID string) time.Duration { + return o.getOverridesForUser(userID).BlockRetention +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits(userID) diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index 0b19e5b046f..72a02fb197c 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -44,7 +44,7 @@ func TestCurrentClear(t *testing.T) { MaxCompactionRange: time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() assert.NoError(t, err) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index e2f2cbf0e63..f7dd70ae61b 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -37,6 +37,14 @@ func (m *mockSharder) Combine(objA []byte, objB []byte) []byte { return objB } +type mockOverrides struct { + blockRetention time.Duration +} + +func (m *mockOverrides) BlockRentionForTenant(_ string) time.Duration { + return m.blockRetention +} + func TestCompaction(t *testing.T) { tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) @@ -64,7 +72,7 @@ func 
TestCompaction(t *testing.T) { MaxCompactionRange: 24 * time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() assert.NoError(t, err) @@ -188,7 +196,7 @@ func TestSameIDCompaction(t *testing.T) { MaxCompactionRange: 24 * time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() assert.NoError(t, err) @@ -272,7 +280,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { MaxCompactionRange: 24 * time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) // Cut x blocks with y records each blockCount := 5 @@ -334,7 +342,7 @@ func TestCompactionMetrics(t *testing.T) { MaxCompactionRange: 24 * time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) // Cut x blocks with y records each blockCount := 5 @@ -406,7 +414,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { MaxCompactionObjects: 1000, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) // Cut blocks for multiple tenants cutTestBlocks(t, w, testTenantID, 2, 2) diff --git a/tempodb/retention.go b/tempodb/retention.go index b80be965c5d..34d07aa90a1 100644 --- a/tempodb/retention.go +++ b/tempodb/retention.go @@ -37,8 +37,15 @@ func (rw *readerWriter) retainTenant(tenantID string) { start := time.Now() defer func() { metricRetentionDuration.Observe(time.Since(start).Seconds()) }() + // Check for overrides + retention := rw.compactorCfg.BlockRetention // Default + if r := rw.compactorOverrides.BlockRentionForTenant(tenantID); r != 0 { + retention = r + } + level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention) + // iterate through block list. make compacted anything that is past retention. 
- cutoff := time.Now().Add(-rw.compactorCfg.BlockRetention) + cutoff := time.Now().Add(-retention) blocklist := rw.blocklist(tenantID) for _, b := range blocklist { if b.EndTime.Before(cutoff) && rw.compactorSharder.Owns(b.BlockID.String()) { diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index e84b1aa528c..9bc5b79efb7 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -40,7 +40,7 @@ func TestRetention(t *testing.T) { MaxCompactionRange: time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) blockID := uuid.New() @@ -69,3 +69,55 @@ func TestRetention(t *testing.T) { r.(*readerWriter).doRetention() checkBlocklists(t, blockID, 0, 0, rw) } + +func TestBlockRetentionOverride(t *testing.T) { + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + IndexDownsample: 17, + BloomFP: .01, + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) + assert.NoError(t, err) + + overrides := &mockOverrides{} + + c.EnableCompaction(&CompactorConfig{ + ChunkSizeBytes: 10, + MaxCompactionRange: time.Hour, + BlockRetention: time.Hour, + CompactedBlockRetention: 0, + }, &mockSharder{}, overrides) + + cutTestBlocks(t, w, testTenantID, 10, 10) + + rw := r.(*readerWriter) + rw.pollBlocklist() + + // Retention = 1 hour, does nothing + overrides.blockRetention = time.Hour + r.(*readerWriter).doRetention() + rw.pollBlocklist() + assert.Equal(t, 10, len(rw.blocklist(testTenantID))) + + // Retention = 1 minute, still does nothing + overrides.blockRetention = time.Minute + r.(*readerWriter).doRetention() + rw.pollBlocklist() + assert.Equal(t, 10, len(rw.blocklist(testTenantID))) + + // Retention = 1ns, deletes everything +
overrides.blockRetention = time.Nanosecond + r.(*readerWriter).doRetention() + rw.pollBlocklist() + assert.Equal(t, 0, len(rw.blocklist(testTenantID))) +} diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 4b7283f619b..6debafe414f 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -90,7 +90,7 @@ type Reader interface { } type Compactor interface { - EnableCompaction(cfg *CompactorConfig, sharder CompactorSharder) + EnableCompaction(cfg *CompactorConfig, sharder CompactorSharder, overrides CompactorOverrides) } type CompactorSharder interface { @@ -98,6 +98,10 @@ type CompactorSharder interface { Owns(hash string) bool } +type CompactorOverrides interface { + BlockRentionForTenant(tenantID string) time.Duration +} + type WriteableBlock interface { Write(ctx context.Context, w backend.Writer) error } @@ -118,6 +122,7 @@ type readerWriter struct { compactorCfg *CompactorConfig compactedBlockLists map[string][]*backend.CompactedBlockMeta compactorSharder CompactorSharder + compactorOverrides CompactorOverrides compactorTenantOffset uint } @@ -271,7 +276,7 @@ func (rw *readerWriter) Shutdown() { rw.r.Shutdown() } -func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharder) { +func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharder, overrides CompactorOverrides) { // Set default if needed. This is mainly for tests. if cfg.RetentionConcurrency == 0 { cfg.RetentionConcurrency = DefaultRetentionConcurrency @@ -279,6 +284,7 @@ func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharde rw.compactorCfg = cfg rw.compactorSharder = c + rw.compactorOverrides = overrides if rw.cfg.BlocklistPoll == 0 { level.Info(rw.logger).Log("msg", "maintenance cycle unset. 
compaction and retention disabled.") diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 615f37ecc65..d2c6b1da402 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -50,7 +50,7 @@ func TestDB(t *testing.T) { MaxCompactionRange: time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) blockID := uuid.New() @@ -222,7 +222,7 @@ func TestBlockCleanup(t *testing.T) { MaxCompactionRange: time.Hour, BlockRetention: 0, CompactedBlockRetention: 0, - }, &mockSharder{}) + }, &mockSharder{}, &mockOverrides{}) blockID := uuid.New() From 7cad595903a8d3302900240aefdf7d1daa9e87ef Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Fri, 5 Feb 2021 15:14:47 -0500 Subject: [PATCH 2/3] Fix typo --- modules/compactor/compactor.go | 4 ++-- tempodb/compactor_test.go | 2 +- tempodb/retention.go | 2 +- tempodb/tempodb.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index 576ddaa4aee..509dc46f6f1 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -161,8 +161,8 @@ func (c *Compactor) Combine(objA []byte, objB []byte) []byte { return combinedTrace } -// BlockRentionForTenant implements CompactorOverrides -func (c *Compactor) BlockRentionForTenant(tenantID string) time.Duration { +// BlockRetentionForTenant implements CompactorOverrides +func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration { return c.overrides.BlockRetention(tenantID) } diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index f7dd70ae61b..6b63bf8418f 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -41,7 +41,7 @@ type mockOverrides struct { blockRetention time.Duration } -func (m *mockOverrides) BlockRentionForTenant(_ string) time.Duration { +func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration { return m.blockRetention } diff 
--git a/tempodb/retention.go b/tempodb/retention.go index 34d07aa90a1..3cb4784b51d 100644 --- a/tempodb/retention.go +++ b/tempodb/retention.go @@ -39,7 +39,7 @@ func (rw *readerWriter) retainTenant(tenantID string) { // Check for overrides retention := rw.compactorCfg.BlockRetention // Default - if r := rw.compactorOverrides.BlockRentionForTenant(tenantID); r != 0 { + if r := rw.compactorOverrides.BlockRetentionForTenant(tenantID); r != 0 { retention = r } level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 6debafe414f..e7e2bbecda1 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -99,7 +99,7 @@ type CompactorSharder interface { } type CompactorOverrides interface { - BlockRentionForTenant(tenantID string) time.Duration + BlockRetentionForTenant(tenantID string) time.Duration } type WriteableBlock interface { From 23d987b997150b5b5ed7c7a466ecebaeac096bfb Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Mon, 8 Feb 2021 07:30:24 -0500 Subject: [PATCH 3/3] Fix conflict with master --- tempodb/retention_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 44aaf6deeb6..f086a23e08a 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -85,10 +85,13 @@ func TestBlockRetentionOverride(t *testing.T) { Local: &local.Config{ Path: path.Join(tempDir, "traces"), }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), + Block: &encoding.BlockConfig{ IndexDownsample: 17, BloomFP: .01, + Encoding: backend.EncLZ4_256k, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), }, BlocklistPoll: 0, }, log.NewNopLogger())