Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init histogram counters with 0 #4140

Merged
merged 4 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Bring back application-json content-type header. [#4121](https://github.com/grafana/tempo/pull/4121) (@javiermolinar)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
Expand Down
35 changes: 31 additions & 4 deletions modules/generator/registry/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ type histogramSeries struct {
exemplars []*atomic.String
exemplarValues []*atomic.Float64
lastUpdated *atomic.Int64
// firstSeries is used to track if this series is new to the counter. This
// is used to ensure that new counters being with 0, and then are incremented
// to the desired value. This avoids Prometheus throwing away the first
// value in the series, due to the transition from null -> x.
firstSeries *atomic.Bool
}

func (hs *histogramSeries) isNew() bool {
return hs.firstSeries.Load()
}

func (hs *histogramSeries) registerSeenSeries() {
hs.firstSeries.Store(false)
}

var (
Expand Down Expand Up @@ -114,6 +127,7 @@ func (h *histogram) newSeries(labelValueCombo *LabelValueCombo, value float64, t
buckets: nil,
exemplars: nil,
lastUpdated: atomic.NewInt64(0),
firstSeries: atomic.NewBool(true),
}
for i := 0; i < len(h.buckets); i++ {
newSeries.buckets = append(newSeries.buckets, atomic.NewFloat64(0))
Expand Down Expand Up @@ -151,6 +165,8 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte
h.seriesMtx.Lock()
defer h.seriesMtx.Unlock()

t := timeMs

activeSeries = len(h.series) * int(h.activeSeriesPerHistogramSerie())

labelsCount := 0
Expand All @@ -171,16 +187,27 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte
lb.Set(name, s.labels.values[i])
}

// If we are about to call Append for the first time on a series,
// we need to first insert a 0 value to allow Prometheus to start from a non-null value.
if s.isNew() {
lb.Set(labels.MetricName, h.nameCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, maybe it makes sense to create a new ìnit` method in the interface:

type metric interface {
	name() string
        initMetrics(appender storage.Appender,...)
	collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error)
	removeStaleSeries(staleTimeMs int64)
}

since I see we do the same logic in the counter

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is actually of the series, not the metric. It needs passing a couple of values, the function would actually encapsulate very little. I don't think it's worth the hassle.

func (co *counterSeries) init(appender storage.Appender, lbls labels.Labels, ts int64) {
	_, _ = appender.Append(0, lbls, ts, 0)
	co.registerSeenSeries()
}

_, err = appender.Append(0, lb.Labels(), t-1, 0) // t-1 to ensure that the next value is not at the same time
if err != nil {
return
}
s.registerSeenSeries()
}

// sum
lb.Set(labels.MetricName, h.nameSum)
_, err = appender.Append(0, lb.Labels(), timeMs, s.sum.Load())
_, err = appender.Append(0, lb.Labels(), t, s.sum.Load())
if err != nil {
return
}

// count
lb.Set(labels.MetricName, h.nameCount)
_, err = appender.Append(0, lb.Labels(), timeMs, s.count.Load())
_, err = appender.Append(0, lb.Labels(), t, s.count.Load())
if err != nil {
return
}
Expand All @@ -190,7 +217,7 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte

for i, bucketLabel := range h.bucketLabels {
lb.Set(labels.BucketLabel, bucketLabel)
ref, err := appender.Append(0, lb.Labels(), timeMs, s.buckets[i].Load())
ref, err := appender.Append(0, lb.Labels(), t, s.buckets[i].Load())
if err != nil {
return activeSeries, err
}
Expand All @@ -203,7 +230,7 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte
Value: ex,
}},
Value: s.exemplarValues[i].Load(),
Ts: timeMs,
Ts: t,
})
if err != nil {
return activeSeries, err
Expand Down
20 changes: 19 additions & 1 deletion modules/generator/registry/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ func Test_histogram(t *testing.T) {
assert.Equal(t, 2, seriesAdded)

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeWithOffsetMs, 0), // Zero entry for value-1 series
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "+Inf"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeWithOffsetMs, 0), // Zero entry for value-2 series
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-2"}, collectionTimeMs, 1.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "1"}, collectionTimeMs, 0),
Expand All @@ -59,6 +62,7 @@ func Test_histogram(t *testing.T) {
assert.Equal(t, 3, seriesAdded)

collectionTimeMs = time.Now().UnixMilli()
collectionTimeWithOffsetMs = collectionTimeMs - 1
expectedSamples = []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 1),
Expand All @@ -70,6 +74,7 @@ func Test_histogram(t *testing.T) {
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "+Inf"}, collectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-3"}, collectionTimeWithOffsetMs, 0), // Zero entry for value-3 series
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-3"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-3"}, collectionTimeMs, 3),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-3", "le": "1"}, collectionTimeMs, 0),
Expand Down Expand Up @@ -150,12 +155,15 @@ func Test_histogram_cantAdd(t *testing.T) {
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 1.5, "", 1.0)

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "+Inf"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-2"}, collectionTimeMs, 1.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "1"}, collectionTimeMs, 0),
Expand Down Expand Up @@ -204,12 +212,15 @@ func Test_histogram_removeStaleSeries(t *testing.T) {
assert.Equal(t, 0, removedSeries)

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "+Inf"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-2"}, collectionTimeMs, 1.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "1"}, collectionTimeMs, 0),
Expand Down Expand Up @@ -246,12 +257,15 @@ func Test_histogram_externalLabels(t *testing.T) {
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 1.5, "", 1.0)

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1", "external_label": "external_value"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "2", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "+Inf", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2", "external_label": "external_value"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 1.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-2", "le": "1", "external_label": "external_value"}, collectionTimeMs, 0),
Expand Down Expand Up @@ -337,7 +351,9 @@ func Test_histogram_concurrencyCorrectness(t *testing.T) {
wg.Wait()

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, float64(totalCount.Load())),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 2*float64(totalCount.Load())),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1"}, collectionTimeMs, 0),
Expand All @@ -353,9 +369,11 @@ func Test_histogram_span_multiplier(t *testing.T) {
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 2.0, "", 5)

collectionTimeMs := time.Now().UnixMilli()
collectionTimeWithOffsetMs := collectionTimeMs - 1
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 11.5),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeWithOffsetMs, 0),
newSample(map[string]string{"__name__": "my_histogram_count", "label": "value-1"}, collectionTimeMs, 6.5),
newSample(map[string]string{"__name__": "my_histogram_sum", "label": "value-1"}, collectionTimeMs, 11.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "1"}, collectionTimeMs, 1.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "2"}, collectionTimeMs, 6.5),
newSample(map[string]string{"__name__": "my_histogram_bucket", "label": "value-1", "le": "+Inf"}, collectionTimeMs, 6.5),
Expand Down
25 changes: 25 additions & 0 deletions modules/generator/registry/native_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
promhistogram "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -46,6 +47,20 @@ type nativeHistogramSeries struct {
promHistogram prometheus.Histogram
lastUpdated int64
histogram *dto.Histogram

// firstSeries is used to track if this series is new to the counter.
// This is used in classic histograms to ensure that new counters begin with 0.
// This avoids Prometheus throwing away the first value in the series,
// due to the transition from null -> x.
firstSeries *atomic.Bool
}

func (hs *nativeHistogramSeries) isNew() bool {
return hs.firstSeries.Load()
}

func (hs *nativeHistogramSeries) registerSeenSeries() {
hs.firstSeries.Store(false)
}

var (
Expand Down Expand Up @@ -112,6 +127,7 @@ func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value floa
NativeHistogramMinResetDuration: 15 * time.Minute,
}),
lastUpdated: 0,
firstSeries: atomic.NewBool(true),
}

h.updateSeries(newSeries, value, traceID, multiplier)
Expand Down Expand Up @@ -267,6 +283,15 @@ func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels
}

func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) {
if s.isNew() {
lb.Set(labels.MetricName, h.metricName+"_count")
_, err = appender.Append(0, lb.Labels(), timeMs-1, 0)
if err != nil {
return activeSeries, err
}
s.registerSeenSeries()
}

// sum
lb.Set(labels.MetricName, h.metricName+"_sum")
_, err = appender.Append(0, lb.Labels(), timeMs, s.histogram.GetSampleSum())
Expand Down
Loading