From c9c18d0cf0bbf1fa797c41c4bbc0c5cdc65df0df Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 8 Mar 2022 09:25:09 -0500 Subject: [PATCH 1/5] Added config option Signed-off-by: Joe Elliott --- docs/tempo/website/configuration/_index.md | 8 +++- docs/tempo/website/configuration/manifest.md | 1 + modules/ingester/ingester.go | 2 +- modules/storage/config.go | 2 + tempodb/wal/append_block.go | 40 +++++++++++++++----- tempodb/wal/wal.go | 7 ++-- tempodb/wal/wal_test.go | 19 +++++----- 7 files changed, 55 insertions(+), 24 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 1b90e715f2b..23686597bf1 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -651,6 +651,13 @@ storage: # Options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2 [search_encoding: | default = none] + # When a span is written to the WAL it adjusts the start and end times of the block it is written to. + # This block start and end time range is then used when choosing blocks for search. To prevent spans too far + # in the past or future from impacting the block start and end times we use this configuration option. + # This option only allows spans that occur within the configured duration to adjust the block start and + # end times. + [ingestion_time_range_slack: | default = 2m] + # block configuration block: @@ -671,7 +678,6 @@ storage: # number of bytes per search page [search_page_size_bytes: | default = 1MiB] - ``` ## Memberlist diff --git a/docs/tempo/website/configuration/manifest.md b/docs/tempo/website/configuration/manifest.md index 2e0ea7d1bd6..2cd4305f51c 100644 --- a/docs/tempo/website/configuration/manifest.md +++ b/docs/tempo/website/configuration/manifest.md @@ -316,6 +316,7 @@ storage: blocksfilepath: /tmp/tempo/wal/blocks encoding: snappy search_encoding: none + ingestion_time_range_slack: 2m0s block: index_downsample_bytes: 1048576 index_page_size_bytes: 256000 diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index de3f69ead68..aa9ecf4b766 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -342,7 +342,7 @@ func (i *Ingester) replayWal() error { return 0, 0, err } return start, end, nil - }, log.Logger) + }, i.cfg.MaxBlockDuration, log.Logger) if err != nil { return fmt.Errorf("fatal error replaying wal %w", err) } diff --git a/modules/storage/config.go b/modules/storage/config.go index dd5e399509c..612d2acb068 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -2,6 +2,7 @@ package storage import ( "flag" + "time" "github.com/grafana/tempo/pkg/cache" @@ -36,6 +37,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") cfg.Trace.WAL.Encoding = backend.EncSnappy cfg.Trace.WAL.SearchEncoding = backend.EncNone + cfg.Trace.WAL.IngestionSlack = 2 * time.Minute cfg.Trace.Search = &tempodb.SearchConfig{} cfg.Trace.Search.ChunkSizeBytes = tempodb.DefaultSearchChunkSizeBytes diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 8ffd303ce5a..5e028c70c7e 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -21,8 +21,9 @@ const maxDataEncodingLength = 32 // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. type AppendBlock struct { - meta *backend.BlockMeta - encoding encoding.VersionedEncoding + meta *backend.BlockMeta + encoding encoding.VersionedEncoding + ingestionSlack time.Duration appendFile *os.File appender encoding.Appender @@ -32,7 +33,7 @@ type AppendBlock struct { once sync.Once } -func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string) (*AppendBlock, error) { +func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*AppendBlock, error) { if strings.ContainsRune(dataEncoding, ':') || len([]rune(dataEncoding)) > maxDataEncodingLength { return nil, fmt.Errorf("dataEncoding %s is invalid", dataEncoding) @@ -44,9 +45,10 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En } h := &AppendBlock{ - encoding: v, - meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding), - filepath: filepath, + encoding: v, + meta: backend.NewBlockMeta(tenantID, id, v.Version(), e, dataEncoding), + filepath: filepath, + ingestionSlack: ingestionSlack, } name := h.fullFilename() @@ -69,7 +71,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En // newAppendBlockFromFile returns an AppendBlock that can not be appended to, but can // be completed. It can return a warning or a fatal error -func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*AppendBlock, error, error) { +func newAppendBlockFromFile(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration, fn RangeFunc) (*AppendBlock, error, error) { var warning error blockID, tenantID, version, e, dataEncoding, err := ParseFilename(filename) if err != nil { @@ -82,9 +84,10 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append } b := &AppendBlock{ - meta: backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding), - filepath: path, - encoding: v, + meta: backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding), + filepath: path, + encoding: v, + ingestionSlack: ingestionSlack, } // replay file to extract records @@ -101,6 +104,7 @@ func newAppendBlockFromFile(filename string, path string, fn RangeFunc) (*Append if err != nil { return err } + start, end = b.adjustTimeRangeForSlack(start, end, additionalStartSlack) if start < blockStart { blockStart = start } @@ -128,6 +132,7 @@ func (a *AppendBlock) Append(id common.ID, b []byte, start, end uint32) error { if err != nil { return err } + start, end = a.adjustTimeRangeForSlack(start, end, 0) a.meta.ObjectAdded(id, start, end) return nil } @@ -238,3 +243,18 @@ func (a *AppendBlock) file() (*os.File, error) { return a.readFile, err } + +func (a *AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) { + now := time.Now() + startOfRange := uint32(now.Add(-a.ingestionSlack).Add(-additionalStartSlack).Unix()) + endOfRange := uint32(now.Add(a.ingestionSlack).Unix()) + + if start < startOfRange { + start = startOfRange // jpe prefer now + } + if end > endOfRange { + end = endOfRange + } + + return start, end +} diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index fff7049883e..3a59db2d4ef 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -36,6 +36,7 @@ type Config struct { BlocksFilepath string Encoding backend.Encoding `yaml:"encoding"` SearchEncoding backend.Encoding `yaml:"search_encoding"` + IngestionSlack time.Duration `yaml:"ingestion_time_range_slack"` } func New(c *Config) (*WAL, error) { @@ -84,7 +85,7 @@ func New(c *Config) (*WAL, error) { } // RescanBlocks returns a slice of append blocks from the wal folder -func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) { +func (w *WAL) RescanBlocks(fn RangeFunc, additionalStartSlack time.Duration, log log.Logger) ([]*AppendBlock, error) { files, err := os.ReadDir(w.c.Filepath) if err != nil { return nil, err @@ -103,7 +104,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) } level.Info(log).Log("msg", "beginning replay", "file", f.Name(), "size", fileInfo.Size()) - b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, fn) + b, warning, err := newAppendBlockFromFile(f.Name(), w.c.Filepath, w.c.IngestionSlack, additionalStartSlack, fn) remove := false if err != nil { @@ -138,7 +139,7 @@ func (w *WAL) RescanBlocks(fn RangeFunc, log log.Logger) ([]*AppendBlock, error) } func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*AppendBlock, error) { - return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding) + return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.IngestionSlack) } func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, string, backend.Encoding, error) { diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 8e919483ae2..0f7f4ae77b4 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -153,7 +153,7 @@ func TestErrorConditions(t *testing.T) { blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -164,10 +164,11 @@ func TestErrorConditions(t *testing.T) { require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:blerg:v2:gzip")) } -func TestAppendBlockStartEnd(t *testing.T) { +func TestAppendBlockStartEnd(t *testing.T) { // jpe extend wal, err := New(&Config{ - Filepath: t.TempDir(), - Encoding: backend.EncNone, + Filepath: t.TempDir(), + Encoding: backend.EncNone, + IngestionSlack: 2 * time.Minute, }) require.NoError(t, err, "unexpected error creating temp wal") @@ -191,12 +192,12 @@ func TestAppendBlockStartEnd(t *testing.T) { require.Equal(t, blockEnd, uint32(block.meta.EndTime.Unix())) // rescan the block and make sure that start/end times are correct - blockStart = uint32(time.Now().Add(time.Hour).Unix()) - blockEnd = uint32(time.Now().Add(2 * time.Hour).Unix()) + blockStart = uint32(time.Now().Add(-time.Hour).Unix()) + blockEnd = uint32(time.Now().Unix()) blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return blockStart, blockEnd, nil - }, log.NewNopLogger()) + }, time.Hour, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -256,7 +257,7 @@ func testAppendReplayFind(t *testing.T, e backend.Encoding) { blocks, err := wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -489,7 +490,7 @@ func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { // replay _, err = wal.RescanBlocks(func([]byte, string) (uint32, uint32, error) { return 0, 0, nil - }, log.NewNopLogger()) + }, 0, log.NewNopLogger()) require.NoError(b, err) } } From 68ef5cae65c52f2e8bf0c325a54569f1a7a3daf9 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 8 Mar 2022 09:54:17 -0500 Subject: [PATCH 2/5] metrics and tests Signed-off-by: Joe Elliott --- tempodb/wal/append_block.go | 6 ++++-- tempodb/wal/wal.go | 12 +++++++++++ tempodb/wal/wal_test.go | 40 +++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 5e028c70c7e..5f1ea3f4d20 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -250,10 +250,12 @@ func (a *AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, addition endOfRange := uint32(now.Add(a.ingestionSlack).Unix()) if start < startOfRange { - start = startOfRange // jpe prefer now + metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc() + start = uint32(now.Unix()) } if end > endOfRange { - end = endOfRange + metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc() + end = uint32(now.Unix()) } return start, end diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 3a59db2d4ef..baf96e9b5c2 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -10,12 +10,24 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" versioned_encoding "github.com/grafana/tempo/tempodb/encoding" ) +const reasonOutsideIngestionSlack = "outside_ingestion_time_slack" + +var ( + metricWarnings = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempodb", + Name: "warnings_total", + Help: "The total number of warnings per tenant with reason.", + }, []string{"tenant", "reason"}) +) + // extracts a time range from an object. start/end times returned are unix epoch // seconds type RangeFunc func(obj []byte, dataEncoding string) (uint32, uint32, error) diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 0f7f4ae77b4..ca7ef4eb617 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempopb" @@ -205,6 +206,45 @@ func TestAppendBlockStartEnd(t *testing.T) { // jpe extend require.Equal(t, blockEnd, uint32(blocks[0].meta.EndTime.Unix())) } +func TestAdjustTimeRangeForSlack(t *testing.T) { + a := &AppendBlock{ + meta: &backend.BlockMeta{ + TenantID: "test", + }, + ingestionSlack: 2 * time.Minute, + } + + // test happy path + start := uint32(time.Now().Unix()) + end := uint32(time.Now().Unix()) + actualStart, actualEnd := a.adjustTimeRangeForSlack(start, end, 0) + assert.Equal(t, start, actualStart) + assert.Equal(t, end, actualEnd) + + // test start out of range + now := uint32(time.Now().Unix()) + start = uint32(time.Now().Add(-time.Hour).Unix()) + end = uint32(time.Now().Unix()) + actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0) + assert.Equal(t, now, actualStart) + assert.Equal(t, end, actualEnd) + + // test end out of range + now = uint32(time.Now().Unix()) + start = uint32(time.Now().Unix()) + end = uint32(time.Now().Add(time.Hour).Unix()) + actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, 0) + assert.Equal(t, start, actualStart) + assert.Equal(t, now, actualEnd) + + // test additional start slack honored + start = uint32(time.Now().Add(-time.Hour).Unix()) + end = uint32(time.Now().Unix()) + actualStart, actualEnd = a.adjustTimeRangeForSlack(start, end, time.Hour) + assert.Equal(t, start, actualStart) + assert.Equal(t, end, actualEnd) +} + func TestAppendReplayFind(t *testing.T) { for _, e := range backend.SupportedEncoding { t.Run(e.String(), func(t *testing.T) { From 6180410c0cf009e7fcf6c0bf38a4ce8e0a9e3e3e Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 8 Mar 2022 10:07:58 -0500 Subject: [PATCH 3/5] tempo_warnings_total Signed-off-by: Joe Elliott --- CHANGELOG.md | 8 ++++++++ tempodb/wal/wal.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0016a80bf8a..56028e9b3bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,14 @@ * [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr) * [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno) * [BUGFIX] Fixes min/max time on blocks to be based on span times instead of ingestion time. [#1314](https://github.com/grafana/tempo/pull/1314) (@joe-elliott) + * Includes new configuration option to restrict the amount of slack around now to update the block start/end time. [#1332](https://github.com/grafana/tempo/pull/1332) (@joe-elliott) + ``` + storage: + trace: + wal: + ingestion_time_range_slack: 2m0s + ``` + * Includes a new metric to determine how often this range is exceeded: `tempo_warnings_total{reason="outside_ingestion_time_slack"}` ## v1.3.2 / 2022-02-23 * [BUGFIX] Fixed an issue where the query-frontend would corrupt start/end time ranges on searches which included the ingesters [#1295] (@joe-elliott) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index baf96e9b5c2..d4dc8ae9060 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -22,7 +22,7 @@ const reasonOutsideIngestionSlack = "outside_ingestion_time_slack" var ( metricWarnings = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "tempodb", + Namespace: "tempo", Name: "warnings_total", Help: "The total number of warnings per tenant with reason.", }, []string{"tenant", "reason"}) From e43022ed13d7ced66d1d5ebeaa1487b5a292dad6 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 8 Mar 2022 15:42:26 -0500 Subject: [PATCH 4/5] fixed warning inc Signed-off-by: Joe Elliott --- tempodb/wal/append_block.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 5f1ea3f4d20..f338710ca95 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -249,14 +249,19 @@ func (a *AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, addition startOfRange := uint32(now.Add(-a.ingestionSlack).Add(-additionalStartSlack).Unix()) endOfRange := uint32(now.Add(a.ingestionSlack).Unix()) + warn := false if start < startOfRange { - metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc() + warn = true start = uint32(now.Unix()) } if end > endOfRange { - metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc() + warn = true end = uint32(now.Unix()) } + if warn { + metricWarnings.WithLabelValues(a.meta.TenantID, reasonOutsideIngestionSlack).Inc() + } + return start, end } From 5496807d2eca638753118f9a4e6c4a98f2e2ddf8 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 8 Mar 2022 15:53:33 -0500 Subject: [PATCH 5/5] comment Signed-off-by: Joe Elliott --- modules/ingester/ingester.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index aa9ecf4b766..7018b9db80a 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -327,6 +327,10 @@ func (i *Ingester) TransferOut(ctx context.Context) error { func (i *Ingester) replayWal() error { level.Info(log.Logger).Log("msg", "beginning wal replay") + // pass i.cfg.MaxBlockDuration into RescanBlocks to make an attempt to set the start time + // of the blocks correctly. as we are scanning traces in the blocks we read their start/end times + // and attempt to set start/end times appropriately. we use now - max_block_duration - ingestion_slack + // as the minimum acceptable start time for a replayed block. blocks, err := i.store.WAL().RescanBlocks(func(b []byte, dataEncoding string) (uint32, uint32, error) { d, err := model.NewObjectDecoder(dataEncoding) if err != nil {