Skip to content

Commit

Permalink
Rebase on main and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Sep 27, 2024
1 parent 2dbf562 commit 788216a
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 181 deletions.
7 changes: 4 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/costattribution"
"github.com/grafana/mimir/pkg/util/globalerror"
mimir_limiter "github.com/grafana/mimir/pkg/util/limiter"
util_math "github.com/grafana/mimir/pkg/util/math"
Expand Down Expand Up @@ -105,7 +106,7 @@ type Distributor struct {
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
costAttributionSvc *util.CostAttributionCleanupService
costAttributionSvc *costattribution.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

Expand Down Expand Up @@ -306,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *costattribution.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
Expand Down Expand Up @@ -1683,7 +1684,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now, costAttributionSize)
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now)
costAttribution[attribution]++
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
51 changes: 5 additions & 46 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -71,15 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -123,15 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -171,15 +146,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -216,15 +183,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
18 changes: 3 additions & 15 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -62,11 +58,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -98,11 +90,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
71 changes: 35 additions & 36 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"sync"
"time"

"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/costattribution"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/zeropool"
"go.uber.org/atomic"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

const (
Expand Down Expand Up @@ -45,31 +47,30 @@ type ActiveSeries struct {

// matchersMutex protects matchers and lastMatchersUpdate.
matchersMutex sync.RWMutex
matchers *Matchers
matchers *asmodel.Matchers
lastMatchersUpdate time.Time

costAttributionLabel string
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int
costAttributionLabel string
costAttributionSvc *costattribution.CostAttributionCleanupService

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
timeout time.Duration
userID string
timeout time.Duration
userID string
maxCostAttributionPerUser int
}

// seriesStripe holds a subset of the series timestamps for a single tenant.
type seriesStripe struct {
matchers *Matchers
matchers *asmodel.Matchers

deleted *deletedSeries

// Unix nanoseconds. Only used by purge. Zero = unknown.
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int
costAttributionSvc *costattribution.CostAttributionCleanupService
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
Expand All @@ -87,20 +88,20 @@ type seriesStripe struct {

// seriesEntry holds a timestamp for single series.
type seriesEntry struct {
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches asmodel.PreAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
// keep the value corresponding to the label configured in seriesStripe
deleted bool // This series was marked as deleted, so before purging we need to remove the reference to it from the deletedSeries.
attributionValue string
}

func NewActiveSeries(
asm *Matchers,
asm *asmodel.Matchers,
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionSvc *util.CostAttributionCleanupService,
costAttributionSvc *costattribution.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) *ActiveSeries {
c := &ActiveSeries{
Expand All @@ -112,7 +113,7 @@ func NewActiveSeries(

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc)
}

return c
Expand All @@ -124,18 +125,18 @@ func (c *ActiveSeries) CurrentMatcherNames() []string {
return c.matchers.MatcherNames()
}

func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
c.matchersMutex.Lock()
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc)
}
c.matchers = asm
c.lastMatchersUpdate = now
}

func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig {
func (c *ActiveSeries) CurrentConfig() asmodel.CustomTrackersConfig {
c.matchersMutex.RLock()
defer c.matchersMutex.RUnlock()
return c.matchers.Config()
Expand Down Expand Up @@ -372,21 +373,21 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
entry, ok := s.refs[ref]
if ok {
if entry.numNativeHistogramBuckets != numNativeHistogramBuckets {
matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()
if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 {
// change number of buckets but still a histogram
diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets
s.activeNativeHistogramBuckets = uint32(int(s.activeNativeHistogramBuckets) + diff)
for i := 0; i < matchesLen; i++ {
s.activeMatchingNativeHistogramBuckets[matches.get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.get(i)]) + diff)
s.activeMatchingNativeHistogramBuckets[matches.Get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.Get(i)]) + diff)
}
} else if numNativeHistogramBuckets >= 0 {
// change from float to histogram
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += uint32(numNativeHistogramBuckets)
}
Expand All @@ -395,7 +396,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]--
s.activeMatchingNativeHistogramBuckets[match] -= uint32(entry.numNativeHistogramBuckets)
}
Expand All @@ -406,8 +407,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
return entry.nanos, false
}

matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()

s.active++

Expand All @@ -416,7 +417,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
}
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatching[match]++
if numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand All @@ -433,7 +434,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// If a cost attribution label is configured, split the series count by the value of that label.
// The label value is also stored in the entry so that, when the entry is removed, the matching counter can be decreased accordingly.
if s.costAttributionLabel != "" {
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos), s.maxCostAttributionPerUser)
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos))
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
Expand Down Expand Up @@ -462,12 +463,11 @@ func (s *seriesStripe) clear() {

// Reinitialize assigns new matchers and corresponding size activeMatching slices.
func (s *seriesStripe) reinitialize(
asm *Matchers,
asm *asmodel.Matchers,
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionSvc *util.CostAttributionCleanupService,
maxCostAttributionPerUser int,
costAttributionSvc *costattribution.CostAttributionCleanupService,
) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -478,7 +478,6 @@ func (s *seriesStripe) reinitialize(
s.costAttributionValues = map[string]uint32{}
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.maxCostAttributionPerUser = maxCostAttributionPerUser
s.matchers = asm
s.userID = userID
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
Expand Down Expand Up @@ -528,9 +527,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
if entry.attributionValue != "" {
s.costAttributionValues[entry.attributionValue]++
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]++
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand Down Expand Up @@ -573,9 +572,9 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) {
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]--
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]--
Expand Down
Loading

0 comments on commit 788216a

Please sign in to comment.