Skip to content

Commit

Permalink
Chore: split out models from activeseries package (#9435)
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne authored Sep 27, 2024
1 parent 9e4c03c commit 5162b25
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 92 deletions.
4 changes: 3 additions & 1 deletion pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

type mockPostingsReader struct {
Expand Down Expand Up @@ -39,7 +41,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

func TestNativeHistogramPostings_Expand(t *testing.T) {
Expand All @@ -24,7 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -60,7 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -104,8 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))
// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := i * 10
Expand Down Expand Up @@ -144,8 +145,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))
// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := i * 10
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
11 changes: 5 additions & 6 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

func TestPostings_Expand(t *testing.T) {
Expand All @@ -24,8 +26,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))
// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1)
Expand Down Expand Up @@ -56,8 +57,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))
// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1)
Expand Down Expand Up @@ -88,8 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl))
// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1)
Expand Down
44 changes: 23 additions & 21 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/zeropool"
"go.uber.org/atomic"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

const (
Expand Down Expand Up @@ -44,7 +46,7 @@ type ActiveSeries struct {

// matchersMutex protects matchers and lastMatchersUpdate.
matchersMutex sync.RWMutex
matchers *Matchers
matchers *asmodel.Matchers
lastMatchersUpdate time.Time

// The duration after which series become inactive.
Expand All @@ -54,7 +56,7 @@ type ActiveSeries struct {

// seriesStripe holds a subset of the series timestamps for a single tenant.
type seriesStripe struct {
matchers *Matchers
matchers *asmodel.Matchers

deleted *deletedSeries

Expand All @@ -75,14 +77,14 @@ type seriesStripe struct {

// seriesEntry holds a timestamp for single series.
type seriesEntry struct {
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches asmodel.PreAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.

deleted bool // This series was marked as deleted, so before purging we need to remove the refence to it from the deletedSeries.
}

func NewActiveSeries(asm *Matchers, timeout time.Duration) *ActiveSeries {
func NewActiveSeries(asm *asmodel.Matchers, timeout time.Duration) *ActiveSeries {
c := &ActiveSeries{matchers: asm, timeout: timeout}

// Stripes are pre-allocated so that we only read on them and no lock is required.
Expand All @@ -99,7 +101,7 @@ func (c *ActiveSeries) CurrentMatcherNames() []string {
return c.matchers.MatcherNames()
}

func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
c.matchersMutex.Lock()
defer c.matchersMutex.Unlock()

Expand All @@ -110,7 +112,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
c.lastMatchersUpdate = now
}

func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig {
func (c *ActiveSeries) CurrentConfig() asmodel.CustomTrackersConfig {
c.matchersMutex.RLock()
defer c.matchersMutex.RUnlock()
return c.matchers.Config()
Expand Down Expand Up @@ -335,21 +337,21 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
entry, ok := s.refs[ref]
if ok {
if entry.numNativeHistogramBuckets != numNativeHistogramBuckets {
matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()
if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 {
// change number of buckets but still a histogram
diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets
s.activeNativeHistogramBuckets = uint32(int(s.activeNativeHistogramBuckets) + diff)
for i := 0; i < matchesLen; i++ {
s.activeMatchingNativeHistogramBuckets[matches.get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.get(i)]) + diff)
s.activeMatchingNativeHistogramBuckets[matches.Get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.Get(i)]) + diff)
}
} else if numNativeHistogramBuckets >= 0 {
// change from float to histogram
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += uint32(numNativeHistogramBuckets)
}
Expand All @@ -358,7 +360,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]--
s.activeMatchingNativeHistogramBuckets[match] -= uint32(entry.numNativeHistogramBuckets)
}
Expand All @@ -369,16 +371,16 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
return entry.nanos, false
}

matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()

s.active++
if numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
}
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatching[match]++
if numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand Down Expand Up @@ -413,7 +415,7 @@ func (s *seriesStripe) clear() {
}

// Reinitialize assigns new matchers and corresponding size activeMatching slices.
func (s *seriesStripe) reinitialize(asm *Matchers, deleted *deletedSeries) {
func (s *seriesStripe) reinitialize(asm *asmodel.Matchers, deleted *deletedSeries) {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -462,9 +464,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += uint32(entry.numNativeHistogramBuckets)
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]++
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand Down Expand Up @@ -504,9 +506,9 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) {
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]--
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]--
Expand Down
Loading

0 comments on commit 5162b25

Please sign in to comment.