Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-tenant block retention #508

Merged
merged 4 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475)
* [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431)
* [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489)
* [ENHANCEMENT] Add per-tenant block retention. [#77](https://github.com/grafana/tempo/issues/77)
* [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)

Expand Down
4 changes: 2 additions & 2 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
}

func (t *App) initCompactor() (services.Service, error) {
compactor, err := compactor.New(t.cfg.Compactor, t.store)
compactor, err := compactor.New(t.cfg.Compactor, t.store, t.overrides)
if err != nil {
return nil, fmt.Errorf("failed to create compactor %w", err)
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (t *App) setupModuleManager() error {
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Compactor: {Store, Server, MemberlistKV},
Compactor: {Store, Server, Overrides, MemberlistKV},
All: {Compactor, QueryFrontend, Querier, Ingester, Distributor},
}

Expand Down
24 changes: 17 additions & 7 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
tempo_util "github.com/grafana/tempo/pkg/util"
"github.com/pkg/errors"
Expand All @@ -23,8 +24,9 @@ const (
type Compactor struct {
services.Service

cfg *Config
store storage.Store
cfg *Config
store storage.Store
overrides *overrides.Overrides

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
Expand All @@ -34,11 +36,12 @@ type Compactor struct {
subservicesWatcher *services.FailureWatcher
}

// New makes a new Querier.
func New(cfg Config, store storage.Store) (*Compactor, error) {
// New makes a new Compactor.
func New(cfg Config, store storage.Store, overrides *overrides.Overrides) (*Compactor, error) {
c := &Compactor{
cfg: &cfg,
store: store,
cfg: &cfg,
store: store,
overrides: overrides,
}

subservices := []services.Service(nil)
Expand Down Expand Up @@ -95,7 +98,7 @@ func (c *Compactor) running(ctx context.Context) error {
level.Info(util.Logger).Log("msg", "waiting for compaction ring to settle", "waitDuration", waitOnStartup)
time.Sleep(waitOnStartup)
level.Info(util.Logger).Log("msg", "enabling compaction")
c.store.EnableCompaction(&c.cfg.Compactor, c)
c.store.EnableCompaction(&c.cfg.Compactor, c, c)
}()

if c.subservices != nil {
Expand All @@ -121,6 +124,7 @@ func (c *Compactor) stopping(_ error) error {
return nil
}

// Owns implements CompactorSharder
func (c *Compactor) Owns(hash string) bool {
if !c.isSharded() {
return true
Expand Down Expand Up @@ -148,6 +152,7 @@ func (c *Compactor) Owns(hash string) bool {
return rs.Ingesters[0].Addr == c.ringLifecycler.Addr
}

// Combine implements CompactorSharder
func (c *Compactor) Combine(objA []byte, objB []byte) []byte {
combinedTrace, err := tempo_util.CombineTraces(objA, objB)
if err != nil {
Expand All @@ -156,6 +161,11 @@ func (c *Compactor) Combine(objA []byte, objB []byte) []byte {
return combinedTrace
}

// BlockRetentionForTenant implements CompactorOverrides. It reports the
// per-tenant block retention configured in the overrides module; a zero
// value means no override is set for this tenant.
func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
	retention := c.overrides.BlockRetention(tenantID)
	return retention
}

func (c *Compactor) waitRingActive(ctx context.Context) error {
for {
// Check if the ingester is ACTIVE in the ring and our ring client
Expand Down
3 changes: 3 additions & 0 deletions modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type Limits struct {
MaxGlobalTracesPerUser int `yaml:"max_global_traces_per_user"`
MaxSpansPerTrace int `yaml:"max_spans_per_trace"`

// Compactor enforced limits.
BlockRetention time.Duration `yaml:"block_retention"`

// Config for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"`
Expand Down
5 changes: 5 additions & 0 deletions modules/overrides/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -154,6 +155,10 @@ func (o *Overrides) IngestionBurstSize(userID string) int {
return o.getOverridesForUser(userID).IngestionBurstSize
}

// BlockRetention returns the block retention limit configured for the
// given tenant. A zero value means no per-tenant override is configured.
func (o *Overrides) BlockRetention(userID string) time.Duration {
	limits := o.getOverridesForUser(userID)
	return limits.BlockRetention
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits(userID)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor_bookmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestCurrentClear(t *testing.T) {
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)
Expand Down
18 changes: 13 additions & 5 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ func (m *mockSharder) Combine(objA []byte, objB []byte) []byte {
return objB
}

// mockOverrides is a test stub for the CompactorOverrides interface that
// returns the same fixed block retention for every tenant.
type mockOverrides struct {
	// retention returned for all tenants; the zero value falls back to the
	// compactor config default (see tempodb retention handling)
	blockRetention time.Duration
}

// BlockRetentionForTenant implements CompactorOverrides.
func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration {
	return m.blockRetention
}

func TestCompaction(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
Expand Down Expand Up @@ -68,7 +76,7 @@ func TestCompaction(t *testing.T) {
MaxCompactionRange: 24 * time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)
Expand Down Expand Up @@ -195,7 +203,7 @@ func TestSameIDCompaction(t *testing.T) {
MaxCompactionRange: 24 * time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)
Expand Down Expand Up @@ -282,7 +290,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
MaxCompactionRange: 24 * time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

// Cut x blocks with y records each
blockCount := 5
Expand Down Expand Up @@ -347,7 +355,7 @@ func TestCompactionMetrics(t *testing.T) {
MaxCompactionRange: 24 * time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

// Cut x blocks with y records each
blockCount := 5
Expand Down Expand Up @@ -422,7 +430,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
MaxCompactionObjects: 1000,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

// Cut blocks for multiple tenants
cutTestBlocks(t, w, testTenantID, 2, 2)
Expand Down
9 changes: 8 additions & 1 deletion tempodb/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ func (rw *readerWriter) retainTenant(tenantID string) {
start := time.Now()
defer func() { metricRetentionDuration.Observe(time.Since(start).Seconds()) }()

// Check for overrides
retention := rw.compactorCfg.BlockRetention // Default
if r := rw.compactorOverrides.BlockRetentionForTenant(tenantID); r != 0 {
retention = r
}
level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention)

// iterate through block list. make compacted anything that is past retention.
cutoff := time.Now().Add(-rw.compactorCfg.BlockRetention)
cutoff := time.Now().Add(-retention)
blocklist := rw.blocklist(tenantID)
for _, b := range blocklist {
if b.EndTime.Before(cutoff) && rw.compactorSharder.Owns(b.BlockID.String()) {
Expand Down
57 changes: 56 additions & 1 deletion tempodb/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestRetention(t *testing.T) {
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

blockID := uuid.New()

Expand Down Expand Up @@ -74,3 +74,58 @@ func TestRetention(t *testing.T) {
r.(*readerWriter).doRetention()
checkBlocklists(t, blockID, 0, 0, rw)
}

// TestBlockRetentionOverride verifies that a per-tenant block retention
// override takes precedence over the compactor's default retention, and
// that a zero override falls back to the configured default.
func TestBlockRetentionOverride(t *testing.T) {
	tempDir, err := ioutil.TempDir("/tmp", "")
	defer os.RemoveAll(tempDir)
	assert.NoError(t, err, "unexpected error creating temp dir")

	r, w, c, err := New(&Config{
		Backend: "local",
		Local: &local.Config{
			Path: path.Join(tempDir, "traces"),
		},
		Block: &encoding.BlockConfig{
			IndexDownsample: 17,
			BloomFP:         .01,
			Encoding:        backend.EncLZ4_256k,
		},
		WAL: &wal.Config{
			Filepath: path.Join(tempDir, "wal"),
		},
		BlocklistPoll: 0,
	}, log.NewNopLogger())
	assert.NoError(t, err)

	overrides := &mockOverrides{}

	c.EnableCompaction(&CompactorConfig{
		ChunkSizeBytes:          10,
		MaxCompactionRange:      time.Hour,
		BlockRetention:          time.Hour, // default used when the per-tenant override is zero
		CompactedBlockRetention: 0,
	}, &mockSharder{}, overrides)

	cutTestBlocks(t, w, testTenantID, 10, 10)

	rw := r.(*readerWriter)
	rw.pollBlocklist()

	// Retention = 1 hour, does nothing (blocks were just cut)
	overrides.blockRetention = time.Hour
	rw.doRetention()
	rw.pollBlocklist()
	assert.Equal(t, 10, len(rw.blocklist(testTenantID)))

	// Retention = 0, falls back to the default (1 hour), still does nothing
	overrides.blockRetention = 0
	rw.doRetention()
	rw.pollBlocklist()
	assert.Equal(t, 10, len(rw.blocklist(testTenantID)))

	// Retention = 1ns, deletes everything
	overrides.blockRetention = time.Nanosecond
	rw.doRetention()
	rw.pollBlocklist()
	assert.Equal(t, 0, len(rw.blocklist(testTenantID)))
}
10 changes: 8 additions & 2 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ type Reader interface {
}

// Compactor is the interface exposed by backends that support block
// compaction and retention.
type Compactor interface {
	// EnableCompaction starts compaction/retention using the given config,
	// sharder (block ownership + object combining) and per-tenant overrides.
	EnableCompaction(cfg *CompactorConfig, sharder CompactorSharder, overrides CompactorOverrides)
}

// CompactorSharder decides block ownership and combines objects that share
// an ID during compaction.
type CompactorSharder interface {
	// Combine merges two objects stored under the same ID into one.
	Combine(objA []byte, objB []byte) []byte
	// Owns reports whether this instance is responsible for the given hash.
	Owns(hash string) bool
}

// CompactorOverrides exposes per-tenant limits consulted during retention.
type CompactorOverrides interface {
	// BlockRetentionForTenant returns the tenant's block retention; a zero
	// value means the configured default retention should be used.
	BlockRetentionForTenant(tenantID string) time.Duration
}

// WriteableBlock is a block that can be written to backend storage.
type WriteableBlock interface {
	Write(ctx context.Context, w backend.Writer) error
}
Expand All @@ -119,6 +123,7 @@ type readerWriter struct {
compactorCfg *CompactorConfig
compactedBlockLists map[string][]*backend.CompactedBlockMeta
compactorSharder CompactorSharder
compactorOverrides CompactorOverrides
compactorTenantOffset uint
}

Expand Down Expand Up @@ -279,14 +284,15 @@ func (rw *readerWriter) Shutdown() {
rw.r.Shutdown()
}

func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharder) {
func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharder, overrides CompactorOverrides) {
// Set default if needed. This is mainly for tests.
if cfg.RetentionConcurrency == 0 {
cfg.RetentionConcurrency = DefaultRetentionConcurrency
}

rw.compactorCfg = cfg
rw.compactorSharder = c
rw.compactorOverrides = overrides

if rw.cfg.BlocklistPoll == 0 {
level.Info(rw.logger).Log("msg", "maintenance cycle unset. compaction and retention disabled.")
Expand Down
4 changes: 2 additions & 2 deletions tempodb/tempodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestDB(t *testing.T) {
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

blockID := uuid.New()

Expand Down Expand Up @@ -235,7 +235,7 @@ func TestBlockCleanup(t *testing.T) {
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})
}, &mockSharder{}, &mockOverrides{})

blockID := uuid.New()

Expand Down