diff --git a/CHANGELOG.md b/CHANGELOG.md index fd82cad96..30d68e39c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Notifier: Increment the prometheus_notifications_errors_total metric by the number of affected alerts rather than by one per batch of affected alerts. #15428 * [ENHANCEMENT] OTLP receiver: Convert also metric metadata. #15416 +* [BUGFIX] OTLP receiver: Allow colons in non-standard units. #15710 ## 3.0.1 / 2024-11-28 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f58215663..06f46f8d7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -259,7 +259,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true") case "created-timestamp-zero-ingestion": c.scrape.EnableCreatedTimestampZeroIngestion = true - c.web.EnableCreatedTimestampZeroIngestion = true + c.web.CTZeroIngestionEnabled = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols @@ -989,18 +989,12 @@ func main() { listeners, err := webHandler.Listeners() if err != nil { logger.Error("Unable to start web listener", "err", err) - if err := queryEngine.Close(); err != nil { - logger.Warn("Closing query engine failed", "err", err) - } os.Exit(1) } err = toolkit_web.Validate(*webConfig) if err != nil { logger.Error("Unable to validate web configuration file", "err", err) - if err := queryEngine.Close(); err != nil { - logger.Warn("Closing query engine failed", "err", err) - } os.Exit(1) } @@ -1022,9 +1016,6 @@ func main() { case <-cancel: reloadReady.Close() } - if err := queryEngine.Close(); err != nil { - logger.Warn("Closing query engine failed", "err", err) - } return nil }, func(err error) { diff --git a/cmd/promtool/sd.go b/cmd/promtool/sd.go index 5e005bca8..8863fbeac 100644 --- a/cmd/promtool/sd.go +++ b/cmd/promtool/sd.go @@ -144,7 +144,9 @@ func getSDCheckResult(targetGroups []*targetgroup.Group, scrapeConfig *config.Sc } } - res, orig, err := scrape.PopulateLabels(lb, scrapeConfig) + scrape.PopulateDiscoveredLabels(lb, scrapeConfig, target, targetGroup.Labels) + orig := lb.Labels() + res, err := scrape.PopulateLabels(lb, scrapeConfig, target, targetGroup.Labels) result := sdCheckResult{ DiscoveredLabels: orig, Labels: res, diff --git a/discovery/uyuni/uyuni.go b/discovery/uyuni/uyuni.go index 1bd0cd2d4..a37083575 100644 --- a/discovery/uyuni/uyuni.go +++ b/discovery/uyuni/uyuni.go @@ -205,9 +205,6 @@ func getEndpointInfoForSystems( err := rpcclient.Call( "system.monitoring.listEndpoints", []interface{}{token, systemIDs}, &endpointInfos) - if err != nil { - return nil, err - } return endpointInfos, err } diff --git a/promql/engine.go b/promql/engine.go index 8e65063bc..c8b05f833 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -436,6 +436,8 @@ func NewEngine(opts EngineOpts) *Engine { } // Close closes ng. +// Callers must ensure the engine is really no longer in use before calling this to avoid +// issues failures like in https://github.com/prometheus/prometheus/issues/15232 func (ng *Engine) Close() error { if ng == nil { return nil diff --git a/scrape/manager_test.go b/scrape/manager_test.go index b9c6f4c40..75ac9ea69 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "maps" "net/http" "net/http/httptest" "net/url" @@ -61,7 +62,7 @@ func init() { func TestPopulateLabels(t *testing.T) { cases := []struct { - in labels.Labels + in model.LabelSet cfg *config.ScrapeConfig res labels.Labels resOrig labels.Labels @@ -69,10 +70,10 @@ func TestPopulateLabels(t *testing.T) { }{ // Regular population of scrape config options. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", "custom": "value", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -103,14 +104,14 @@ func TestPopulateLabels(t *testing.T) { // Pre-define/overwrite scrape config labels. // Leave out port and expect it to be defaulted to scheme. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4", model.SchemeLabel: "http", model.MetricsPathLabel: "/custom", model.JobLabel: "custom-job", model.ScrapeIntervalLabel: "2s", model.ScrapeTimeoutLabel: "2s", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -138,10 +139,10 @@ func TestPopulateLabels(t *testing.T) { }, // Provide instance label. HTTPS port default for IPv6. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "[::1]", model.InstanceLabel: "custom-instance", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -170,7 +171,7 @@ func TestPopulateLabels(t *testing.T) { }, // Address label missing. { - in: labels.FromStrings("custom", "value"), + in: model.LabelSet{"custom": "value"}, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -184,7 +185,7 @@ func TestPopulateLabels(t *testing.T) { }, // Address label missing, but added in relabelling. { - in: labels.FromStrings("custom", "host:1234"), + in: model.LabelSet{"custom": "host:1234"}, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -222,7 +223,7 @@ func TestPopulateLabels(t *testing.T) { }, // Address label missing, but added in relabelling. { - in: labels.FromStrings("custom", "host:1234"), + in: model.LabelSet{"custom": "host:1234"}, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -260,10 +261,10 @@ func TestPopulateLabels(t *testing.T) { }, // Invalid UTF-8 in label. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", "custom": "\xbd", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -277,10 +278,10 @@ func TestPopulateLabels(t *testing.T) { }, // Invalid duration in interval label. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", model.ScrapeIntervalLabel: "2notseconds", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -294,10 +295,10 @@ func TestPopulateLabels(t *testing.T) { }, // Invalid duration in timeout label. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", model.ScrapeTimeoutLabel: "2notseconds", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -311,10 +312,10 @@ func TestPopulateLabels(t *testing.T) { }, // 0 interval in timeout label. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", model.ScrapeIntervalLabel: "0s", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -328,10 +329,10 @@ func TestPopulateLabels(t *testing.T) { }, // 0 duration in timeout label. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", model.ScrapeTimeoutLabel: "0s", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -345,11 +346,11 @@ func TestPopulateLabels(t *testing.T) { }, // Timeout less than interval. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:1000", model.ScrapeIntervalLabel: "1s", model.ScrapeTimeoutLabel: "2s", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -363,9 +364,9 @@ func TestPopulateLabels(t *testing.T) { }, // Don't attach default port. { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -393,9 +394,9 @@ func TestPopulateLabels(t *testing.T) { }, // verify that the default port is not removed (http). { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:80", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "http", MetricsPath: "/metrics", @@ -423,9 +424,9 @@ func TestPopulateLabels(t *testing.T) { }, // verify that the default port is not removed (https). { - in: labels.FromMap(map[string]string{ + in: model.LabelSet{ model.AddressLabel: "1.2.3.4:443", - }), + }, cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", @@ -453,17 +454,18 @@ func TestPopulateLabels(t *testing.T) { }, } for _, c := range cases { - in := c.in.Copy() - - res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg) + in := maps.Clone(c.in) + lb := labels.NewBuilder(labels.EmptyLabels()) + res, err := PopulateLabels(lb, c.cfg, c.in, nil) if c.err != "" { require.EqualError(t, err, c.err) } else { require.NoError(t, err) + testutil.RequireEqual(t, c.res, res) + PopulateDiscoveredLabels(lb, c.cfg, c.in, nil) + testutil.RequireEqual(t, c.resOrig, lb.Labels()) } - require.Equal(t, c.in, in) - testutil.RequireEqual(t, c.res, res) - testutil.RequireEqual(t, c.resOrig, orig) + require.Equal(t, c.in, in) // Check this wasn't altered by PopulateLabels(). } } diff --git a/scrape/scrape.go b/scrape/scrape.go index 4803354cf..2da07d719 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -450,7 +450,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { switch { case nonEmpty: all = append(all, t) - case !t.discoveredLabels.IsEmpty(): + default: if sp.config.KeepDroppedTargets == 0 || uint(len(sp.droppedTargets)) < sp.config.KeepDroppedTargets { sp.droppedTargets = append(sp.droppedTargets, t) } @@ -553,9 +553,9 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := uniqueLoops[hash]; !ok { uniqueLoops[hash] = nil } - // Need to keep the most updated labels information - // for displaying it in the Service Discovery web page. - sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels()) + // Need to keep the most updated ScrapeConfig for + // displaying labels in the Service Discovery web page. + sp.activeTargets[hash].SetScrapeConfig(sp.config, t.tLabels, t.tgLabels) } } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index bb62122bb..f9164ea7a 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -212,7 +212,8 @@ func TestDroppedTargetsList(t *testing.T) { sp.Sync(tgs) require.Len(t, sp.droppedTargets, expectedLength) require.Equal(t, expectedLength, sp.droppedTargetsCount) - require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels().String()) + lb := labels.NewBuilder(labels.EmptyLabels()) + require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels(lb).String()) // Check that count is still correct when we don't retain all dropped targets. sp.config.KeepDroppedTargets = 1 @@ -235,16 +236,19 @@ func TestDiscoveredLabelsUpdate(t *testing.T) { } sp.activeTargets = make(map[uint64]*Target) t1 := &Target{ - discoveredLabels: labels.FromStrings("label", "name"), + tLabels: model.LabelSet{"label": "name"}, + scrapeConfig: sp.config, } sp.activeTargets[t1.hash()] = t1 t2 := &Target{ - discoveredLabels: labels.FromStrings("labelNew", "nameNew"), + tLabels: model.LabelSet{"labelNew": "nameNew"}, + scrapeConfig: sp.config, } sp.sync([]*Target{t2}) - require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels()) + lb := labels.NewBuilder(labels.EmptyLabels()) + require.Equal(t, t2.DiscoveredLabels(lb), sp.activeTargets[t1.hash()].DiscoveredLabels(lb)) } type testLoop struct { @@ -309,7 +313,8 @@ func TestScrapePoolStop(t *testing.T) { for i := 0; i < numTargets; i++ { t := &Target{ - labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)), + labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)), + scrapeConfig: &config.ScrapeConfig{}, } l := &testLoop{} d := time.Duration((i+1)*20) * time.Millisecond @@ -394,8 +399,8 @@ func TestScrapePoolReload(t *testing.T) { for i := 0; i < numTargets; i++ { labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)) t := &Target{ - labels: labels, - discoveredLabels: labels, + labels: labels, + scrapeConfig: &config.ScrapeConfig{}, } l := &testLoop{} d := time.Duration((i+1)*20) * time.Millisecond @@ -2689,6 +2694,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { model.SchemeLabel, serverURL.Scheme, model.AddressLabel, serverURL.Host, ), + scrapeConfig: &config.ScrapeConfig{}, }, client: http.DefaultClient, timeout: configTimeout, @@ -2739,6 +2745,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { model.SchemeLabel, serverURL.Scheme, model.AddressLabel, serverURL.Host, ), + scrapeConfig: &config.ScrapeConfig{}, }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols, model.LegacyValidation), @@ -2794,6 +2801,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) { model.SchemeLabel, serverURL.Scheme, model.AddressLabel, serverURL.Host, ), + scrapeConfig: &config.ScrapeConfig{}, }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols, model.LegacyValidation), @@ -2837,6 +2845,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { model.SchemeLabel, serverURL.Scheme, model.AddressLabel, serverURL.Host, ), + scrapeConfig: &config.ScrapeConfig{}, }, client: http.DefaultClient, bodySizeLimit: bodySizeLimit, @@ -3107,7 +3116,8 @@ func TestReuseScrapeCache(t *testing.T) { } sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ - discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"), + labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"), + scrapeConfig: &config.ScrapeConfig{}, } proxyURL, _ = url.Parse("http://localhost:2128") ) @@ -3291,7 +3301,8 @@ func TestReuseCacheRace(t *testing.T) { buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ - discoveredLabels: labels.FromStrings("labelNew", "nameNew"), + labels: labels.FromStrings("labelNew", "nameNew"), + scrapeConfig: &config.ScrapeConfig{}, } ) defer sp.stop() @@ -4475,7 +4486,9 @@ func BenchmarkTargetScraperGzip(b *testing.B) { model.SchemeLabel, serverURL.Scheme, model.AddressLabel, serverURL.Host, ), - params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}}, + scrapeConfig: &config.ScrapeConfig{ + Params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}}, + }, }, client: client, timeout: time.Second, diff --git a/scrape/target.go b/scrape/target.go index 06d4737ff..d05866f86 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -45,12 +45,12 @@ const ( // Target refers to a singular HTTP or HTTPS endpoint. type Target struct { - // Labels before any processing. - discoveredLabels labels.Labels // Any labels that are added to this target and its metrics. labels labels.Labels - // Additional URL parameters that are part of the target URL. - params url.Values + // ScrapeConfig used to create this target. + scrapeConfig *config.ScrapeConfig + // Target and TargetGroup labels used to create this target. + tLabels, tgLabels model.LabelSet mtx sync.RWMutex lastError error @@ -61,12 +61,13 @@ type Target struct { } // NewTarget creates a reasonably configured target for querying. -func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target { +func NewTarget(labels labels.Labels, scrapeConfig *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) *Target { return &Target{ - labels: labels, - discoveredLabels: discoveredLabels, - params: params, - health: HealthUnknown, + labels: labels, + tLabels: tLabels, + tgLabels: tgLabels, + scrapeConfig: scrapeConfig, + health: HealthUnknown, } } @@ -168,11 +169,11 @@ func (t *Target) offset(interval time.Duration, offsetSeed uint64) time.Duration } // Labels returns a copy of the set of all public labels of the target. -func (t *Target) Labels(b *labels.ScratchBuilder) labels.Labels { - b.Reset() +func (t *Target) Labels(b *labels.Builder) labels.Labels { + b.Reset(labels.EmptyLabels()) t.labels.Range(func(l labels.Label) { if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) { - b.Add(l.Name, l.Value) + b.Set(l.Name, l.Value) } }) return b.Labels() @@ -188,24 +189,31 @@ func (t *Target) LabelsRange(f func(l labels.Label)) { } // DiscoveredLabels returns a copy of the target's labels before any processing. -func (t *Target) DiscoveredLabels() labels.Labels { +func (t *Target) DiscoveredLabels(lb *labels.Builder) labels.Labels { t.mtx.Lock() - defer t.mtx.Unlock() - return t.discoveredLabels.Copy() + cfg, tLabels, tgLabels := t.scrapeConfig, t.tLabels, t.tgLabels + t.mtx.Unlock() + PopulateDiscoveredLabels(lb, cfg, tLabels, tgLabels) + return lb.Labels() } -// SetDiscoveredLabels sets new DiscoveredLabels. -func (t *Target) SetDiscoveredLabels(l labels.Labels) { +// SetScrapeConfig sets new ScrapeConfig. +func (t *Target) SetScrapeConfig(scrapeConfig *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) { t.mtx.Lock() defer t.mtx.Unlock() - t.discoveredLabels = l + t.scrapeConfig = scrapeConfig + t.tLabels = tLabels + t.tgLabels = tgLabels } // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { + t.mtx.Lock() + configParams := t.scrapeConfig.Params + t.mtx.Unlock() params := url.Values{} - for k, v := range t.params { + for k, v := range configParams { params[k] = make([]string, len(v)) copy(params[k], v) } @@ -420,10 +428,19 @@ func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels return ref, nil } -// PopulateLabels builds a label set from the given label set and scrape configuration. -// It returns a label set before relabeling was applied as the second return value. -// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling. -func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) { +// PopulateDiscoveredLabels sets base labels on lb from target and group labels and scrape configuration, before relabeling. +func PopulateDiscoveredLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) { + lb.Reset(labels.EmptyLabels()) + + for ln, lv := range tLabels { + lb.Set(string(ln), string(lv)) + } + for ln, lv := range tgLabels { + if _, ok := tLabels[ln]; !ok { + lb.Set(string(ln), string(lv)) + } + } + // Copy labels into the labelset for the target if they are not set already. scrapeLabels := []labels.Label{ {Name: model.JobLabel, Value: cfg.JobName}, @@ -444,44 +461,49 @@ func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig) (res, orig lab lb.Set(name, v[0]) } } +} - preRelabelLabels := lb.Labels() +// PopulateLabels builds labels from target and group labels and scrape configuration, +// performs defined relabeling, checks validity, and adds Prometheus standard labels such as 'instance'. +// A return of empty labels and nil error means the target was dropped by relabeling. +func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) (res labels.Labels, err error) { + PopulateDiscoveredLabels(lb, cfg, tLabels, tgLabels) keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...) // Check if the target was dropped. if !keep { - return labels.EmptyLabels(), preRelabelLabels, nil + return labels.EmptyLabels(), nil } if v := lb.Get(model.AddressLabel); v == "" { - return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("no address") + return labels.EmptyLabels(), errors.New("no address") } addr := lb.Get(model.AddressLabel) if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil { - return labels.EmptyLabels(), labels.EmptyLabels(), err + return labels.EmptyLabels(), err } interval := lb.Get(model.ScrapeIntervalLabel) intervalDuration, err := model.ParseDuration(interval) if err != nil { - return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape interval: %w", err) + return labels.EmptyLabels(), fmt.Errorf("error parsing scrape interval: %w", err) } if time.Duration(intervalDuration) == 0 { - return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape interval cannot be 0") + return labels.EmptyLabels(), errors.New("scrape interval cannot be 0") } timeout := lb.Get(model.ScrapeTimeoutLabel) timeoutDuration, err := model.ParseDuration(timeout) if err != nil { - return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape timeout: %w", err) + return labels.EmptyLabels(), fmt.Errorf("error parsing scrape timeout: %w", err) } if time.Duration(timeoutDuration) == 0 { - return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape timeout cannot be 0") + return labels.EmptyLabels(), errors.New("scrape timeout cannot be 0") } if timeoutDuration > intervalDuration { - return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("scrape timeout cannot be greater than scrape interval (%q > %q)", timeout, interval) + return labels.EmptyLabels(), fmt.Errorf("scrape timeout cannot be greater than scrape interval (%q > %q)", timeout, interval) } // Meta labels are deleted after relabelling. Other internal labels propagate to @@ -506,9 +528,9 @@ func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig) (res, orig lab return nil }) if err != nil { - return labels.EmptyLabels(), labels.EmptyLabels(), err + return labels.EmptyLabels(), err } - return res, preRelabelLabels, nil + return res, nil } // TargetsFromGroup builds targets based on the given TargetGroup and config. @@ -516,24 +538,12 @@ func TargetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig, targets [ targets = targets[:0] failures := []error{} - for i, tlset := range tg.Targets { - lb.Reset(labels.EmptyLabels()) - - for ln, lv := range tlset { - lb.Set(string(ln), string(lv)) - } - for ln, lv := range tg.Labels { - if _, ok := tlset[ln]; !ok { - lb.Set(string(ln), string(lv)) - } - } - - lset, origLabels, err := PopulateLabels(lb, cfg) + for i, tLabels := range tg.Targets { + lset, err := PopulateLabels(lb, cfg, tLabels, tg.Labels) if err != nil { failures = append(failures, fmt.Errorf("instance %d in group %s: %w", i, tg, err)) - } - if !lset.IsEmpty() || !origLabels.IsEmpty() { - targets = append(targets, NewTarget(lset, origLabels, cfg.Params)) + } else { + targets = append(targets, NewTarget(lset, cfg, tLabels, tg.Labels)) } } return targets, failures diff --git a/scrape/target_test.go b/scrape/target_test.go index 0d763d738..9dad18f01 100644 --- a/scrape/target_test.go +++ b/scrape/target_test.go @@ -43,8 +43,8 @@ const ( func TestTargetLabels(t *testing.T) { target := newTestTarget("example.com:80", 0, labels.FromStrings("job", "some_job", "foo", "bar")) want := labels.FromStrings(model.JobLabel, "some_job", "foo", "bar") - b := labels.NewScratchBuilder(0) - got := target.Labels(&b) + b := labels.NewBuilder(labels.EmptyLabels()) + got := target.Labels(b) require.Equal(t, want, got) i := 0 target.LabelsRange(func(l labels.Label) { @@ -103,9 +103,11 @@ func TestTargetOffset(t *testing.T) { } func TestTargetURL(t *testing.T) { - params := url.Values{ - "abc": []string{"foo", "bar", "baz"}, - "xyz": []string{"hoo"}, + scrapeConfig := &config.ScrapeConfig{ + Params: url.Values{ + "abc": []string{"foo", "bar", "baz"}, + "xyz": []string{"hoo"}, + }, } labels := labels.FromMap(map[string]string{ model.AddressLabel: "example.com:1234", @@ -114,7 +116,7 @@ func TestTargetURL(t *testing.T) { "__param_abc": "overwrite", "__param_cde": "huu", }) - target := NewTarget(labels, labels, params) + target := NewTarget(labels, scrapeConfig, nil, nil) // The reserved labels are concatenated into a full URL. The first value for each // URL query parameter can be set/modified via labels as well. @@ -139,7 +141,7 @@ func newTestTarget(targetURL string, _ time.Duration, lbls labels.Labels) *Targe lb.Set(model.AddressLabel, strings.TrimPrefix(targetURL, "http://")) lb.Set(model.MetricsPathLabel, "/metrics") - return &Target{labels: lb.Labels()} + return &Target{labels: lb.Labels(), scrapeConfig: &config.ScrapeConfig{}} } func TestNewHTTPBearerToken(t *testing.T) { diff --git a/scripts/golangci-lint.yml b/scripts/golangci-lint.yml index e645ba30a..01b943b9b 100644 --- a/scripts/golangci-lint.yml +++ b/scripts/golangci-lint.yml @@ -26,7 +26,7 @@ jobs: - name: Checkout repository uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - name: Install Go - uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0 + uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0 with: go-version: 1.23.x - name: Install snmp_exporter/generator dependencies diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 6fb1ae1e4..3557a87eb 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -104,9 +104,10 @@ var ( HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, // CT needs to be lower than the sample's timestamp. }, { LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. @@ -116,9 +117,9 @@ var ( HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, } @@ -140,9 +141,10 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, }, { LabelsRefs: labelRefs, @@ -151,9 +153,9 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, Symbols: st.Symbols(), diff --git a/storage/remote/otlptranslator/prometheus/normalize_name.go b/storage/remote/otlptranslator/prometheus/normalize_name.go index 6967ca013..0a48e2821 100644 --- a/storage/remote/otlptranslator/prometheus/normalize_name.go +++ b/storage/remote/otlptranslator/prometheus/normalize_name.go @@ -22,7 +22,6 @@ import ( "strings" "unicode" - "github.com/prometheus/prometheus/util/strutil" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -120,18 +119,19 @@ func BuildCompliantName(metric pmetric.Metric, namespace string, addMetricSuffix return metricName } +var nonMetricNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9:]`) + // Build a normalized name for the specified metric. func normalizeName(metric pmetric.Metric, namespace string, allowUTF8 bool) string { var nameTokens []string var separators []string if !allowUTF8 { - nonTokenMetricCharRE := regexp.MustCompile(`[^a-zA-Z0-9:]`) // Split metric name into "tokens" (of supported metric name runes). // Note that this has the side effect of replacing multiple consecutive underscores with a single underscore. // This is part of the OTel to Prometheus specification: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.38.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus. nameTokens = strings.FieldsFunc( metric.Name(), - func(r rune) bool { return nonTokenMetricCharRE.MatchString(string(r)) }, + func(r rune) bool { return nonMetricNameCharRE.MatchString(string(r)) }, ) } else { translationFunc := func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) && r != ':' } @@ -223,73 +223,13 @@ func normalizeName(metric pmetric.Metric, namespace string, allowUTF8 bool) stri return normalizedName } -// TrimPromSuffixes trims type and unit prometheus suffixes from a metric name. -// Following the [OpenTelemetry specs] for converting Prometheus Metric points to OTLP. -// -// [OpenTelemetry specs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#metric-metadata -func TrimPromSuffixes(promName string, metricType pmetric.MetricType, unit string) string { - nameTokens := strings.Split(promName, "_") - if len(nameTokens) == 1 { - return promName - } - - nameTokens = removeTypeSuffixes(nameTokens, metricType) - nameTokens = removeUnitSuffixes(nameTokens, unit) - - return strings.Join(nameTokens, "_") -} - -func removeTypeSuffixes(tokens []string, metricType pmetric.MetricType) []string { - switch metricType { - case pmetric.MetricTypeSum: - // Only counters are expected to have a type suffix at this point. - // for other types, suffixes are removed during scrape. - return removeSuffix(tokens, "total") - default: - return tokens - } -} - -func removeUnitSuffixes(nameTokens []string, unit string) []string { - l := len(nameTokens) - unitTokens := strings.Split(unit, "_") - lu := len(unitTokens) - - if lu == 0 || l <= lu { - return nameTokens - } - - suffixed := true - for i := range unitTokens { - if nameTokens[l-i-1] != unitTokens[lu-i-1] { - suffixed = false - break - } - } - - if suffixed { - return nameTokens[:l-lu] - } - - return nameTokens -} - -func removeSuffix(tokens []string, suffix string) []string { - l := len(tokens) - if tokens[l-1] == suffix { - return tokens[:l-1] - } - - return tokens -} - // cleanUpUnit cleans up unit so it matches model.LabelNameRE. func cleanUpUnit(unit string) string { // Multiple consecutive underscores are replaced with a single underscore. // This is part of the OTel to Prometheus specification: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.38.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus. multipleUnderscoresRE := regexp.MustCompile(`__+`) return strings.TrimPrefix(multipleUnderscoresRE.ReplaceAllString( - strutil.SanitizeLabelName(unit), + nonMetricNameCharRE.ReplaceAllString(unit, "_"), "_", ), "_") } diff --git a/storage/remote/otlptranslator/prometheus/normalize_name_test.go b/storage/remote/otlptranslator/prometheus/normalize_name_test.go index d97e7a560..0473f6cbe 100644 --- a/storage/remote/otlptranslator/prometheus/normalize_name_test.go +++ b/storage/remote/otlptranslator/prometheus/normalize_name_test.go @@ -19,9 +19,7 @@ package prometheus import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pmetric" ) func TestByte(t *testing.T) { @@ -39,6 +37,8 @@ func TestWhiteSpaces(t *testing.T) { func TestNonStandardUnit(t *testing.T) { require.Equal(t, "system_network_dropped", normalizeName(createGauge("system.network.dropped", "{packets}"), "", false)) + // The normal metric name character set is allowed in non-standard units. + require.Equal(t, "system_network_dropped_nonstandard:_1", normalizeName(createGauge("system.network.dropped", "nonstandard:_1"), "", false)) } func TestNonStandardUnitCounter(t *testing.T) { @@ -70,6 +70,12 @@ func TestHertz(t *testing.T) { func TestPer(t *testing.T) { require.Equal(t, "broken_metric_speed_km_per_hour", normalizeName(createGauge("broken.metric.speed", "km/h"), "", false)) require.Equal(t, "astro_light_speed_limit_meters_per_second", normalizeName(createGauge("astro.light.speed_limit", "m/s"), "", false)) + // The normal metric name character set is allowed in non-standard units. + require.Equal(t, "system_network_dropped_non_per_standard:_1", normalizeName(createGauge("system.network.dropped", "non/standard:_1"), "", false)) + + t.Run("invalid per unit", func(t *testing.T) { + require.Equal(t, "broken_metric_speed_km", normalizeName(createGauge("broken.metric.speed", "km/°"), "", false)) + }) } func TestPercent(t *testing.T) { @@ -91,7 +97,7 @@ func TestAllowUTF8(t *testing.T) { }) t.Run("disallow UTF8", func(t *testing.T) { require.Equal(t, "unsupported_metric_temperature_F", normalizeName(createGauge("unsupported.metric.temperature", "°F"), "", false)) - require.Equal(t, "unsupported_metric_weird", normalizeName(createGauge("unsupported.metric.weird", "+=.:,!* & #"), "", false)) + require.Equal(t, "unsupported_metric_weird", normalizeName(createGauge("unsupported.metric.weird", "+=.,!* & #"), "", false)) require.Equal(t, "unsupported_metric_redundant_test_per_C", normalizeName(createGauge("unsupported.metric.redundant", "__test $/°C"), "", false)) require.Equal(t, "metric_with_foreign_characters", normalizeName(createGauge("metric_with_字符_foreign_characters", "ど"), "", false)) }) @@ -140,36 +146,6 @@ func TestOTelReceivers(t *testing.T) { require.Equal(t, "redis_latest_fork_microseconds", normalizeName(createGauge("redis.latest_fork", "us"), "", false)) } -func TestTrimPromSuffixes(t *testing.T) { - assert.Equal(t, "active_directory_ds_replication_network_io", TrimPromSuffixes("active_directory_ds_replication_network_io_bytes_total", pmetric.MetricTypeSum, "bytes")) - assert.Equal(t, "active_directory_ds_name_cache_hit_rate", TrimPromSuffixes("active_directory_ds_name_cache_hit_rate_percent", pmetric.MetricTypeGauge, "percent")) - assert.Equal(t, "active_directory_ds_ldap_bind_last_successful_time", TrimPromSuffixes("active_directory_ds_ldap_bind_last_successful_time_milliseconds", pmetric.MetricTypeGauge, "milliseconds")) - assert.Equal(t, "apache_requests", TrimPromSuffixes("apache_requests_total", pmetric.MetricTypeSum, "1")) - assert.Equal(t, "system_cpu_utilization", TrimPromSuffixes("system_cpu_utilization_ratio", pmetric.MetricTypeGauge, "ratio")) - assert.Equal(t, "mongodbatlas_process_journaling_data_files", TrimPromSuffixes("mongodbatlas_process_journaling_data_files_mebibytes", pmetric.MetricTypeGauge, "mebibytes")) - assert.Equal(t, "mongodbatlas_process_network_io", TrimPromSuffixes("mongodbatlas_process_network_io_bytes_per_second", pmetric.MetricTypeGauge, "bytes_per_second")) - assert.Equal(t, "mongodbatlas_process_oplog_rate", TrimPromSuffixes("mongodbatlas_process_oplog_rate_gibibytes_per_hour", pmetric.MetricTypeGauge, "gibibytes_per_hour")) - assert.Equal(t, "nsxt_node_memory_usage", TrimPromSuffixes("nsxt_node_memory_usage_kilobytes", pmetric.MetricTypeGauge, "kilobytes")) - assert.Equal(t, "redis_latest_fork", TrimPromSuffixes("redis_latest_fork_microseconds", pmetric.MetricTypeGauge, "microseconds")) - assert.Equal(t, "up", TrimPromSuffixes("up", pmetric.MetricTypeGauge, "")) - - // These are not necessarily valid OM units, only tested for the sake of completeness. - assert.Equal(t, "active_directory_ds_replication_sync_object_pending", TrimPromSuffixes("active_directory_ds_replication_sync_object_pending_total", pmetric.MetricTypeSum, "{objects}")) - assert.Equal(t, "apache_current", TrimPromSuffixes("apache_current_connections", pmetric.MetricTypeGauge, "connections")) - assert.Equal(t, "bigip_virtual_server_request_count", TrimPromSuffixes("bigip_virtual_server_request_count_total", pmetric.MetricTypeSum, "{requests}")) - assert.Equal(t, "mongodbatlas_process_db_query_targeting_scanned_per_returned", TrimPromSuffixes("mongodbatlas_process_db_query_targeting_scanned_per_returned", pmetric.MetricTypeGauge, "{scanned}/{returned}")) - assert.Equal(t, "nginx_connections_accepted", TrimPromSuffixes("nginx_connections_accepted", pmetric.MetricTypeGauge, "connections")) - assert.Equal(t, "apache_workers", TrimPromSuffixes("apache_workers_connections", pmetric.MetricTypeGauge, "connections")) - assert.Equal(t, "nginx", TrimPromSuffixes("nginx_requests", pmetric.MetricTypeGauge, "requests")) - - // Units shouldn't be trimmed if the unit is not a direct match with the suffix, i.e, a suffix "_seconds" shouldn't be removed if unit is "sec" or "s" - assert.Equal(t, "system_cpu_load_average_15m_ratio", TrimPromSuffixes("system_cpu_load_average_15m_ratio", pmetric.MetricTypeGauge, "1")) - assert.Equal(t, "mongodbatlas_process_asserts_per_second", TrimPromSuffixes("mongodbatlas_process_asserts_per_second", pmetric.MetricTypeGauge, "{assertions}/s")) - assert.Equal(t, "memcached_operation_hit_ratio_percent", TrimPromSuffixes("memcached_operation_hit_ratio_percent", pmetric.MetricTypeGauge, "%")) - assert.Equal(t, "active_directory_ds_replication_object_rate_per_second", TrimPromSuffixes("active_directory_ds_replication_object_rate_per_second", pmetric.MetricTypeGauge, "{objects}/s")) - assert.Equal(t, "system_disk_operation_time_seconds", TrimPromSuffixes("system_disk_operation_time_seconds_total", pmetric.MetricTypeSum, "s")) -} - func TestNamespace(t *testing.T) { require.Equal(t, "space_test", normalizeName(createGauge("test", ""), "space", false)) require.Equal(t, "space_test", normalizeName(createGauge("#test", ""), "space", false)) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 49ec44dc0..487de25fe 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -48,6 +48,8 @@ type writeHandler struct { samplesAppendedWithoutMetadata prometheus.Counter acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} + + ingestCTZeroSample bool } const maxAheadTime = 10 * time.Minute @@ -57,7 +59,7 @@ const maxAheadTime = 10 * time.Minute // // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. -func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler { +func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler { protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} for _, acc := range acceptedProtoMsgs { protoMsgs[acc] = struct{}{} @@ -78,6 +80,8 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable Name: "remote_write_without_metadata_appended_samples_total", Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", }), + + ingestCTZeroSample: ingestCTZeroSample, } return h } @@ -394,6 +398,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * var ref storage.SeriesRef // Samples. + if h.ingestCTZeroSample && len(ts.Samples) > 0 && ts.Samples[0].Timestamp != 0 && ts.CreatedTimestamp != 0 { + // CT only needs to be ingested for the first sample, it will be considered + // out of order for the rest. + ref, err = app.AppendCTZeroSample(ref, ls, ts.Samples[0].Timestamp, ts.CreatedTimestamp) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp) + } + } for _, s := range ts.Samples { ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { @@ -415,6 +430,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // Native Histograms. for _, hp := range ts.Histograms { + if h.ingestCTZeroSample && hp.Timestamp != 0 && ts.CreatedTimestamp != 0 { + // Differently from samples, we need to handle CT for each histogram instead of just the first one. + // This is because histograms and float histograms are stored separately, even if they have the same labels. + ref, err = h.handleHistogramZeroSample(app, ref, ls, hp, ts.CreatedTimestamp) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", hp.Timestamp) + } + } if hp.IsFloatHistogram() { ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) } else { @@ -479,6 +505,18 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...) } +// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. +// It doens't return errors in case of out of order CT. +func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64) (storage.SeriesRef, error) { + var err error + if hist.IsFloatHistogram() { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, nil, hist.ToFloatHistogram()) + } else { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil) + } + return ref, err +} + // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool, validIntervalCTZeroIngestion time.Duration) http.Handler { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index c40f227ea..b37b3632b 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -231,7 +231,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -256,7 +256,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { // in Prometheus, so keeping like this to not break existing 1.0 clients. appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -310,14 +310,23 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode int expectedRespBody string - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error + + ingestCTZeroSample bool }{ { - desc: "All timeseries accepted", + desc: "All timeseries accepted/ct_enabled", + input: writeV2RequestFixture.Timeseries, + expectedCode: http.StatusNoContent, + ingestCTZeroSample: true, + }, + { + desc: "All timeseries accepted/ct_disabled", input: writeV2RequestFixture.Timeseries, expectedCode: http.StatusNoContent, }, @@ -440,13 +449,14 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{ - commitErr: tc.commitErr, - appendSampleErr: tc.appendSampleErr, - appendHistogramErr: tc.appendHistogramErr, - appendExemplarErr: tc.appendExemplarErr, - updateMetadataErr: tc.updateMetadataErr, + commitErr: tc.commitErr, + appendSampleErr: tc.appendSampleErr, + appendCTZeroSampleErr: tc.appendCTZeroSampleErr, + appendHistogramErr: tc.appendHistogramErr, + appendExemplarErr: tc.appendExemplarErr, + updateMetadataErr: tc.updateMetadataErr, } - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -489,15 +499,27 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) for _, s := range ts.Samples { + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i]) + i++ + } requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := hp.ToFloatHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := hp.ToIntHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ @@ -545,7 +567,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -587,7 +609,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -625,7 +647,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -656,7 +678,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { appendable := &mockAppendable{} // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -673,7 +695,7 @@ func TestCommitErr_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -697,7 +719,7 @@ func TestCommitErr_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -724,7 +746,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) @@ -775,15 +797,17 @@ type mockAppendable struct { latestExemplar map[uint64]int64 exemplars []mockExemplar latestHistogram map[uint64]int64 + latestFloatHist map[uint64]int64 histograms []mockHistogram metadata []mockMetadata // optional errors to inject. - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error } type mockSample struct { @@ -827,6 +851,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender { if m.latestHistogram == nil { m.latestHistogram = map[uint64]int64{} } + if m.latestFloatHist == nil { + m.latestFloatHist = map[uint64]int64{} + } if m.latestExemplar == nil { m.latestExemplar = map[uint64]int64{} } @@ -900,7 +927,12 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, m.appendHistogramErr } - latestTs := m.latestHistogram[l.Hash()] + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } if t < latestTs { return 0, storage.ErrOutOfOrderSample } @@ -915,15 +947,53 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, tsdb.ErrInvalidSample } - m.latestHistogram[l.Hash()] = t + if h != nil { + m.latestHistogram[l.Hash()] = t + } else { + m.latestFloatHist[l.Hash()] = t + } m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) return 0, nil } func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + if h != nil { + m.latestHistogram[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil}) + } else { + m.latestFloatHist[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}}) + } return 0, nil } @@ -936,9 +1006,32 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp return 0, nil } -func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). +func (m *mockAppendable) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + latestTs := m.latestSample[l.Hash()] + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + m.latestSample[l.Hash()] = ct + m.samples = append(m.samples, mockSample{l, ct, 0}) return 0, nil } diff --git a/template/template.go b/template/template.go index 0698c6c8a..9ffed6ff6 100644 --- a/template/template.go +++ b/template/template.go @@ -166,7 +166,7 @@ func NewTemplateExpander( return html_template.HTML(text) }, "match": regexp.MatchString, - "title": strings.Title, //nolint:staticcheck // TODO(beorn7): Need to come up with a replacement using the cases package. + "title": strings.Title, "toUpper": strings.ToUpper, "toLower": strings.ToLower, "graphLink": strutil.GraphLinkForExpression, diff --git a/tsdb/db.go b/tsdb/db.go index 9d97420f0..5ec576b6a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2446,7 +2446,7 @@ func isTmpDir(fi fs.DirEntry) bool { fn := fi.Name() ext := filepath.Ext(fn) if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy { - if strings.HasPrefix(fn, "checkpoint.") { + if strings.HasPrefix(fn, wlog.CheckpointPrefix) { return true } if strings.HasPrefix(fn, chunkSnapshotPrefix) { diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 58e11c770..dd62a79e2 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -81,7 +81,8 @@ func DeleteCheckpoints(dir string, maxIndex int) error { return errs.Err() } -const checkpointPrefix = "checkpoint." +// CheckpointPrefix is the prefix used for checkpoint files. +const CheckpointPrefix = "checkpoint." // Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. // It includes the most recent checkpoint if it exists. @@ -363,7 +364,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } func checkpointDir(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) + return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i)) } type checkpointRef struct { @@ -379,13 +380,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { for i := 0; i < len(files); i++ { fi := files[i] - if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + if !strings.HasPrefix(fi.Name(), CheckpointPrefix) { continue } if !fi.IsDir() { return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) } - idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):]) if err != nil { continue } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 10c2ba2e0..caba3900f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -258,7 +258,7 @@ func NewAPI( rwEnabled bool, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled bool, - enableCTZeroIngestion bool, + ctZeroIngestionEnabled bool, validIntervalCTZeroIngestion time.Duration, ) *API { a := &API{ @@ -303,10 +303,10 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion, validIntervalCTZeroIngestion) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, ctZeroIngestionEnabled, validIntervalCTZeroIngestion) } return a @@ -1085,12 +1085,12 @@ func (api *API) targets(r *http.Request) apiFuncResult { showActive := state == "" || state == "any" || state == "active" showDropped := state == "" || state == "any" || state == "dropped" res := &TargetDiscovery{} + builder := labels.NewBuilder(labels.EmptyLabels()) if showActive { targetsActive := api.targetRetriever(r.Context()).TargetsActive() activeKeys, numTargets := sortKeys(targetsActive) res.ActiveTargets = make([]*Target, 0, numTargets) - builder := labels.NewScratchBuilder(0) for _, key := range activeKeys { if scrapePool != "" && key != scrapePool { @@ -1106,8 +1106,8 @@ func (api *API) targets(r *http.Request) apiFuncResult { globalURL, err := getGlobalURL(target.URL(), api.globalURLOptions) res.ActiveTargets = append(res.ActiveTargets, &Target{ - DiscoveredLabels: target.DiscoveredLabels(), - Labels: target.Labels(&builder), + DiscoveredLabels: target.DiscoveredLabels(builder), + Labels: target.Labels(builder), ScrapePool: key, ScrapeURL: target.URL().String(), GlobalURL: globalURL.String(), @@ -1145,7 +1145,7 @@ func (api *API) targets(r *http.Request) apiFuncResult { } for _, target := range targetsDropped[key] { res.DroppedTargets = append(res.DroppedTargets, &DroppedTarget{ - DiscoveredLabels: target.DiscoveredLabels(), + DiscoveredLabels: target.DiscoveredLabels(builder), }) } } @@ -1183,7 +1183,7 @@ func (api *API) targetMetadata(r *http.Request) apiFuncResult { } } - builder := labels.NewScratchBuilder(0) + builder := labels.NewBuilder(labels.EmptyLabels()) metric := r.FormValue("metric") res := []metricMetadata{} for _, tt := range api.targetRetriever(r.Context()).TargetsActive() { @@ -1191,7 +1191,7 @@ func (api *API) targetMetadata(r *http.Request) apiFuncResult { if limit >= 0 && len(res) >= limit { break } - targetLabels := t.Labels(&builder) + targetLabels := t.Labels(builder) // Filter targets that don't satisfy the label matchers. if matchTarget != "" && !matchLabels(targetLabels, matchers) { continue diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 0168bc57e..175ed2e0f 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -103,12 +103,12 @@ type testTargetRetriever struct { } type testTargetParams struct { - Identifier string - Labels labels.Labels - DiscoveredLabels labels.Labels - Params url.Values - Reports []*testReport - Active bool + Identifier string + Labels labels.Labels + targetLabels model.LabelSet + Params url.Values + Reports []*testReport + Active bool } type testReport struct { @@ -124,7 +124,7 @@ func newTestTargetRetriever(targetsInfo []*testTargetParams) *testTargetRetrieve droppedTargets = make(map[string][]*scrape.Target) for _, t := range targetsInfo { - nt := scrape.NewTarget(t.Labels, t.DiscoveredLabels, t.Params) + nt := scrape.NewTarget(t.Labels, &config.ScrapeConfig{Params: t.Params}, t.targetLabels, nil) for _, r := range t.Reports { nt.Report(r.Start, r.Duration, r.Error) @@ -1004,10 +1004,9 @@ func setupTestTargetRetriever(t *testing.T) *testTargetRetriever { model.ScrapeIntervalLabel: "15s", model.ScrapeTimeoutLabel: "5s", }), - DiscoveredLabels: labels.EmptyLabels(), - Params: url.Values{}, - Reports: []*testReport{{scrapeStart, 70 * time.Millisecond, nil}}, - Active: true, + Params: url.Values{}, + Reports: []*testReport{{scrapeStart, 70 * time.Millisecond, nil}}, + Active: true, }, { Identifier: "blackbox", @@ -1019,22 +1018,21 @@ func setupTestTargetRetriever(t *testing.T) *testTargetRetriever { model.ScrapeIntervalLabel: "20s", model.ScrapeTimeoutLabel: "10s", }), - DiscoveredLabels: labels.EmptyLabels(), - Params: url.Values{"target": []string{"example.com"}}, - Reports: []*testReport{{scrapeStart, 100 * time.Millisecond, errors.New("failed")}}, - Active: true, + Params: url.Values{"target": []string{"example.com"}}, + Reports: []*testReport{{scrapeStart, 100 * time.Millisecond, errors.New("failed")}}, + Active: true, }, { Identifier: "blackbox", Labels: labels.EmptyLabels(), - DiscoveredLabels: labels.FromMap(map[string]string{ + targetLabels: model.LabelSet{ model.SchemeLabel: "http", model.AddressLabel: "http://dropped.example.com:9115", model.MetricsPathLabel: "/probe", model.JobLabel: "blackbox", model.ScrapeIntervalLabel: "30s", model.ScrapeTimeoutLabel: "15s", - }), + }, Params: url.Values{}, Active: false, }, @@ -1507,7 +1505,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E response: &TargetDiscovery{ ActiveTargets: []*Target{ { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__param_target", "example.com", "__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "blackbox"), ScrapePool: "blackbox", ScrapeURL: "http://localhost:9115/probe?target=example.com", @@ -1520,7 +1518,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E ScrapeTimeout: "10s", }, { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "test"), ScrapePool: "test", ScrapeURL: "http://example.com:8080/metrics", @@ -1556,7 +1554,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E response: &TargetDiscovery{ ActiveTargets: []*Target{ { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__param_target", "example.com", "__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "blackbox"), ScrapePool: "blackbox", ScrapeURL: "http://localhost:9115/probe?target=example.com", @@ -1569,7 +1567,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E ScrapeTimeout: "10s", }, { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "test"), ScrapePool: "test", ScrapeURL: "http://example.com:8080/metrics", @@ -1605,7 +1603,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E response: &TargetDiscovery{ ActiveTargets: []*Target{ { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__param_target", "example.com", "__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "blackbox"), ScrapePool: "blackbox", ScrapeURL: "http://localhost:9115/probe?target=example.com", @@ -1618,7 +1616,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E ScrapeTimeout: "10s", }, { - DiscoveredLabels: labels.FromStrings(), + DiscoveredLabels: labels.FromStrings("__scrape_interval__", "0s", "__scrape_timeout__", "0s"), Labels: labels.FromStrings("job", "test"), ScrapePool: "test", ScrapeURL: "http://example.com:8080/metrics", diff --git a/web/web.go b/web/web.go index f52aa6933..f6f1c9569 100644 --- a/web/web.go +++ b/web/web.go @@ -290,6 +290,7 @@ type Options struct { EnableRemoteWriteReceiver bool EnableOTLPWriteReceiver bool IsAgent bool + CTZeroIngestionEnabled bool AppName string AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg @@ -298,7 +299,6 @@ type Options struct { Registerer prometheus.Registerer // Our Grafana Cloud additions. Leaving them separately for the updates with upstream. - EnableCreatedTimestampZeroIngestion bool ValidIntervalCreatedTimestampZeroIngestion time.Duration } @@ -390,7 +390,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.EnableRemoteWriteReceiver, o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, - o.EnableCreatedTimestampZeroIngestion, + o.CTZeroIngestionEnabled, o.ValidIntervalCreatedTimestampZeroIngestion, )