receiver/prometheus: glue and complete staleness marking for disappearing metrics (open-telemetry#3423)

Adds the staleness store to the metricsBuilder and transaction to track when metrics
disappear between scrapes.

Depends on open-telemetry#3414
Fixes open-telemetry#3413
Fixes open-telemetry#2961
odeke-em authored Jul 8, 2021
1 parent a4354b6 commit 8b79380
Showing 9 changed files with 357 additions and 32 deletions.
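The commit message above describes the idea: remember which series were seen in the previous scrape, mark the ones seen in the current scrape, and emit stale markers for series that disappeared in between. Below is a minimal sketch of such a store, assuming an implementation along these lines; the real stalenessStore (added here and in open-telemetry#3414) is not part of this capture, and the stalenessSketch and staleSeries names are hypothetical.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// stalenessSketch remembers which series were seen in the previous scrape
// and which have been seen so far in the current one.
type stalenessSketch struct {
	previous map[string]labels.Labels
	current  map[string]labels.Labels
}

func newStalenessSketch() *stalenessSketch {
	return &stalenessSketch{
		previous: make(map[string]labels.Labels),
		current:  make(map[string]labels.Labels),
	}
}

// markAsCurrentlySeen records a series as present in the current scrape;
// the metricsbuilder diff below calls the analogous method from AddDataPoint
// for every non-stale sample.
func (s *stalenessSketch) markAsCurrentlySeen(ls labels.Labels, _ int64) {
	s.current[ls.String()] = ls
}

// refresh starts a new scrape cycle: "current" becomes "previous".
func (s *stalenessSketch) refresh() {
	s.previous, s.current = s.current, make(map[string]labels.Labels)
}

// staleSeries lists series that were present last cycle but are missing now.
func (s *stalenessSketch) staleSeries() []labels.Labels {
	var stale []labels.Labels
	for key, ls := range s.previous {
		if _, ok := s.current[key]; !ok {
			stale = append(stale, ls)
		}
	}
	return stale
}

func main() {
	s := newStalenessSketch()

	s.refresh() // scrape #1 reports only the heap series
	s.markAsCurrentlySeen(labels.FromStrings("__name__", "jvm_memory_bytes_used", "area", "heap"), 1000)

	s.refresh() // scrape #2 reports a different series, so the heap series is now stale
	s.markAsCurrentlySeen(labels.FromStrings("__name__", "jvm_memory_pool_bytes_used", "pool", "x"), 2000)

	for _, ls := range s.staleSeries() {
		fmt.Println("mark stale:", ls)
	}
}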
15 changes: 13 additions & 2 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
@@ -26,6 +26,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -57,12 +58,13 @@ type metricBuilder struct {
startTime float64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
@@ -75,6 +77,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
}
}

@@ -87,7 +90,7 @@ func (b *metricBuilder) matchStartTimeMetric(metricName string) bool {
}

// AddDataPoint is for feeding prometheus data complexValue in its processing order
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error {
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) {
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
@@ -105,6 +108,14 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error
return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
}

defer func() {
// Only mark this data point as in the current scrape
// iff it isn't a stale metric.
if rerr == nil && !value.IsStaleNaN(v) {
b.stalenessStore.markAsCurrentlySeen(ls, t)
}
}()

metricName := ls.Get(model.MetricNameLabel)
switch {
case metricName == "":
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
@@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
@@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger)
testLogger, dummyStalenessStore())
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
@@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
@@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
@@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
@@ -1451,7 +1451,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())

dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
8 changes: 7 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore.go
@@ -51,7 +51,8 @@ type OcaStore struct {
receiverID config.ComponentID
externalLabels labels.Labels

logger *zap.Logger
logger *zap.Logger
stalenessStore *stalenessStore
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
@@ -74,6 +75,7 @@ func NewOcaStore(
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
externalLabels: externalLabels,
stalenessStore: newStalenessStore(),
}
}

@@ -88,6 +90,9 @@ func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *OcaStore) Appender(context.Context) storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
// First, prepare the stalenessStore for a new scrape cycle.
o.stalenessStore.refresh()

return newTransaction(
o.ctx,
o.jobsMap,
@@ -98,6 +103,7 @@ func (o *OcaStore) Appender(context.Context) storage.Appender {
o.sink,
o.externalLabels,
o.logger,
o.stalenessStore,
)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
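The OcaStore diff above wires the store through each scrape cycle: refresh() runs every time the scrape loop asks for a new Appender, so what was "currently seen" becomes "previously seen", and the same store is handed to the new transaction so it can append stale markers for series that went missing. Below is a hedged sketch of that emit side, assuming the behavior described in the commit message; the sampleSink interface and appendStaleMarkers helper are illustrative stand-ins, not the PR's transaction code.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/value"
)

// sampleSink is an illustrative stand-in for wherever the transaction appends
// samples; it is not the Prometheus storage.Appender interface.
type sampleSink interface {
	Add(ls labels.Labels, timestampMs int64, v float64)
}

// appendStaleMarkers appends one StaleNaN sample per disappeared series, which
// is how downstream consumers (like the remote-write endpoint in the test
// below) learn that a series has ended.
func appendStaleMarkers(sink sampleSink, disappeared []labels.Labels, nowMs int64) {
	staleMarker := math.Float64frombits(value.StaleNaN) // the special NaN Prometheus uses
	for _, ls := range disappeared {
		sink.Add(ls, nowMs, staleMarker)
	}
}

// printSink just logs what would be appended.
type printSink struct{}

func (printSink) Add(ls labels.Labels, timestampMs int64, v float64) {
	fmt.Printf("append %s @%d stale=%t\n", ls, timestampMs, value.IsStaleNaN(v))
}

func main() {
	disappeared := []labels.Labels{
		labels.FromStrings("__name__", "jvm_memory_bytes_used", "area", "heap"),
	}
	appendStaleMarkers(printSink{}, disappeared, 1625700000000)
}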
237 changes: 237 additions & 0 deletions receiver/prometheusreceiver/internal/staleness_end_to_end_test.go
@@ -0,0 +1,237 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal_test

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/prometheusremotewriteexporter"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/prometheusreceiver"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/parserprovider"
)

// Test that staleness markers are emitted for timeseries that intermittently disappear.
// This test runs the entire collector and end-to-end scrapes then checks with the
// Prometheus remotewrite exporter that staleness markers are emitted per timeseries.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/3413
func TestStalenessMarkersEndToEnd(t *testing.T) {
if testing.Short() {
t.Skip("This test can take a long time")
}

ctx, cancel := context.WithCancel(context.Background())

// 1. Setup the server that sends series that intermittently appear and disappear.
var n uint64
scrapeServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Increment the scrape count atomically per scrape.
i := atomic.AddUint64(&n, 1)

select {
case <-ctx.Done():
return
default:
}

// Alternate metrics per scrape so that every one of
// them will be reported as stale.
if i%2 == 0 {
fmt.Fprintf(rw, `
# HELP jvm_memory_bytes_used Used bytes of a given JVM memory area.
# TYPE jvm_memory_bytes_used gauge
jvm_memory_bytes_used{area="heap"} %.1f`, float64(i))
} else {
fmt.Fprintf(rw, `
# HELP jvm_memory_pool_bytes_used Used bytes of a given JVM memory pool.
# TYPE jvm_memory_pool_bytes_used gauge
jvm_memory_pool_bytes_used{pool="CodeHeap 'non-nmethods'"} %.1f`, float64(i))
}
}))
defer scrapeServer.Close()

serverURL, err := url.Parse(scrapeServer.URL)
require.Nil(t, err)

// 2. Set up the Prometheus RemoteWrite endpoint.
prweUploads := make(chan *prompb.WriteRequest)
prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Snappy decode the uploads.
payload, rerr := ioutil.ReadAll(req.Body)
if rerr != nil {
panic(rerr)
}
recv := make([]byte, len(payload))
decoded, derr := snappy.Decode(recv, payload)
if derr != nil {
panic(derr)
}

writeReq := new(prompb.WriteRequest)
if uerr := proto.Unmarshal(decoded, writeReq); uerr != nil {
panic(uerr)
}

select {
case <-ctx.Done():
return
case prweUploads <- writeReq:
}
}))
defer prweServer.Close()

// 3. Set the OpenTelemetry Prometheus receiver.
config := fmt.Sprintf(`
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'test'
scrape_interval: 2ms
static_configs:
- targets: [%q]
processors:
batch:
exporters:
prometheusremotewrite:
endpoint: %q
insecure: true
service:
pipelines:
metrics:
receivers: [prometheus]
processors: [batch]
exporters: [prometheusremotewrite]`, serverURL.Host, prweServer.URL)

// 4. Run the OpenTelemetry Collector.
receivers, err := component.MakeReceiverFactoryMap(prometheusreceiver.NewFactory())
require.Nil(t, err)
exporters, err := component.MakeExporterFactoryMap(prometheusremotewriteexporter.NewFactory())
require.Nil(t, err)
processors, err := component.MakeProcessorFactoryMap(batchprocessor.NewFactory())
require.Nil(t, err)

factories := component.Factories{
Receivers: receivers,
Exporters: exporters,
Processors: processors,
}

appSettings := service.CollectorSettings{
Factories: factories,
ParserProvider: parserprovider.NewInMemory(strings.NewReader(config)),
BuildInfo: component.BuildInfo{
Command: "otelcol",
Description: "OpenTelemetry Collector",
Version: "tests",
},
LoggingOptions: []zap.Option{
// Turn off the verbose logging from the collector.
zap.WrapCore(func(zapcore.Core) zapcore.Core {
return zapcore.NewNopCore()
}),
},
}
app, err := service.New(appSettings)
require.Nil(t, err)

go func() {
if err := app.Run(); err != nil {
t.Error(err)
}
}()

// Wait until the collector has actually started.
stateChannel := app.GetStateChannel()
for notYetStarted := true; notYetStarted; {
switch state := <-stateChannel; state {
case service.Running, service.Closed, service.Closing:
notYetStarted = false
}
}

// The OpenTelemetry collector has a data race because it closes
// a channel while it can still be written to, so we deliberately
// skip app.Shutdown() here.
if false {
defer app.Shutdown()
}

// 5. Let's wait on 10 fetches.
var wReqL []*prompb.WriteRequest
for i := 0; i < 10; i++ {
wReqL = append(wReqL, <-prweUploads)
}
defer cancel()

// 6. Assert that we encounter the stale markers aka special NaNs for the various time series.
staleMarkerCount := 0
totalSamples := 0
for i, wReq := range wReqL {
name := fmt.Sprintf("WriteRequest#%d", i)
require.True(t, len(wReq.Timeseries) > 0, "Expecting at least 1 timeSeries for:: "+name)
for j, ts := range wReq.Timeseries {
fullName := fmt.Sprintf("%s/TimeSeries#%d", name, j)
assert.True(t, len(ts.Samples) > 0, "Expected at least 1 Sample in:: "+fullName)

// We are strictly counting series directly included in the scrapes, and not
// internal timeseries like "up" or "scrape_seconds" etc.
metricName := ""
for _, label := range ts.Labels {
if label.Name == "__name__" {
metricName = label.Value
}
}
if !strings.HasPrefix(metricName, "jvm") {
continue
}

for _, sample := range ts.Samples {
totalSamples++
if value.IsStaleNaN(sample.Value) {
staleMarkerCount++
}
}
}
}

require.True(t, totalSamples > 0, "Expected at least 1 sample")
// On every alternate scrape, the prior scrape's series will be reported as stale.
// Expect at least:
// * The first scrape will NOT return stale markers
// * (N-1 / alternatives) = ((10-1) / 2) = ~40% chance of stale markers being emitted.
chance := float64(staleMarkerCount) / float64(totalSamples)
require.True(t, chance >= 0.4, fmt.Sprintf("Expected at least one stale marker: %.3f", chance))
}
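The test above counts stale markers with value.IsStaleNaN. For context, a stale marker is a NaN with one specific bit pattern (value.StaleNaN), so ordinary NaNs in the scraped data are not miscounted; the short check below, a standalone illustration rather than part of this commit, shows that distinction.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/pkg/value"
)

func main() {
	stale := math.Float64frombits(value.StaleNaN)

	// Both values are NaN, but only the marker has the stale bit pattern.
	fmt.Println(math.IsNaN(stale), value.IsStaleNaN(stale))           // true true
	fmt.Println(math.IsNaN(math.NaN()), value.IsStaleNaN(math.NaN())) // true false
}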