Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote write v2 by converting request #6330

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [FEATURE] Support Prometheus remote write 2.0. #6330
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2669,6 +2669,11 @@ ha_tracker:
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]

# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
# request.
# CLI flag: -distributor.remote-write2-enabled
[remote_write2_enabled: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Currently experimental features are:
- Distributor:
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
- Blocks storage bucket index
Expand Down
76 changes: 76 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
Expand Down Expand Up @@ -334,3 +335,78 @@ func CreateBlock(

return id, nil
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()

lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
for _, lbl := range additionalLabels {
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}
40 changes: 40 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"
Expand Down Expand Up @@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
var metricName string
attributes := make(map[string]any)
Expand Down Expand Up @@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

// Metadata runs a metadata query
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
Loading
Loading