Skip to content

Commit

Permalink
feat(xPrometheus): added header (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
paologallinaharbur authored Feb 21, 2024
1 parent 3027d97 commit 4f98d24
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 10 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Unreleased section should follow [Release Toolkit](https://github.com/newrelic/r

## Unreleased

### Enhancement
- Added XPrometheusHeader to allow exporters to give up complex computation when needed.

## v2.20.1 - 2024-01-03

### ⛓️ Dependencies
Expand Down Expand Up @@ -105,7 +108,7 @@ In particular, it was not possible having `lowDataMode=true` to filter out every

## 2.16.3
## Changed
- Several dependencies updated
- Several dependencies updated
- The `use_bearer` config is now exposed the config for static targets by @paologallinaharbur in https://github.com/newrelic/nri-prometheus/pull/327

## 2.16.2
Expand Down
1 change: 1 addition & 0 deletions configs/nri-prometheus-config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ integrations:
audit: false

# The HTTP client timeout when fetching data from endpoints. Defaults to "5s" if it is not set.
# This timeout in seconds is passed as well as a X-Prometheus-Scrape-Timeout-Seconds header to the exporters
# scrape_timeout: "5s"

# Length in time to distribute the scraping from the endpoints. Default to "30s" if it is not set.
Expand Down
6 changes: 4 additions & 2 deletions internal/integration/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -155,7 +156,7 @@ type prometheusFetcher struct {
httpClient prometheus.HTTPDoer
bearerClient prometheus.HTTPDoer
// Provides IoC for better testability. Its usual value is 'prometheus.Get'.
getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string) (prometheus.MetricFamiliesByName, error)
getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string, fetchTimeout string) (prometheus.MetricFamiliesByName, error)
log *logrus.Entry
}

Expand Down Expand Up @@ -253,7 +254,8 @@ func (pf *prometheusFetcher) fetch(t endpoints.Target) (prometheus.MetricFamilie
httpClient = pf.bearerClient
}

mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader)
ft := strconv.FormatFloat(pf.fetchTimeout.Seconds(), 'f', -1, 64)
mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader, ft)
timer.ObserveDuration()
if err != nil {
pf.log.WithError(err).Warnf("fetching Prometheus metrics: %s (%s)", t.URL.String(), t.Object.Name)
Expand Down
50 changes: 47 additions & 3 deletions internal/integration/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package integration

import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync/atomic"
Expand All @@ -24,17 +27,58 @@ import (
const (
fetchDuration = 1 * time.Millisecond
fetchTimeout = time.Second * 5
testAccept = "this-is-a-test"
workerThreads = 4
queueLength = 100
)

type mockClient struct {
recordedHeader http.Header
}

func (m *mockClient) Do(req *http.Request) (*http.Response, error) {
m.recordedHeader = req.Header
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(""))),
}, nil
}

func TestXHeader(t *testing.T) {
mClient := mockClient{}
fetcher := NewFetcher(fetchDuration, fetchTimeout, "this-is-a-test", workerThreads, "", "", true, queueLength)
fetcher.(*prometheusFetcher).httpClient = &mClient

pairsCh := fetcher.Fetch([]endpoints.Target{
{
URL: url.URL{Scheme: "http", Path: "hello/metrics"},
},
})

select {
case <-pairsCh:
case <-time.After(fetchTimeout):
t.Fatal("can't fetch data")
}

accept := mClient.recordedHeader.Get(prometheus.AcceptHeader)
if accept != testAccept {
t.Errorf("Expected Accept header %q, got %q", testAccept, accept)
}

xPrometheus := mClient.recordedHeader.Get(prometheus.XPrometheusScrapeTimeoutHeader)
if xPrometheus != "5" {
t.Errorf("Expected xPrometheus header %q, got %q", "5", xPrometheus)
}
}

func TestFetcher(t *testing.T) {
t.Parallel()

// Given a fetcher
fetcher := NewFetcher(fetchDuration, fetchTimeout, "", workerThreads, "", "", true, queueLength)
var invokedURL string
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
invokedURL = url
return prometheus.MetricFamiliesByName{
"some-name": dto.MetricFamily{},
Expand Down Expand Up @@ -73,7 +117,7 @@ func TestFetcher_Error(t *testing.T) {

// That fails retrieving data from one of the metrics endpoint
invokedURLs := make([]string, 0)
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
if strings.Contains(url, "fail") {
return nil, errors.New("catapun")
}
Expand Down Expand Up @@ -125,7 +169,7 @@ func TestFetcher_ConcurrencyLimit(t *testing.T) {
// Given a Fetcher
fetcher := NewFetcher(time.Millisecond, fetchTimeout, "", workerThreads, "", "", true, queueLength)

fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
defer atomic.AddInt32(&parallelTasks, -1)
atomic.AddInt32(&parallelTasks, 1)
reportedParallel <- atomic.LoadInt32(&parallelTasks)
Expand Down
12 changes: 10 additions & 2 deletions internal/pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,23 @@ func ResetTargetSize() {
targetSize.Reset()
}

const (
// XPrometheusScrapeTimeoutHeader included in all requests. It informs exporters about its timeout.
XPrometheusScrapeTimeoutHeader = "X-Prometheus-Scrape-Timeout-Seconds"
// AcceptHeader included in all requests
AcceptHeader = "Accept"
)

// Get scrapes the given URL and decodes the retrieved payload.
func Get(client HTTPDoer, url string, acceptHeader string) (MetricFamiliesByName, error) {
func Get(client HTTPDoer, url string, acceptHeader string, fetchTimeout string) (MetricFamiliesByName, error) {
mfs := MetricFamiliesByName{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return mfs, err
}

req.Header.Add("Accept", acceptHeader)
req.Header.Add(AcceptHeader, acceptHeader)
req.Header.Add(XPrometheusScrapeTimeoutHeader, fetchTimeout)

resp, err := client.Do(req)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions internal/pkg/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@ import (
const testHeader = "application/openmetrics-text"

func TestGetHeader(t *testing.T) {
fetchTimeout := "15"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
accept := r.Header.Get("Accept")
accept := r.Header.Get(prometheus.AcceptHeader)
if accept != testHeader {
t.Errorf("Expected Accept header %s, got %q", testHeader, accept)
}

xPrometheus := r.Header.Get(prometheus.XPrometheusScrapeTimeoutHeader)
if xPrometheus != fetchTimeout {
t.Errorf("Expected xPrometheus header %s, got %q", xPrometheus, fetchTimeout)
}

_, _ = w.Write([]byte("metric_a 1\nmetric_b 2\n"))
}))
defer ts.Close()

expected := []string{"metric_a", "metric_b"}
mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader)
mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader, fetchTimeout)
actual := []string{}
for k := range mfs {
actual = append(actual, k)
Expand Down

0 comments on commit 4f98d24

Please sign in to comment.