Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove remote read proto definitions from Mimir #8424

Merged
merged 10 commits into from
Jun 20, 2024
9 changes: 9 additions & 0 deletions development/mimir-read-write-mode/config/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Configure Prometheus to read from Mimir, so that we can test Mimir remote read endpoint
# sending queries from Prometheus.
remote_read:
- name: Mimir
url: http://mimir-read-1:8080/prometheus/api/v1/read
remote_timeout: 10s
read_recent: true
headers:
X-Scope-OrgID: anonymous
16 changes: 16 additions & 0 deletions development/mimir-read-write-mode/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ std.manifestYamlDoc({
self.grafana +
self.grafana_agent +
self.memcached +
self.prometheus +
{},

write:: {
Expand Down Expand Up @@ -119,6 +120,21 @@ std.manifestYamlDoc({
},
},

prometheus:: {
prometheus: {
image: 'prom/prometheus:v2.53.0',
command: [
'--config.file=/etc/prometheus/prometheus.yaml',
'--enable-feature=exemplar-storage',
'--enable-feature=native-histograms',
],
volumes: [
'./config:/etc/prometheus',
],
ports: ['9090:9090'],
},
},

// This function builds docker-compose declaration for Mimir service.
local mimirService(serviceOptions) = {
local defaultOptions = {
Expand Down
10 changes: 10 additions & 0 deletions development/mimir-read-write-mode/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,14 @@
- "8080:8080"
"volumes":
- "../common/config:/etc/nginx/templates"
"prometheus":
"command":
- "--config.file=/etc/prometheus/prometheus.yaml"
- "--enable-feature=exemplar-storage"
- "--enable-feature=native-histograms"
"image": "prom/prometheus:v2.53.0"
"ports":
- "9090:9090"
"volumes":
- "./config:/etc/prometheus"
"version": "3.4"
83 changes: 62 additions & 21 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package integration
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -92,8 +93,8 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
// Push some series to Mimir.
series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series1, expectedVector1, _ := generateFloatSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, expectedVector2, _ := generateFloatSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
series1, expectedVector1, _ := generateFloatSeries("series_1", series1Timestamp, prompb.Label{Name: "label_1", Value: "label_1"})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: this change is not a fix. It just makes it easier to look at the diff in case the assertion fails, otherwise it could be misleading seeing series_1 both as metric name, and as additional key-value label pair.

series2, expectedVector2, _ := generateFloatSeries("series_2", series2Timestamp, prompb.Label{Name: "label_2", Value: "label_2"})

c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)
Expand All @@ -114,7 +115,7 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
// Push another series to further compact another block and delete the first block
// due to expired retention.
series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
series3, expectedVector3, _ := generateFloatSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
series3, expectedVector3, _ := generateFloatSeries("series_3", series3Timestamp, prompb.Label{Name: "label_3", Value: "label_3"})

res, err = c.Push(series3)
require.NoError(t, err)
Expand All @@ -139,19 +140,41 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(compactor))

checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 1, instantQueryTest{
expr: "series_1",
time: series1Timestamp,
expectedVector: expectedVector1,
}, instantQueryTest{
expr: "series_2",
time: series2Timestamp,
expectedVector: expectedVector2,
}, instantQueryTest{
expr: "series_3",
time: series3Timestamp,
expectedVector: expectedVector3,
})
checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 1,
[]instantQueryTest{
{
expr: "series_1",
time: series1Timestamp,
expectedVector: expectedVector1,
}, {
expr: "series_2",
time: series2Timestamp,
expectedVector: expectedVector2,
}, {
expr: "series_3",
time: series3Timestamp,
expectedVector: expectedVector3,
},
},
[]remoteReadRequestTest{
{
metricName: "series_1",
startTime: series1Timestamp.Add(-time.Minute),
endTime: series1Timestamp.Add(time.Minute),
expectedTimeseries: vectorToPrompbTimeseries(expectedVector1),
}, {
metricName: "series_2",
startTime: series2Timestamp.Add(-time.Minute),
endTime: series2Timestamp.Add(time.Minute),
expectedTimeseries: vectorToPrompbTimeseries(expectedVector2),
}, {
metricName: "series_3",
startTime: series3Timestamp.Add(-time.Minute),
endTime: series3Timestamp.Add(time.Minute),
expectedTimeseries: vectorToPrompbTimeseries(expectedVector3),
},
},
)
}

// Check for issues like https://github.com/cortexproject/cortex/issues/2356
Expand Down Expand Up @@ -195,11 +218,11 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 3, instantQueryTest{
checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 3, []instantQueryTest{{
time: now,
expr: "series_1",
expectedVector: expectedVector,
})
}}, nil)
}

