From ae544b329279446c6826248fcb212b0054859e1e Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 23 Jan 2025 03:49:32 -0500 Subject: [PATCH] Check cached postings TTL before returning from cache and expose some metrics (#10500) * TSDB: Check cached postings TTL before returning from cache + metrics Signed-off-by: Marco Pracucci * Updated vendored mimir-prometheus Signed-off-by: Marco Pracucci * Fixed TestTSDBBuilder Signed-off-by: Marco Pracucci * Fixed unit tests Signed-off-by: Marco Pracucci * Added CHANGELOG entry Signed-off-by: Marco Pracucci * Revendor mimir-prometheus from main Signed-off-by: Marco Pracucci --------- Signed-off-by: Marco Pracucci --- CHANGELOG.md | 10 ++ .../mimir-ingest-storage/config/mimir.yaml | 2 + go.mod | 2 +- go.sum | 4 +- pkg/blockbuilder/tsdb.go | 34 +++--- pkg/ingester/ingester.go | 2 + pkg/ingester/metrics.go | 7 ++ pkg/ingester/metrics_test.go | 72 +++++++++++ .../prometheus/prometheus/tsdb/block.go | 6 +- .../prometheus/prometheus/tsdb/db.go | 17 ++- .../prometheus/prometheus/tsdb/head.go | 4 +- .../tsdb/postings_for_matchers_cache.go | 112 +++++++++++++++++- vendor/modules.txt | 4 +- 13 files changed, 241 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6da31cafded..b1c6fab3175 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,15 @@ * [ENHANCEMENT] Ingester: Hide tokens in ingester ring status page when ingest storage is enabled #10399 * [ENHANCEMENT] Ingester: add `active_series_additional_custom_trackers` configuration, in addition to the already existing `active_series_custom_trackers`. The `active_series_additional_custom_trackers` configuration allows you to configure additional custom trackers that get merged with `active_series_custom_trackers` at runtime. #10428 * [ENHANCEMENT] Query-frontend: Allow blocking raw http requests with the `blocked_requests` configuration. Requests can be blocked based on their path, method or query parameters #10484 +* [ENHANCEMENT] Ingester: Added the following metrics exported by `PostingsForMatchers` cache: #10500 + * `cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total` + * `cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total` + * `cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total` + * `cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total` + * `cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total` + * `cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total` + * `cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total` + * `cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total` * [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185 * [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154 * [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277 @@ -40,6 +49,7 @@ * [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423 * [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. 
#10445 #10461 #10502 * [BUGFIX] Distributor: Fix edge case at the HA-tracker with memberlist as KVStore, where when a replica in the KVStore is marked as deleted but not yet removed, it fails to update the KVStore. #10443 +* [BUGFIX] Ingester: Fixed a race condition in the `PostingsForMatchers` cache that may have infrequently returned expired cached postings. #10500 ### Mixin diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index 50b49fb6150..d3e4c16e7d7 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -38,6 +38,8 @@ blocks_storage: bucket_name: mimir-blocks tsdb: dir: /data/ingester + head_postings_for_matchers_cache_force: true + block_postings_for_matchers_cache_force: true bucket_store: index_cache: diff --git a/go.mod b/go.mod index 5e0f956563b..2aa80648eaf 100644 --- a/go.mod +++ b/go.mod @@ -288,7 +288,7 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250116135451-914982745659 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250123075837-0cc2978b5013 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index f97dd9a73bb..342ceddcbff 100644 --- a/go.sum +++ b/go.sum @@ -1283,8 +1283,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20250116135451-914982745659 h1:OfkJoA8D1dg3zMW3kDMkDdbcMBlNqDfCFSZgPcMToOQ= -github.com/grafana/mimir-prometheus v0.0.0-20250116135451-914982745659/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4= +github.com/grafana/mimir-prometheus v0.0.0-20250123075837-0cc2978b5013 h1:70NFJ8OVRMCPc89vN520cTJd0vo/elnaXoF7q0I6c2M= +github.com/grafana/mimir-prometheus v0.0.0-20250123075837-0cc2978b5013/go.mod h1:KfyZCeyGxf5gvl6VZbrQsd400nJjGw+ygMEtDVZKIT4= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw= diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index 5f057f11752..acecdc54b25 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -280,22 +280,24 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) { } db, err := tsdb.Open(udir, util_log.SlogFromGoKit(userLogger), nil, &tsdb.Options{ - RetentionDuration: 0, - MinBlockDuration: 2 * time.Hour.Milliseconds(), - MaxBlockDuration: 2 * time.Hour.Milliseconds(), - NoLockfile: true, - StripeSize: b.blocksStorageCfg.TSDB.StripeSize, - HeadChunksWriteBufferSize: b.blocksStorageCfg.TSDB.HeadChunksWriteBufferSize, - HeadChunksWriteQueueSize: b.blocksStorageCfg.TSDB.HeadChunksWriteQueueSize, - WALSegmentSize: -1, // No WAL - BlocksToDelete: 
func([]*tsdb.Block) map[ulid.ULID]struct{} { return map[ulid.ULID]struct{}{} }, // Always noop - IsolationDisabled: true, - EnableOverlappingCompaction: false, // Always false since Mimir only uploads lvl 1 compacted blocks - OutOfOrderTimeWindow: b.limits.OutOfOrderTimeWindow(userID).Milliseconds(), // The unit must be same as our timestamps. - OutOfOrderCapMax: int64(b.blocksStorageCfg.TSDB.OutOfOrderCapacityMax), - EnableNativeHistograms: b.limits.NativeHistogramsIngestionEnabled(userID), - SecondaryHashFunction: nil, // TODO(codesome): May needed when applying limits. Used to determine the owned series by an ingesters - SeriesLifecycleCallback: udb, + RetentionDuration: 0, + MinBlockDuration: 2 * time.Hour.Milliseconds(), + MaxBlockDuration: 2 * time.Hour.Milliseconds(), + NoLockfile: true, + StripeSize: b.blocksStorageCfg.TSDB.StripeSize, + HeadChunksWriteBufferSize: b.blocksStorageCfg.TSDB.HeadChunksWriteBufferSize, + HeadChunksWriteQueueSize: b.blocksStorageCfg.TSDB.HeadChunksWriteQueueSize, + WALSegmentSize: -1, // No WAL + BlocksToDelete: func([]*tsdb.Block) map[ulid.ULID]struct{} { return map[ulid.ULID]struct{}{} }, // Always noop + IsolationDisabled: true, + EnableOverlappingCompaction: false, // Always false since Mimir only uploads lvl 1 compacted blocks + OutOfOrderTimeWindow: b.limits.OutOfOrderTimeWindow(userID).Milliseconds(), // The unit must be same as our timestamps. + OutOfOrderCapMax: int64(b.blocksStorageCfg.TSDB.OutOfOrderCapacityMax), + EnableNativeHistograms: b.limits.NativeHistogramsIngestionEnabled(userID), + SecondaryHashFunction: nil, // TODO(codesome): May needed when applying limits. Used to determine the owned series by an ingesters + SeriesLifecycleCallback: udb, + HeadPostingsForMatchersCacheMetrics: tsdb.NewPostingsForMatchersCacheMetrics(nil), + BlockPostingsForMatchersCacheMetrics: tsdb.NewPostingsForMatchersCacheMetrics(nil), }, nil) if err != nil { return nil, err diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f7e5bdbd7bc..2407e64d3c9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2728,10 +2728,12 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD HeadPostingsForMatchersCacheMaxItems: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheMaxItems, HeadPostingsForMatchersCacheMaxBytes: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheMaxBytes, HeadPostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheForce, + HeadPostingsForMatchersCacheMetrics: i.tsdbMetrics.headPostingsForMatchersCacheMetrics, BlockPostingsForMatchersCacheTTL: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheTTL, BlockPostingsForMatchersCacheMaxItems: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheMaxItems, BlockPostingsForMatchersCacheMaxBytes: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheMaxBytes, BlockPostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheForce, + BlockPostingsForMatchersCacheMetrics: i.tsdbMetrics.blockPostingsForMatchersCacheMetrics, EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID), EnableOOONativeHistograms: i.limits.OOONativeHistogramsIngestionEnabled(userID), SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID), diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index bf920c383a3..68d9df67b36 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -12,6 +12,7 @@ import ( dskit_metrics 
"github.com/grafana/dskit/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/tsdb" "go.uber.org/atomic" util_math "github.com/grafana/mimir/pkg/util/math" @@ -522,6 +523,9 @@ type tsdbMetrics struct { memSeriesCreatedTotal *prometheus.Desc memSeriesRemovedTotal *prometheus.Desc + headPostingsForMatchersCacheMetrics *tsdb.PostingsForMatchersCacheMetrics + blockPostingsForMatchersCacheMetrics *tsdb.PostingsForMatchersCacheMetrics + regs *dskit_metrics.TenantRegistries } @@ -701,6 +705,9 @@ func newTSDBMetrics(r prometheus.Registerer, logger log.Logger) *tsdbMetrics { "cortex_ingester_memory_series_removed_total", "The total number of series that were removed per user.", []string{"user"}, nil), + + headPostingsForMatchersCacheMetrics: tsdb.NewPostingsForMatchersCacheMetrics(prometheus.WrapRegistererWithPrefix("cortex_ingester_tsdb_head_", r)), + blockPostingsForMatchersCacheMetrics: tsdb.NewPostingsForMatchersCacheMetrics(prometheus.WrapRegistererWithPrefix("cortex_ingester_tsdb_block_", r)), } if r != nil { diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 35000bd68bf..23c479aa26e 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -239,6 +239,42 @@ func TestTSDBMetrics(t *testing.T) { # HELP cortex_ingester_tsdb_exemplar_exemplars_in_storage Number of TSDB exemplars currently in storage. # TYPE cortex_ingester_tsdb_exemplar_exemplars_in_storage gauge cortex_ingester_tsdb_exemplar_exemplars_in_storage 30 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total Total number of postings lists returned from the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total Total number of requests to the PostingsForMatchers cache for which there is no valid cached entry. The subsequent result is cached. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total Total number of requests to the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total Total number of requests to the PostingsForMatchers cache that have been skipped the cache. The subsequent result is not cached. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="canceled-cached-entry"} 0 + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="ineligible"} 0 + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="stale-cached-entry"} 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total Total number of postings lists returned from the PostingsForMatchers cache. 
+ # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total Total number of requests to the PostingsForMatchers cache for which there is no valid cached entry. The subsequent result is cached. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total Total number of requests to the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total Total number of requests to the PostingsForMatchers cache that have been skipped the cache. The subsequent result is not cached. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="canceled-cached-entry"} 0 + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="ineligible"} 0 + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="stale-cached-entry"} 0 `)) require.NoError(t, err) } @@ -457,6 +493,42 @@ func TestTSDBMetricsWithRemoval(t *testing.T) { # TYPE cortex_ingester_tsdb_out_of_order_samples_appended_total counter cortex_ingester_tsdb_out_of_order_samples_appended_total{user="user1"} 3 cortex_ingester_tsdb_out_of_order_samples_appended_total{user="user2"} 3 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total Total number of postings lists returned from the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total Total number of requests to the PostingsForMatchers cache for which there is no valid cached entry. The subsequent result is cached. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total Total number of requests to the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total 0 + + # HELP cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total Total number of requests to the PostingsForMatchers cache that have been skipped the cache. The subsequent result is not cached. + # TYPE cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total counter + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="canceled-cached-entry"} 0 + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="ineligible"} 0 + cortex_ingester_tsdb_head_postings_for_matchers_cache_skips_total{reason="stale-cached-entry"} 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total Total number of postings lists returned from the PostingsForMatchers cache. 
+ # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_hits_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total Total number of requests to the PostingsForMatchers cache for which there is no valid cached entry. The subsequent result is cached. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_misses_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total Total number of requests to the PostingsForMatchers cache. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_requests_total 0 + + # HELP cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total Total number of requests to the PostingsForMatchers cache that have been skipped the cache. The subsequent result is not cached. + # TYPE cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total counter + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="canceled-cached-entry"} 0 + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="ineligible"} 0 + cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total{reason="stale-cached-entry"} 0 `)) require.NoError(t, err) } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/block.go b/vendor/github.com/prometheus/prometheus/tsdb/block.go index 372aa607080..a8fc4ccfcfe 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/block.go @@ -352,11 +352,11 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (pb *Block, err error) { - return OpenBlockWithOptions(logger, dir, pool, postingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce) + return OpenBlockWithOptions(logger, dir, pool, postingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce, NewPostingsForMatchersCacheMetrics(nil)) } // OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function. 
-func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (pb *Block, err error) { +func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool, postingsCacheMetrics *PostingsForMatchersCacheMetrics) (pb *Block, err error) { if logger == nil { logger = promslog.NewNopLogger() } @@ -385,7 +385,7 @@ func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, p if err != nil { return nil, err } - pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce) + pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce, postingsCacheMetrics) ir := indexReaderWithPostingsForMatchers{indexReader, pfmc} closers = append(closers, ir) diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go index 5ec576b6a7f..69278947aca 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go @@ -100,10 +100,12 @@ func DefaultOptions() *Options { HeadPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems, HeadPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes, HeadPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce, + HeadPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil), BlockPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL, BlockPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems, BlockPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes, BlockPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce, + BlockPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil), } } @@ -259,6 +261,9 @@ type Options struct { // HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param. HeadPostingsForMatchersCacheForce bool + // HeadPostingsForMatchersCacheMetrics holds the metrics tracked by PostingsForMatchers cache when querying the Head. + HeadPostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics + // BlockPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache of each compacted block. // If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished. BlockPostingsForMatchersCacheTTL time.Duration @@ -275,6 +280,9 @@ type Options struct { // regardless of the `concurrent` param. BlockPostingsForMatchersCacheForce bool + // BlockPostingsForMatchersCacheMetrics holds the metrics tracked by PostingsForMatchers cache when querying blocks. + BlockPostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics + // SecondaryHashFunction is an optional function that is applied to each series in the Head. // Values returned from this function are preserved and available by calling ForEachSecondaryHash function on the Head. 
SecondaryHashFunction func(labels.Labels) uint32 @@ -702,7 +710,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return nil, ErrClosed default: } - loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce) + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce, NewPostingsForMatchersCacheMetrics(nil)) if err != nil { return nil, err } @@ -1043,6 +1051,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce + headOpts.PostingsForMatchersCacheMetrics = opts.HeadPostingsForMatchersCacheMetrics headOpts.SecondaryHashFunction = opts.SecondaryHashFunction if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency @@ -1685,7 +1694,7 @@ func (db *DB) reloadBlocks() (err error) { db.mtx.Lock() defer db.mtx.Unlock() - loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce) + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce, db.opts.BlockPostingsForMatchersCacheMetrics) if err != nil { return err } @@ -1780,7 +1789,7 @@ func (db *DB) reloadBlocks() (err error) { return nil } -func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { +func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool, postingsCacheMetrics *PostingsForMatchersCacheMetrics) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { bDirs, err := blockDirs(dir) if err != nil { return nil, nil, fmt.Errorf("find blocks: %w", err) @@ -1802,7 +1811,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc. 
cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String()) } - block, err = OpenBlockWithOptions(l, bDir, chunkPool, postingsDecoderFactory, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce) + block, err = OpenBlockWithOptions(l, bDir, chunkPool, postingsDecoderFactory, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce, postingsCacheMetrics) if err != nil { corrupted[meta.ULID] = err continue diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head.go b/vendor/github.com/prometheus/prometheus/tsdb/head.go index b4fa652a438..8f8b709be1d 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head.go @@ -193,6 +193,7 @@ type HeadOptions struct { PostingsForMatchersCacheMaxItems int PostingsForMatchersCacheMaxBytes int64 PostingsForMatchersCacheForce bool + PostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics // Optional hash function applied to each new series. Computed hash value is preserved for each series in the head, // and values can be iterated by using Head.ForEachSecondaryHash method. @@ -222,6 +223,7 @@ func DefaultHeadOptions() *HeadOptions { PostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems, PostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes, PostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce, + PostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil), WALReplayConcurrency: defaultWALReplayConcurrency, } ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax) @@ -296,7 +298,7 @@ func NewHead(r prometheus.Registerer, l *slog.Logger, wal, wbl *wlog.WL, opts *H stats: stats, reg: r, secondaryHashFunc: shf, - pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce), + pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce, opts.PostingsForMatchersCacheMetrics), } if err := h.resetInMemoryState(); err != nil { return nil, err diff --git a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go index 2c207eab7e0..e82134eb714 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go @@ -10,6 +10,8 @@ import ( "time" "github.com/DmitriyVTitov/size" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -52,7 +54,7 @@ type IndexPostingsReader interface { // NewPostingsForMatchersCache creates a new PostingsForMatchersCache. // If `ttl` is 0, then it only deduplicates in-flight requests. // If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided to the PostingsForMatchers method. 
-func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool) *PostingsForMatchersCache { +func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool, metrics *PostingsForMatchersCacheMetrics) *PostingsForMatchersCache { b := &PostingsForMatchersCache{ calls: &sync.Map{}, cached: list.New(), @@ -62,8 +64,12 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64 maxItems: maxItems, maxBytes: maxBytes, force: force, + metrics: metrics, - timeNow: time.Now, + timeNow: func() time.Time { + // Ensure it is UTC, so that it's faster to compute the cache entry size. + return time.Now().UTC() + }, postingsForMatchers: PostingsForMatchers, tracer: otel.Tracer(""), @@ -86,6 +92,7 @@ type PostingsForMatchersCache struct { maxItems int maxBytes int64 force bool + metrics *PostingsForMatchersCacheMetrics // Signal whether there's already a call to expire() in progress, in order to avoid multiple goroutines // cleaning up expired entries at the same time (1 at a time is enough). @@ -101,6 +108,9 @@ type PostingsForMatchersCache struct { // beginning of onPromiseExecutionDone() execution. onPromiseExecutionDoneBeforeHook func() + // evictHeadBeforeHook is used for testing purposes. It allows to hook before calls to evictHead(). + evictHeadBeforeHook func() + tracer trace.Tracer // Preallocated for performance ttlAttrib attribute.KeyValue @@ -108,6 +118,8 @@ type PostingsForMatchersCache struct { } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + c.metrics.requests.Inc() + span := trace.SpanFromContext(ctx) defer func(startTime time.Time) { span.AddEvent( @@ -117,7 +129,9 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I }(time.Now()) if !concurrent && !c.force { + c.metrics.skipsBecauseIneligible.Inc() span.AddEvent("cache not used") + p, err := c.postingsForMatchers(ctx, ix, ms...) if err != nil { span.SetStatus(codes.Error, "getting postings for matchers without cache failed") @@ -146,6 +160,10 @@ type postingsForMatcherPromise struct { done chan struct{} cloner *index.PostingsCloner err error + + // Keep track of the time this promise completed evaluation. + // Do not access this field until the done channel is closed. + evaluationCompletedAt time.Time } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { @@ -195,6 +213,31 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex oldPromise := oldPromiseValue.(*postingsForMatcherPromise) + // Check if the promise already completed the execution and its TTL has not expired yet. + // If the TTL has expired, we don't want to return it, so we just recompute the postings + // on-the-fly, bypassing the cache logic. It's less performant, but more accurate, because + // avoids returning stale data. + if c.ttl > 0 { + select { + case <-oldPromise.done: + if c.timeNow().Sub(oldPromise.evaluationCompletedAt) >= c.ttl { + // The cached promise already expired, but it has not been evicted. 
+ span.AddEvent("skipping cached postingsForMatchers promise because its TTL already expired", trace.WithAttributes( + attribute.Stringer("cached promise evaluation completed at", oldPromise.evaluationCompletedAt), + attribute.String("cache_key", key), + )) + c.metrics.skipsBecauseStale.Inc() + + return func(ctx context.Context) (index.Postings, error) { + return c.postingsForMatchers(ctx, ix, ms...) + } + } + + default: + // The evaluation is still in-flight. We wait for it. + } + } + // Add the caller context to the ones tracked by the old promise (currently in-flight). if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) { // We've hit a race condition happening when the "loaded" promise execution was just canceled, @@ -202,6 +245,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex // // We expect this race condition to be infrequent. In this case we simply skip the cache and // pass through the execution to the underlying postingsForMatchers(). + c.metrics.skipsBecauseCanceled.Inc() span.AddEvent("looked up in-flight postingsForMatchers promise, but the promise was just canceled due to a race condition: skipping the cache", trace.WithAttributes( attribute.String("cache_key", key), )) @@ -211,6 +255,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex } } + c.metrics.hits.Inc() span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( attribute.String("cache_key", key), )) @@ -218,6 +263,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex return oldPromise.result } + c.metrics.misses.Inc() span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(attribute.String("cache_key", key))) // promise was stored, close its channel after fulfilment @@ -235,13 +281,16 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex promise.cloner = index.NewPostingsCloner(postings) } + // Keep track of when the evaluation completed. + promise.evaluationCompletedAt = c.timeNow() + // The execution terminated (or has been canceled). We have to close the tracker to release resources. // It's important to close it before computing the promise size, so that the actual size is smaller. promise.callersCtxTracker.close() sizeBytes := int64(len(key) + size.Of(promise)) - c.onPromiseExecutionDone(ctx, key, c.timeNow(), sizeBytes, promise.err) + c.onPromiseExecutionDone(ctx, key, promise.evaluationCompletedAt, sizeBytes, promise.err) return promise.result } @@ -273,6 +322,11 @@ func (c *PostingsForMatchersCache) expire() { } c.cachedMtx.RUnlock() + // Call the registered hook, if any. It's used only for testing purposes. + if c.evictHeadBeforeHook != nil { + c.evictHeadBeforeHook() + } + c.cachedMtx.Lock() defer c.cachedMtx.Unlock() @@ -309,7 +363,7 @@ func (c *PostingsForMatchersCache) evictHead() { // onPromiseExecutionDone must be called once the execution of PostingsForMatchers promise has done. // The input err contains details about any error that could have occurred when executing it. // The input ts is the function call time. 
-func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) { +func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, completedAt time.Time, sizeBytes int64, err error) { span := trace.SpanFromContext(ctx) // Call the registered hook, if any. It's used only for testing purposes. @@ -339,7 +393,7 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k c.cached.PushBack(&postingsForMatchersCachedCall{ key: key, - ts: ts, + ts: completedAt, sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes @@ -349,7 +403,7 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k } span.AddEvent("added cached value to expiry queue", trace.WithAttributes( - attribute.Stringer("timestamp", ts), + attribute.Stringer("evaluation completed at", completedAt), attribute.Int64("size in bytes", sizeBytes), attribute.Int64("cached bytes", lastCachedBytes), )) @@ -499,3 +553,49 @@ func (t *contextsTracker) trackedContextsCount() int { return t.trackedCount } + +type PostingsForMatchersCacheMetrics struct { + requests prometheus.Counter + hits prometheus.Counter + misses prometheus.Counter + skipsBecauseIneligible prometheus.Counter + skipsBecauseStale prometheus.Counter + skipsBecauseCanceled prometheus.Counter +} + +func NewPostingsForMatchersCacheMetrics(reg prometheus.Registerer) *PostingsForMatchersCacheMetrics { + const ( + skipsMetric = "postings_for_matchers_cache_skips_total" + skipsHelp = "Total number of requests to the PostingsForMatchers cache that have been skipped the cache. The subsequent result is not cached." + ) + + return &PostingsForMatchersCacheMetrics{ + requests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "postings_for_matchers_cache_requests_total", + Help: "Total number of requests to the PostingsForMatchers cache.", + }), + hits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "postings_for_matchers_cache_hits_total", + Help: "Total number of postings lists returned from the PostingsForMatchers cache.", + }), + misses: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "postings_for_matchers_cache_misses_total", + Help: "Total number of requests to the PostingsForMatchers cache for which there is no valid cached entry. 
The subsequent result is cached.", + }), + skipsBecauseIneligible: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: skipsMetric, + Help: skipsHelp, + ConstLabels: map[string]string{"reason": "ineligible"}, + }), + skipsBecauseStale: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: skipsMetric, + Help: skipsHelp, + ConstLabels: map[string]string{"reason": "stale-cached-entry"}, + }), + skipsBecauseCanceled: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: skipsMetric, + Help: skipsHelp, + ConstLabels: map[string]string{"reason": "canceled-cached-entry"}, + }), + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 571b0fd64c1..45f5071df21 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1046,7 +1046,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250116135451-914982745659 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250123075837-0cc2978b5013 ## explicit; go 1.22.7 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1710,7 +1710,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250116135451-914982745659 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250123075837-0cc2978b5013 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b
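
For reference, a minimal sketch (not part of the patch) of how the new PostingsForMatchersCache metrics introduced above can be wired up by a caller. It assumes the mimir-prometheus fork vendored by this change (the PostingsForMatchers cache API and its Default* constants exist only in that fork, not in upstream Prometheus) and mirrors the prefixed-registerer approach used by the ingester in pkg/ingester/metrics.go:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	reg := prometheus.NewRegistry()

	// Wrapping the registerer with a prefix yields metric names such as
	// cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total,
	// matching the names listed in the CHANGELOG entry above.
	metrics := tsdb.NewPostingsForMatchersCacheMetrics(
		prometheus.WrapRegistererWithPrefix("cortex_ingester_tsdb_head_", reg),
	)

	// The metrics object is passed to the cache constructor. In Mimir it is
	// instead set on tsdb.Options via HeadPostingsForMatchersCacheMetrics /
	// BlockPostingsForMatchersCacheMetrics, as shown in the ingester hunk above.
	cache := tsdb.NewPostingsForMatchersCache(
		tsdb.DefaultPostingsForMatchersCacheTTL,
		tsdb.DefaultPostingsForMatchersCacheMaxItems,
		tsdb.DefaultPostingsForMatchersCacheMaxBytes,
		tsdb.DefaultPostingsForMatchersCacheForce,
		metrics,
	)
	_ = cache
}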