address comments

ying-jeanne committed Sep 25, 2024
1 parent 3c422a8 commit 8ae4571
Showing 9 changed files with 48 additions and 52 deletions.
45 changes: 22 additions & 23 deletions pkg/distributor/distributor.go
@@ -94,18 +94,19 @@ const (
type Distributor struct {
services.Service

- cfg Config
- log log.Logger
- ingestersRing ring.ReadRing
- ingesterPool *ring_client.Pool
- limits *validation.Overrides
+ cfg Config
+ log log.Logger
+ ingestersRing ring.ReadRing
+ ingesterPool *ring_client.Pool
+ limits *validation.Overrides
+ maxCostAttributionPerUser int

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
- costAttributionsvr *util.CostAttributionCleanupService
+ costAttributionSvr *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

@@ -307,10 +308,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
- func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides,
- activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService,
- ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing,
- canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
+ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger, maxCostAttributionPerUser int) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -336,17 +334,18 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}

d := &Distributor{
- cfg: cfg,
- log: log,
- ingestersRing: ingestersRing,
- RequestBufferPool: requestBufferPool,
- partitionsRing: partitionsRing,
- ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
- healthyInstancesCount: atomic.NewUint32(0),
- limits: limits,
- HATracker: haTracker,
- costAttributionsvr: costAttributionClenaupService,
- ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
+ cfg: cfg,
+ log: log,
+ ingestersRing: ingestersRing,
+ RequestBufferPool: requestBufferPool,
+ partitionsRing: partitionsRing,
+ ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
+ healthyInstancesCount: atomic.NewUint32(0),
+ limits: limits,
+ HATracker: haTracker,
+ costAttributionSvr: costAttributionClenaupService,
+ ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
+ maxCostAttributionPerUser: maxCostAttributionPerUser,

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_query_duration_seconds",
@@ -1657,12 +1656,12 @@ func tokenForMetadata(userID string, metricName string) uint32 {

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
var receivedSamples, receivedExemplars, receivedMetadata int
- costAttribution := make(map[string]int)
+ costAttribution := make(map[string]int, d.maxCostAttributionPerUser)
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
- attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
+ attribution := d.costAttributionSvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
costAttribution[attribution]++
}
}
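The new maxCostAttributionPerUser argument is used above as a capacity hint for the per-request attribution map, so the map does not rehash while samples are being counted. Below is a minimal standalone sketch of that counting pattern (not part of this commit; the series type and label access are simplified stand-ins for the Mimir types):

package main

import "fmt"

// series is a simplified stand-in for a decoded write-request time series.
type series struct {
    labels  map[string]string
    samples int
}

// countByAttribution tallies samples per value of the cost-attribution label.
// Pre-sizing the map with the per-user cap avoids growth during the loop.
func countByAttribution(ts []series, attributionLabel string, maxPerUser int) map[string]int {
    counts := make(map[string]int, maxPerUser)
    for _, s := range ts {
        counts[s.labels[attributionLabel]] += s.samples
    }
    return counts
}

func main() {
    ts := []series{
        {labels: map[string]string{"team": "search"}, samples: 3},
        {labels: map[string]string{"team": "ads"}, samples: 2},
        {labels: map[string]string{"team": "search"}, samples: 1},
    }
    fmt.Println(countByAttribution(ts, "team", 100)) // map[ads:2 search:4]
}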
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
@@ -2026,7 +2026,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
require.NoError(b, err)

// Start the distributor.
- distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
+ distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger(), 100)
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor))

@@ -5314,7 +5314,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
- d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger())
+ d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger(), 100)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, d))
t.Cleanup(func() {
@@ -7947,7 +7947,7 @@ func TestCheckStartedMiddleware(t *testing.T) {
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

- distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
+ distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger(), 100)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "user")
22 changes: 11 additions & 11 deletions pkg/ingester/activeseries/active_series.go
@@ -49,7 +49,7 @@ type ActiveSeries struct {
lastMatchersUpdate time.Time

CostAttributionLabel string
- costAttributionsvr *util.CostAttributionCleanupService
+ costAttributionSvr *util.CostAttributionCleanupService

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -67,7 +67,7 @@ type seriesStripe struct {
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64
- costAttributionsvr *util.CostAttributionCleanupService
+ costAttributionSvr *util.CostAttributionCleanupService
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,17 +98,17 @@ func NewActiveSeries(
timeout time.Duration,
userID string,
costAttributionLabel string,
- costAttributionsvr *util.CostAttributionCleanupService,
+ costAttributionSvr *util.CostAttributionCleanupService,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
CostAttributionLabel: costAttributionLabel,
- costAttributionsvr: costAttributionsvr,
+ costAttributionSvr: costAttributionSvr,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
- c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr)
+ c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvr)
}

return c
@@ -125,7 +125,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
- c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr)
+ c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvr)
}
c.matchers = asm
c.lastMatchersUpdate = now
@@ -232,13 +232,13 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
}

func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
- total := map[string]uint32{}
+ total := make(map[string]uint32)
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
- defer c.stripes[s].mu.RUnlock()
for k, v := range c.stripes[s].costAttributionValues {
total[k] += v
}
+ c.stripes[s].mu.RUnlock()
}
return total
}
@@ -429,7 +429,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// here if we have a cost attribution label, we can split the serie count based on the value of the label
// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
if s.costAttributionLabel != "" {
- attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
+ attributionValue := s.costAttributionSvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
@@ -462,7 +462,7 @@ func (s *seriesStripe) reinitialize(
deleted *deletedSeries,
userID string,
costAttributionLabel string,
- costAttributionsvr *util.CostAttributionCleanupService,
+ costAttributionSvr *util.CostAttributionCleanupService,
) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -479,7 +479,7 @@ func (s *seriesStripe) reinitialize(
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
- s.costAttributionsvr = costAttributionsvr
+ s.costAttributionSvr = costAttributionSvr
}

func (s *seriesStripe) purge(keepUntil time.Time) {
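One behavioural detail in the ActiveByAttributionValue change above: the old code deferred RUnlock inside the loop, which would keep every stripe's read lock held until the function returned, while the new code releases each stripe as soon as its counters are merged. Below is a minimal sketch of that per-iteration locking pattern, assuming a fixed array of mutex-guarded maps roughly analogous to the stripes here (not the actual Mimir code):

package main

import (
    "fmt"
    "sync"
)

const numStripes = 4

type stripe struct {
    mu     sync.RWMutex
    counts map[string]uint32
}

// sumByValue merges per-stripe counters. Each stripe's read lock is held only
// while its own map is copied; a deferred unlock inside the loop would instead
// hold all stripe locks until the function returned.
func sumByValue(stripes *[numStripes]stripe) map[string]uint32 {
    total := make(map[string]uint32)
    for i := 0; i < numStripes; i++ {
        stripes[i].mu.RLock()
        for k, v := range stripes[i].counts {
            total[k] += v
        }
        stripes[i].mu.RUnlock()
    }
    return total
}

func main() {
    var s [numStripes]stripe
    s[0].counts = map[string]uint32{"search": 2}
    s[1].counts = map[string]uint32{"ads": 1, "search": 1}
    fmt.Println(sumByValue(&s)) // map[ads:1 search:3]
}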
11 changes: 4 additions & 7 deletions pkg/ingester/ingester.go
@@ -304,6 +304,7 @@ type Ingester struct {
// Value used by shipper as external label.
shipperIngesterID string

+ maxCostAttributionPerUser int
// Metrics shared across all per-tenant shippers.
shipperMetrics *shipperMetrics

@@ -382,12 +383,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
}

// New returns an Ingester that uses Mimir block storage.
- func New(
- cfg Config, limits *validation.Overrides,
- ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher,
- activeGroupsCleanupService *util.ActiveGroupsCleanupService,
- costAttributionCleanupService *util.CostAttributionCleanupService,
- registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
+ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *util.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger, maxCostAttributionPerUser int) (*Ingester, error) {
i, err := newIngester(cfg, limits, registerer, logger)
if err != nil {
return nil, err
@@ -396,6 +392,7 @@ func New(
i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes)
i.activeGroups = activeGroupsCleanupService
i.costAttribution = costAttributionCleanupService
+ i.maxCostAttributionPerUser = maxCostAttributionPerUser
// We create a circuit breaker, which will be activated on a successful completion of starting.
i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer)

@@ -1172,7 +1169,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// successfully committed

stats = pushStats{
- failedSamplesAttribution: make(map[string]int),
+ failedSamplesAttribution: make(map[string]int, i.maxCostAttributionPerUser),
}

firstPartialErr error
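Both the distributor and ingester hunks above route label values through CostAttributionCleanupService.UpdateAttributionTimestamp, whose implementation is not shown in this commit. The following is a rough, hypothetical sketch of the contract those call sites appear to rely on; the type name, the overflow bucket, and the purge method are assumptions for illustration, not the real Mimir API:

package costattribution

import (
    "sync"
    "time"
)

// tracker is a simplified, hypothetical stand-in for the cost-attribution
// cleanup service: it remembers when each attribution value was last seen
// per tenant so that stale values can be dropped later.
type tracker struct {
    mu         sync.Mutex
    maxPerUser int
    lastSeen   map[string]map[string]time.Time // user -> attribution value -> last seen
}

func newTracker(maxPerUser int) *tracker {
    return &tracker{
        maxPerUser: maxPerUser,
        lastSeen:   make(map[string]map[string]time.Time),
    }
}

// UpdateAttributionTimestamp refreshes the last-seen time for (user, value)
// and returns the key the caller should count against. Collapsing new values
// into an overflow bucket once the per-user cap is reached is an assumption.
func (t *tracker) UpdateAttributionTimestamp(user, value string, now time.Time) string {
    t.mu.Lock()
    defer t.mu.Unlock()
    if t.lastSeen[user] == nil {
        t.lastSeen[user] = make(map[string]time.Time, t.maxPerUser)
    }
    if _, ok := t.lastSeen[user][value]; !ok && len(t.lastSeen[user]) >= t.maxPerUser {
        value = "__overflow__"
    }
    t.lastSeen[user][value] = now
    return value
}

// purge drops attribution values that have not been seen since the cutoff.
func (t *tracker) purge(cutoff time.Time) {
    t.mu.Lock()
    defer t.mu.Unlock()
    for user, values := range t.lastSeen {
        for v, seen := range values {
            if seen.Before(cutoff) {
                delete(values, v)
            }
        }
        if len(values) == 0 {
            delete(t.lastSeen, user)
        }
    }
}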
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_ingest_storage_test.go
@@ -650,7 +650,7 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over
require.NoError(t, services.StopAndAwaitTerminated(ctx, prw))
})

- ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t))
+ ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t), 0)
require.NoError(t, err)

return ingester, kafkaCluster, prw
6 changes: 3 additions & 3 deletions pkg/ingester/ingester_test.go
@@ -6008,7 +6008,7 @@ func prepareIngesterWithBlockStorageAndOverridesAndPartitionRing(t testing.TB, i
ingestersRing = createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig())
}

- ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr)
+ ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}, 0) // LOGGING: log.NewLogfmtLogger(os.Stderr)
if err != nil {
return nil, err
}
@@ -6214,7 +6214,7 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) {
// setup the tsdbs dir
testData.setup(t, tempDir)

- ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger())
+ ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger(), 0)
require.NoError(t, err)

startErr := services.StartAndAwaitRunning(context.Background(), ingester)
@@ -7374,7 +7374,7 @@ func TestHeadCompactionOnStartup(t *testing.T) {
ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost"
ingesterCfg.BlocksStorageConfig.TSDB.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted.

- ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger())
+ ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger(), 0)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))

4 changes: 2 additions & 2 deletions pkg/mimir/modules.go
@@ -467,7 +467,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides,
t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing,
- canJoinDistributorsRing, t.Registerer, util_log.Logger)
+ canJoinDistributorsRing, t.Registerer, util_log.Logger, t.Cfg.MaxCostAttributionPerUser)
if err != nil {
return
}
@@ -669,7 +669,7 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.IngestStorageConfig = t.Cfg.IngestStorage
t.tsdbIngesterConfig()

- t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger)
+ t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger, t.Cfg.MaxCostAttributionPerUser)
if err != nil {
return
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/benchmarks/comparison_test.go
@@ -237,7 +237,7 @@ func createIngesterQueryable(t testing.TB, address string) storage.Queryable {
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

- d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger)
+ d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger, 100)
require.NoError(t, err)

queryMetrics := stats.NewQueryMetrics(nil)
2 changes: 1 addition & 1 deletion pkg/streamingpromql/benchmarks/ingester.go
@@ -96,7 +96,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun
return services.StopAndAwaitTerminated(context.Background(), ingestersRing)
})

- ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger())
+ ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger(), 200)
if err != nil {
cleanup()
return nil, "", nil, fmt.Errorf("could not create ingester: %w", err)