func checkQueries(
Expand All @@ -210,7 +233,8 @@ func checkQueries(
oldFlagsMapper e2emimir.FlagMapper,
s *e2e.Scenario,
numIngesters int,
instantQueries ...instantQueryTest,
instantQueries []instantQueryTest,
remoteReadRequests []remoteReadRequestTest,
) {
cases := map[string]struct {
queryFrontendOptions []e2emimir.Option
Expand Down Expand Up @@ -272,11 +296,21 @@ func checkQueries(
require.NoError(t, err)

for _, query := range instantQueries {
t.Run(fmt.Sprintf("%s: %s", endpoint, query.expr), func(t *testing.T) {
t.Run(fmt.Sprintf("%s: instant query: %s", endpoint, query.expr), func(t *testing.T) {
result, err := c.Query(query.expr, query.time)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, query.expectedVector, result.(model.Vector))
require.Equal(t, query.expectedVector, result.(model.Vector))
})
}

for _, req := range remoteReadRequests {
t.Run(fmt.Sprintf("%s: remote read: %s", endpoint, req.metricName), func(t *testing.T) {
httpRes, result, _, err := c.RemoteRead(req.metricName, req.startTime, req.endTime)
require.NoError(t, err)
require.Equal(t, http.StatusOK, httpRes.StatusCode)
require.NotNil(t, result)
require.Equal(t, req.expectedTimeseries, result.Timeseries)
})
}
}
Expand All @@ -290,6 +324,13 @@ type instantQueryTest struct {
expectedVector model.Vector
}

type remoteReadRequestTest struct {
metricName string
startTime time.Time
endTime time.Time
expectedTimeseries []*prompb.TimeSeries
}

type testingLogger interface{ Logf(string, ...interface{}) }

func previousImageVersionOverrides(t *testing.T) map[string]e2emimir.FlagMapper {
Expand Down
16 changes: 3 additions & 13 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func testMetadataQueriesWithBlocksStorage(
require.NoError(t, err)
if st.ok {
require.Equal(t, 1, len(seriesRes), st)
require.Equal(t, model.LabelSet(prompbLabelsToModelMetric(st.resp)), seriesRes[0], st)
require.Equal(t, model.LabelSet(prompbLabelsToMetric(st.resp)), seriesRes[0], st)
} else {
require.Equal(t, 0, len(seriesRes), st)
}
Expand Down Expand Up @@ -1025,7 +1025,7 @@ func TestHashCollisionHandling(t *testing.T) {
},
})
expectedVector = append(expectedVector, &model.Sample{
Metric: prompbLabelsToModelMetric(metric1),
Metric: prompbLabelsToMetric(metric1),
Value: model.SampleValue(float64(0)),
Timestamp: model.Time(tsMillis),
})
Expand All @@ -1036,7 +1036,7 @@ func TestHashCollisionHandling(t *testing.T) {
},
})
expectedVector = append(expectedVector, &model.Sample{
Metric: prompbLabelsToModelMetric(metric2),
Metric: prompbLabelsToMetric(metric2),
Value: model.SampleValue(float64(1)),
Timestamp: model.Time(tsMillis),
})
Expand Down Expand Up @@ -1070,13 +1070,3 @@ func getMetricName(lbls []prompb.Label) string {

panic(fmt.Sprintf("series %v has no metric name", lbls))
}

func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: moved to integration/util.go, close to the new utilities.

metric := model.Metric{}

for _, l := range pbLabels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

return metric
}
53 changes: 53 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"time"

"github.com/grafana/e2e"
Expand Down Expand Up @@ -258,3 +260,54 @@ func GenerateNHistogramSeries(nSeries, nExemplars int, name func() string, ts ti
}
return
}

func prompbLabelsToMetric(pbLabels []prompb.Label) model.Metric {
metric := make(model.Metric, len(pbLabels))

for _, l := range pbLabels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

return metric
}

func metricToPrompbLabels(metric model.Metric) []prompb.Label {
lbls := make([]prompb.Label, 0, len(metric))

for name, value := range metric {
lbls = append(lbls, prompb.Label{
Name: string(name),
Value: string(value),
})
}

// Sort labels because they're expected to be sorted by contract.
slices.SortFunc(lbls, func(a, b prompb.Label) int {
cmp := strings.Compare(a.Name, b.Name)
if cmp != 0 {
return cmp
}

return strings.Compare(a.Value, b.Value)
})

return lbls
}

func vectorToPrompbTimeseries(vector model.Vector) []*prompb.TimeSeries {
res := make([]*prompb.TimeSeries, 0, len(vector))

for _, sample := range vector {
res = append(res, &prompb.TimeSeries{
Labels: metricToPrompbLabels(sample.Metric),
Samples: []prompb.Sample{
{
Value: float64(sample.Value),
Timestamp: int64(sample.Timestamp),
},
},
})
}

return res
}
38 changes: 0 additions & 38 deletions pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -88,43 +87,6 @@ func TestLabelNamesRequest(t *testing.T) {
assert.Equal(t, matchers, actualMatchers)
}

// This test checks that we ignore remote read hints when unmarshalling a
// remote read request. This is because the query frontend does the same
// and we want them to be in sync.
func TestRemoteReadRequestIgnoresHints(t *testing.T) {
promRemoteReadRequest := &prompb.ReadRequest{
Queries: []*prompb.Query{
{
Hints: &prompb.ReadHints{
StartMs: 1,
EndMs: 2,
StepMs: 3,
},
},
},
}
data, err := promRemoteReadRequest.Marshal()
require.NoError(t, err)

remoteReadRequest := &ReadRequest{}
err = remoteReadRequest.Unmarshal(data)
require.NoError(t, err)

data2, err := remoteReadRequest.Marshal()
require.NoError(t, err)

restored := &prompb.ReadRequest{}
err = restored.Unmarshal(data2)
require.NoError(t, err)

require.Equal(t, len(promRemoteReadRequest.Queries), len(restored.Queries))
for i := range promRemoteReadRequest.Queries {
require.Nil(t, restored.Queries[i].Hints)
promRemoteReadRequest.Queries[i].Hints = nil
require.Equal(t, promRemoteReadRequest.Queries[i], restored.Queries[i])
}
}

// The main usecase for `LabelsToKeyString` is to generate hashKeys
// for maps. We are benchmarking that here.
func BenchmarkSeriesMap(b *testing.B) {
Expand Down
Loading
Loading