From 8ae4571352b29568a4dda832b35e8120d415035f Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Wed, 25 Sep 2024 14:43:24 +0200 Subject: [PATCH] address comments --- pkg/distributor/distributor.go | 45 +++++++++---------- pkg/distributor/distributor_test.go | 6 +-- pkg/ingester/activeseries/active_series.go | 22 ++++----- pkg/ingester/ingester.go | 11 ++--- pkg/ingester/ingester_ingest_storage_test.go | 2 +- pkg/ingester/ingester_test.go | 6 +-- pkg/mimir/modules.go | 4 +- .../benchmarks/comparison_test.go | 2 +- pkg/streamingpromql/benchmarks/ingester.go | 2 +- 9 files changed, 48 insertions(+), 52 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2c51bce16a1..8eb8ceb77ad 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -94,18 +94,19 @@ const ( type Distributor struct { services.Service - cfg Config - log log.Logger - ingestersRing ring.ReadRing - ingesterPool *ring_client.Pool - limits *validation.Overrides + cfg Config + log log.Logger + ingestersRing ring.ReadRing + ingesterPool *ring_client.Pool + limits *validation.Overrides + maxCostAttributionPerUser int // The global rate limiter requires a distributors ring to count // the number of healthy instances distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - costAttributionsvr *util.CostAttributionCleanupService + costAttributionSvr *util.CostAttributionCleanupService // For handling HA replicas. HATracker *haTracker @@ -307,10 +308,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, - activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, - ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, - canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger, maxCostAttributionPerUser int) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -336,17 +334,18 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove } d := &Distributor{ - cfg: cfg, - log: log, - ingestersRing: ingestersRing, - RequestBufferPool: requestBufferPool, - partitionsRing: partitionsRing, - ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), - healthyInstancesCount: atomic.NewUint32(0), - limits: limits, - HATracker: haTracker, - costAttributionsvr: costAttributionClenaupService, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + log: log, + ingestersRing: ingestersRing, + RequestBufferPool: requestBufferPool, + partitionsRing: partitionsRing, + ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), + healthyInstancesCount: atomic.NewUint32(0), + limits: limits, + HATracker: haTracker, + costAttributionSvr: 
costAttributionClenaupService, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + maxCostAttributionPerUser: maxCostAttributionPerUser, queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_distributor_query_duration_seconds", @@ -1657,12 +1656,12 @@ func tokenForMetadata(userID string, metricName string) uint32 { func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) { var receivedSamples, receivedExemplars, receivedMetadata int - costAttribution := make(map[string]int) + costAttribution := make(map[string]int, d.maxCostAttributionPerUser) for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if costAttributionLabel != "" { - attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now()) + attribution := d.costAttributionSvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now()) costAttribution[attribution]++ } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ec0a129fa63..5a19bf7b113 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2026,7 +2026,7 @@ func BenchmarkDistributor_Push(b *testing.B) { require.NoError(b, err) // Start the distributor. - distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger(), 100) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor)) @@ -5314,7 +5314,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger()) + d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger(), 100) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, d)) t.Cleanup(func() { @@ -7947,7 +7947,7 @@ func TestCheckStartedMiddleware(t *testing.T) { overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger(), 100) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "user") diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 4882f02e69c..ef065e0bf1e 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -49,7 +49,7 @@ type ActiveSeries struct { lastMatchersUpdate time.Time CostAttributionLabel string - costAttributionsvr *util.CostAttributionCleanupService + costAttributionSvr *util.CostAttributionCleanupService // The duration after which series become inactive. 
// Also used to determine if enough time has passed since configuration reload for valid results. @@ -67,7 +67,7 @@ type seriesStripe struct { // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - costAttributionsvr *util.CostAttributionCleanupService + costAttributionSvr *util.CostAttributionCleanupService mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -98,17 +98,17 @@ func NewActiveSeries( timeout time.Duration, userID string, costAttributionLabel string, - costAttributionsvr *util.CostAttributionCleanupService, + costAttributionSvr *util.CostAttributionCleanupService, ) *ActiveSeries { c := &ActiveSeries{ matchers: asm, timeout: timeout, userID: userID, CostAttributionLabel: costAttributionLabel, - costAttributionsvr: costAttributionsvr, + costAttributionSvr: costAttributionSvr, } // Stripes are pre-allocated so that we only read on them and no lock is required. for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvr) } return c @@ -125,7 +125,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvr) } c.matchers = asm c.lastMatchersUpdate = now @@ -232,13 +232,13 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot } func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 { - total := map[string]uint32{} + total := make(map[string]uint32) for s := 0; s < numStripes; s++ { c.stripes[s].mu.RLock() - defer c.stripes[s].mu.RUnlock() for k, v := range c.stripes[s].costAttributionValues { total[k] += v } + c.stripes[s].mu.RUnlock() } return total } @@ -429,7 +429,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the serie count based on the value of the label // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly if s.costAttributionLabel != "" { - attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now()) + attributionValue := s.costAttributionSvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now()) s.costAttributionValues[attributionValue]++ e.attributionValue = attributionValue } @@ -462,7 +462,7 @@ func (s *seriesStripe) reinitialize( deleted *deletedSeries, userID string, costAttributionLabel string, - costAttributionsvr *util.CostAttributionCleanupService, + costAttributionSvr *util.CostAttributionCleanupService, ) { s.mu.Lock() defer s.mu.Unlock() @@ -479,7 +479,7 @@ func (s *seriesStripe) reinitialize( s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) s.costAttributionLabel = costAttributionLabel - s.costAttributionsvr = 
costAttributionsvr + s.costAttributionSvr = costAttributionSvr } func (s *seriesStripe) purge(keepUntil time.Time) { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e780c449161..c122c8f4689 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -304,6 +304,7 @@ type Ingester struct { // Value used by shipper as external label. shipperIngesterID string + maxCostAttributionPerUser int // Metrics shared across all per-tenant shippers. shipperMetrics *shipperMetrics @@ -382,12 +383,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus } // New returns an Ingester that uses Mimir block storage. -func New( - cfg Config, limits *validation.Overrides, - ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, - activeGroupsCleanupService *util.ActiveGroupsCleanupService, - costAttributionCleanupService *util.CostAttributionCleanupService, - registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *util.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger, maxCostAttributionPerUser int) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -396,6 +392,7 @@ func New( i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService i.costAttribution = costAttributionCleanupService + i.maxCostAttributionPerUser = maxCostAttributionPerUser // We create a circuit breaker, which will be activated on a successful completion of starting. 
i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) @@ -1172,7 +1169,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // successfully committed stats = pushStats{ - failedSamplesAttribution: make(map[string]int), + failedSamplesAttribution: make(map[string]int, i.maxCostAttributionPerUser), } firstPartialErr error diff --git a/pkg/ingester/ingester_ingest_storage_test.go b/pkg/ingester/ingester_ingest_storage_test.go index 0b566b03c0b..76c3f537cad 100644 --- a/pkg/ingester/ingester_ingest_storage_test.go +++ b/pkg/ingester/ingester_ingest_storage_test.go @@ -650,7 +650,7 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over require.NoError(t, services.StopAndAwaitTerminated(ctx, prw)) }) - ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t)) + ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t), 0) require.NoError(t, err) return ingester, kafkaCluster, prw diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 9aab9119adf..2ed1d1bed03 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -6008,7 +6008,7 @@ func prepareIngesterWithBlockStorageAndOverridesAndPartitionRing(t testing.TB, i ingestersRing = createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()) } - ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr) + ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}, 0) // LOGGING: log.NewLogfmtLogger(os.Stderr) if err != nil { return nil, err } @@ -6214,7 +6214,7 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { // setup the tsdbs dir testData.setup(t, tempDir) - ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger(), 0) require.NoError(t, err) startErr := services.StartAndAwaitRunning(context.Background(), ingester) @@ -7374,7 +7374,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost" ingesterCfg.BlocksStorageConfig.TSDB.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted. 
- ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger(), 0) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index cdd568669ed..cdd25e2c513 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -467,7 +467,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing, - canJoinDistributorsRing, t.Registerer, util_log.Logger) + canJoinDistributorsRing, t.Registerer, util_log.Logger, t.Cfg.MaxCostAttributionPerUser) if err != nil { return } @@ -669,7 +669,7 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.IngestStorageConfig = t.Cfg.IngestStorage t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger, t.Cfg.MaxCostAttributionPerUser) if err != nil { return } diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 1ec18d108d3..60e03818b81 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -237,7 +237,7 @@ func createIngesterQueryable(t testing.TB, address string) storage.Queryable { overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger) + d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger, 100) require.NoError(t, err) queryMetrics := stats.NewQueryMetrics(nil) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 9107b66f64f..2eb16091330 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -96,7 +96,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun return services.StopAndAwaitTerminated(context.Background(), ingestersRing) }) - ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger()) + ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger(), 200) if err != nil { cleanup() return nil, "", nil, fmt.Errorf("could not create ingester: %w", err)
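
Note on the ActiveByAttributionValue change: the hunk in pkg/ingester/activeseries/active_series.go replaces deferred per-stripe RUnlock calls with an immediate unlock after each stripe's counters are copied, so at most one stripe read lock is held at a time. Below is a minimal standalone sketch of that aggregation pattern; the stripe count, field names, and types are illustrative stand-ins, not the actual Mimir types.

package main

import (
	"fmt"
	"sync"
)

const numStripes = 4 // illustrative; ActiveSeries uses a larger fixed stripe count

// stripe stands in for seriesStripe: a mutex-guarded map of
// per-attribution-value active-series counters.
type stripe struct {
	mu                    sync.RWMutex
	costAttributionValues map[string]uint32
}

// activeByAttributionValue sums the per-stripe counters. Each stripe's read
// lock is released as soon as its map has been copied, instead of deferring
// every RUnlock to function return.
func activeByAttributionValue(stripes *[numStripes]stripe) map[string]uint32 {
	total := make(map[string]uint32)
	for s := 0; s < numStripes; s++ {
		stripes[s].mu.RLock()
		for k, v := range stripes[s].costAttributionValues {
			total[k] += v
		}
		stripes[s].mu.RUnlock()
	}
	return total
}

func main() {
	var stripes [numStripes]stripe
	stripes[0].costAttributionValues = map[string]uint32{"team-a": 3, "team-b": 1}
	stripes[1].costAttributionValues = map[string]uint32{"team-a": 2}
	fmt.Println(activeByAttributionValue(&stripes)) // map[team-a:5 team-b:1]
}

The same patch threads a maxCostAttributionPerUser value into the Distributor and Ingester constructors and uses it to pre-size the per-request attribution maps (for example make(map[string]int, d.maxCostAttributionPerUser)), which avoids repeated map growth on the push path when attribution cardinality is bounded.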