diff --git a/CHANGELOG.md b/CHANGELOG.md index b2622a3b73d..85cde8a4882 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [CHANGE] Store-gateway: enabled attributes in-memory cache by default. New default configuration is `-blocks-storage.bucket-store.chunks-cache.attributes-in-memory-max-items=50000`. #1727 * [CHANGE] Compactor: Removed the metric `cortex_compactor_garbage_collected_blocks_total` since it duplicates `cortex_compactor_blocks_marked_for_deletion_total`. #1728 * [CHANGE] All: Logs that used the`org_id` label now use `user` label. #1634 #1758 +* [FEATURE] Querier: Added support for [streaming remote read](https://prometheus.io/blog/2019/10/10/remote-read-meets-streaming/). Should be noted that benefits of chunking the response are partial here, since in a typical `query-frontend` setup responses will be buffered until they've been completed. #1735 * [FEATURE] Ruler: Allow setting `evaluation_delay` for each rule group via rules group configuration file. #1474 * [FEATURE] Ruler: Added support for expression remote evaluation. #1536 * The following CLI flags (and their respective YAML config options) have been added: diff --git a/integration/querier_remote_read_test.go b/integration/querier_remote_read_test.go index 4486d50230b..467e1f13567 100644 --- a/integration/querier_remote_read_test.go +++ b/integration/querier_remote_read_test.go @@ -10,7 +10,9 @@ package integration import ( "bytes" "context" + "io" "io/ioutil" + "math/rand" "net/http" "testing" "time" @@ -82,7 +84,7 @@ func TestQuerierRemoteRead(t *testing.T) { req := &prompb.ReadRequest{ Queries: []*prompb.Query{q}, - AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}, } data, err := proto.Marshal(req) @@ -125,3 +127,119 @@ func TestQuerierRemoteRead(t *testing.T) { require.Equal(t, int64(expectedVectors[0].Timestamp), resp.Results[0].Timeseries[0].Samples[0].Timestamp) require.Equal(t, float64(expectedVectors[0].Value), resp.Results[0].Timeseries[0].Samples[0].Value) } + +func TestQuerierStreamingRemoteRead(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ingestion-rate-limit": "1048576", + "-distributor.ingestion-burst-size": "1048576", + }) + + // Start dependencies. + minio := e2edb.NewMinio(9000, blocksBucketName) + + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(minio, consul)) + + // Start Mimir components for the write path. + distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) + ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + // The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total")) + + querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), BlocksStorageFlags()) + require.NoError(t, s.StartAndWaitReady(querier)) + + // Wait until the querier has updated the ring. + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push a series to Mimir. + now := time.Now() + + c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + // Generate the series + startMs := now.Add(-time.Minute).Unix() * 1000 + endMs := now.Add(time.Minute).Unix() * 1000 + + var samples []prompb.Sample + for i := startMs; i < endMs; i++ { + samples = append(samples, prompb.Sample{ + Value: rand.Float64(), + Timestamp: i, + }) + } + + var series []prompb.TimeSeries + series = append(series, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: labels.MetricName, Value: "series_1"}, + }, + Samples: samples, + }) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "series_1") + require.NoError(t, err) + + q, err := remote.ToQuery(startMs, endMs, []*labels.Matcher{matcher}, &storage.SelectHints{ + Step: 1, + Start: startMs, + End: endMs, + }) + require.NoError(t, err) + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{q}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + } + + data, err := proto.Marshal(req) + require.NoError(t, err) + compressed := snappy.Encode(nil, data) + + // Call the remote read API endpoint with a timeout. + httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+querier.HTTPEndpoint()+"/prometheus/api/v1/read", bytes.NewReader(compressed)) + require.NoError(t, err) + httpReq.Header.Set("X-Scope-OrgID", "user-1") + httpReq.Header.Set("User-Agent", "Prometheus/1.8.2") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + httpResp, err := http.DefaultClient.Do(httpReq) + require.NoError(t, err) + require.Equal(t, http.StatusOK, httpResp.StatusCode) + + // Fetch streaming response + stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, nil) + + results := []prompb.ChunkedReadResponse{} + for { + var res prompb.ChunkedReadResponse + err := stream.NextProto(&res) + if err == io.EOF { + break + } + require.NoError(t, err) + results = append(results, res) + } + + // Validate the returned remote read data + require.Len(t, results, 1) + require.Len(t, results[0].ChunkedSeries, 1) + require.Len(t, results[0].ChunkedSeries[0].Labels, 1) + require.Equal(t, "series_1", results[0].ChunkedSeries[0].Labels[0].GetValue()) + require.True(t, len(results[0].ChunkedSeries[0].Chunks) > 0) +} diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go index 1f3503005b2..f4b0f7dc2c1 100644 --- a/pkg/ingester/client/ingester.pb.go +++ b/pkg/ingester/client/ingester.pb.go @@ -64,6 +64,48 @@ func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_60f6df4f3586b478, []int{0} } +type ReadRequest_ResponseType int32 + +const ( + SAMPLES ReadRequest_ResponseType = 0 + STREAMED_XOR_CHUNKS ReadRequest_ResponseType = 1 +) + +var ReadRequest_ResponseType_name = map[int32]string{ + 0: "SAMPLES", + 1: "STREAMED_XOR_CHUNKS", +} + +var ReadRequest_ResponseType_value = map[string]int32{ + "SAMPLES": 0, + "STREAMED_XOR_CHUNKS": 1, +} + +func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_60f6df4f3586b478, []int{6, 0} +} + +type StreamChunk_Encoding int32 + +const ( + UNKNOWN StreamChunk_Encoding = 0 + XOR StreamChunk_Encoding = 1 +) + +var StreamChunk_Encoding_name = map[int32]string{ + 0: "UNKNOWN", + 1: "XOR", +} + +var StreamChunk_Encoding_value = map[string]int32{ + "UNKNOWN": 0, + "XOR": 1, +} + +func (StreamChunk_Encoding) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_60f6df4f3586b478, []int{10, 0} +} + type LabelNamesAndValuesRequest struct { Matchers []*LabelMatcher `protobuf:"bytes,1,rep,name=matchers,proto3" json:"matchers,omitempty"` } @@ -347,7 +389,8 @@ func (m *LabelValueSeriesCount) GetLabelValueSeries() map[string]uint64 { } type ReadRequest struct { - Queries []*QueryRequest `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` + Queries []*QueryRequest `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` + AcceptedResponseTypes []ReadRequest_ResponseType `protobuf:"varint,2,rep,packed,name=accepted_response_types,json=acceptedResponseTypes,proto3,enum=cortex.ReadRequest_ResponseType" json:"accepted_response_types,omitempty"` } func (m *ReadRequest) Reset() { *m = ReadRequest{} } @@ -389,6 +432,13 @@ func (m *ReadRequest) GetQueries() []*QueryRequest { return nil } +func (m *ReadRequest) GetAcceptedResponseTypes() []ReadRequest_ResponseType { + if m != nil { + return m.AcceptedResponseTypes + } + return nil +} + type ReadResponse struct { Results []*QueryResponse `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` } @@ -432,6 +482,168 @@ func (m *ReadResponse) GetResults() []*QueryResponse { return nil } +type StreamReadResponse struct { + ChunkedSeries []*StreamChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"` + QueryIndex int64 `protobuf:"varint,2,opt,name=query_index,json=queryIndex,proto3" json:"query_index,omitempty"` +} + +func (m *StreamReadResponse) Reset() { *m = StreamReadResponse{} } +func (*StreamReadResponse) ProtoMessage() {} +func (*StreamReadResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_60f6df4f3586b478, []int{8} +} +func (m *StreamReadResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamReadResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamReadResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamReadResponse.Merge(m, src) +} +func (m *StreamReadResponse) XXX_Size() int { + return m.Size() +} +func (m *StreamReadResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StreamReadResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamReadResponse proto.InternalMessageInfo + +func (m *StreamReadResponse) GetChunkedSeries() []*StreamChunkedSeries { + if m != nil { + return m.ChunkedSeries + } + return nil +} + +func (m *StreamReadResponse) GetQueryIndex() int64 { + if m != nil { + return m.QueryIndex + } + return 0 +} + +type StreamChunkedSeries struct { + Labels []github_com_grafana_mimir_pkg_mimirpb.LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/grafana/mimir/pkg/mimirpb.LabelAdapter" json:"labels"` + Chunks []StreamChunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` +} + +func (m *StreamChunkedSeries) Reset() { *m = StreamChunkedSeries{} } +func (*StreamChunkedSeries) ProtoMessage() {} +func (*StreamChunkedSeries) Descriptor() ([]byte, []int) { + return fileDescriptor_60f6df4f3586b478, []int{9} +} +func (m *StreamChunkedSeries) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamChunkedSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamChunkedSeries.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamChunkedSeries) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamChunkedSeries.Merge(m, src) +} +func (m *StreamChunkedSeries) XXX_Size() int { + return m.Size() +} +func (m *StreamChunkedSeries) XXX_DiscardUnknown() { + xxx_messageInfo_StreamChunkedSeries.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamChunkedSeries proto.InternalMessageInfo + +func (m *StreamChunkedSeries) GetChunks() []StreamChunk { + if m != nil { + return m.Chunks + } + return nil +} + +type StreamChunk struct { + MinTimeMs int64 `protobuf:"varint,1,opt,name=min_time_ms,json=minTimeMs,proto3" json:"min_time_ms,omitempty"` + MaxTimeMs int64 `protobuf:"varint,2,opt,name=max_time_ms,json=maxTimeMs,proto3" json:"max_time_ms,omitempty"` + Type StreamChunk_Encoding `protobuf:"varint,3,opt,name=type,proto3,enum=cortex.StreamChunk_Encoding" json:"type,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *StreamChunk) Reset() { *m = StreamChunk{} } +func (*StreamChunk) ProtoMessage() {} +func (*StreamChunk) Descriptor() ([]byte, []int) { + return fileDescriptor_60f6df4f3586b478, []int{10} +} +func (m *StreamChunk) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamChunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamChunk.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamChunk) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamChunk.Merge(m, src) +} +func (m *StreamChunk) XXX_Size() int { + return m.Size() +} +func (m *StreamChunk) XXX_DiscardUnknown() { + xxx_messageInfo_StreamChunk.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamChunk proto.InternalMessageInfo + +func (m *StreamChunk) GetMinTimeMs() int64 { + if m != nil { + return m.MinTimeMs + } + return 0 +} + +func (m *StreamChunk) GetMaxTimeMs() int64 { + if m != nil { + return m.MaxTimeMs + } + return 0 +} + +func (m *StreamChunk) GetType() StreamChunk_Encoding { + if m != nil { + return m.Type + } + return UNKNOWN +} + +func (m *StreamChunk) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + type QueryRequest struct { StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` @@ -441,7 +653,7 @@ type QueryRequest struct { func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{8} + return fileDescriptor_60f6df4f3586b478, []int{11} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -500,7 +712,7 @@ type ExemplarQueryRequest struct { func (m *ExemplarQueryRequest) Reset() { *m = ExemplarQueryRequest{} } func (*ExemplarQueryRequest) ProtoMessage() {} func (*ExemplarQueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{9} + return fileDescriptor_60f6df4f3586b478, []int{12} } func (m *ExemplarQueryRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -557,7 +769,7 @@ type QueryResponse struct { func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{10} + return fileDescriptor_60f6df4f3586b478, []int{13} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -602,7 +814,7 @@ type QueryStreamResponse struct { func (m *QueryStreamResponse) Reset() { *m = QueryStreamResponse{} } func (*QueryStreamResponse) ProtoMessage() {} func (*QueryStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{11} + return fileDescriptor_60f6df4f3586b478, []int{14} } func (m *QueryStreamResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -652,7 +864,7 @@ type ExemplarQueryResponse struct { func (m *ExemplarQueryResponse) Reset() { *m = ExemplarQueryResponse{} } func (*ExemplarQueryResponse) ProtoMessage() {} func (*ExemplarQueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{12} + return fileDescriptor_60f6df4f3586b478, []int{15} } func (m *ExemplarQueryResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -698,7 +910,7 @@ type LabelValuesRequest struct { func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{13} + return fileDescriptor_60f6df4f3586b478, []int{16} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -762,7 +974,7 @@ type LabelValuesResponse struct { func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{14} + return fileDescriptor_60f6df4f3586b478, []int{17} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -807,7 +1019,7 @@ type LabelNamesRequest struct { func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{15} + return fileDescriptor_60f6df4f3586b478, []int{18} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -864,7 +1076,7 @@ type LabelNamesResponse struct { func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{16} + return fileDescriptor_60f6df4f3586b478, []int{19} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -906,7 +1118,7 @@ type UserStatsRequest struct { func (m *UserStatsRequest) Reset() { *m = UserStatsRequest{} } func (*UserStatsRequest) ProtoMessage() {} func (*UserStatsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{17} + return fileDescriptor_60f6df4f3586b478, []int{20} } func (m *UserStatsRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -945,7 +1157,7 @@ type UserStatsResponse struct { func (m *UserStatsResponse) Reset() { *m = UserStatsResponse{} } func (*UserStatsResponse) ProtoMessage() {} func (*UserStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{18} + return fileDescriptor_60f6df4f3586b478, []int{21} } func (m *UserStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1010,7 +1222,7 @@ type UserIDStatsResponse struct { func (m *UserIDStatsResponse) Reset() { *m = UserIDStatsResponse{} } func (*UserIDStatsResponse) ProtoMessage() {} func (*UserIDStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{19} + return fileDescriptor_60f6df4f3586b478, []int{22} } func (m *UserIDStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1060,7 +1272,7 @@ type UsersStatsResponse struct { func (m *UsersStatsResponse) Reset() { *m = UsersStatsResponse{} } func (*UsersStatsResponse) ProtoMessage() {} func (*UsersStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{20} + return fileDescriptor_60f6df4f3586b478, []int{23} } func (m *UsersStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1105,7 +1317,7 @@ type MetricsForLabelMatchersRequest struct { func (m *MetricsForLabelMatchersRequest) Reset() { *m = MetricsForLabelMatchersRequest{} } func (*MetricsForLabelMatchersRequest) ProtoMessage() {} func (*MetricsForLabelMatchersRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{21} + return fileDescriptor_60f6df4f3586b478, []int{24} } func (m *MetricsForLabelMatchersRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1162,7 +1374,7 @@ type MetricsForLabelMatchersResponse struct { func (m *MetricsForLabelMatchersResponse) Reset() { *m = MetricsForLabelMatchersResponse{} } func (*MetricsForLabelMatchersResponse) ProtoMessage() {} func (*MetricsForLabelMatchersResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{22} + return fileDescriptor_60f6df4f3586b478, []int{25} } func (m *MetricsForLabelMatchersResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1204,7 +1416,7 @@ type MetricsMetadataRequest struct { func (m *MetricsMetadataRequest) Reset() { *m = MetricsMetadataRequest{} } func (*MetricsMetadataRequest) ProtoMessage() {} func (*MetricsMetadataRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{23} + return fileDescriptor_60f6df4f3586b478, []int{26} } func (m *MetricsMetadataRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1240,7 +1452,7 @@ type MetricsMetadataResponse struct { func (m *MetricsMetadataResponse) Reset() { *m = MetricsMetadataResponse{} } func (*MetricsMetadataResponse) ProtoMessage() {} func (*MetricsMetadataResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{24} + return fileDescriptor_60f6df4f3586b478, []int{27} } func (m *MetricsMetadataResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1286,7 +1498,7 @@ type TimeSeriesChunk struct { func (m *TimeSeriesChunk) Reset() { *m = TimeSeriesChunk{} } func (*TimeSeriesChunk) ProtoMessage() {} func (*TimeSeriesChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{25} + return fileDescriptor_60f6df4f3586b478, []int{28} } func (m *TimeSeriesChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1346,7 +1558,7 @@ type Chunk struct { func (m *Chunk) Reset() { *m = Chunk{} } func (*Chunk) ProtoMessage() {} func (*Chunk) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{26} + return fileDescriptor_60f6df4f3586b478, []int{29} } func (m *Chunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1410,7 +1622,7 @@ type LabelMatchers struct { func (m *LabelMatchers) Reset() { *m = LabelMatchers{} } func (*LabelMatchers) ProtoMessage() {} func (*LabelMatchers) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{27} + return fileDescriptor_60f6df4f3586b478, []int{30} } func (m *LabelMatchers) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1455,7 +1667,7 @@ type LabelMatcher struct { func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{28} + return fileDescriptor_60f6df4f3586b478, []int{31} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1515,7 +1727,7 @@ type TimeSeriesFile struct { func (m *TimeSeriesFile) Reset() { *m = TimeSeriesFile{} } func (*TimeSeriesFile) ProtoMessage() {} func (*TimeSeriesFile) Descriptor() ([]byte, []int) { - return fileDescriptor_60f6df4f3586b478, []int{29} + return fileDescriptor_60f6df4f3586b478, []int{32} } func (m *TimeSeriesFile) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1574,6 +1786,8 @@ func (m *TimeSeriesFile) GetData() []byte { func init() { proto.RegisterEnum("cortex.MatchType", MatchType_name, MatchType_value) + proto.RegisterEnum("cortex.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value) + proto.RegisterEnum("cortex.StreamChunk_Encoding", StreamChunk_Encoding_name, StreamChunk_Encoding_value) proto.RegisterType((*LabelNamesAndValuesRequest)(nil), "cortex.LabelNamesAndValuesRequest") proto.RegisterType((*LabelNamesAndValuesResponse)(nil), "cortex.LabelNamesAndValuesResponse") proto.RegisterType((*LabelValues)(nil), "cortex.LabelValues") @@ -1583,6 +1797,9 @@ func init() { proto.RegisterMapType((map[string]uint64)(nil), "cortex.LabelValueSeriesCount.LabelValueSeriesEntry") proto.RegisterType((*ReadRequest)(nil), "cortex.ReadRequest") proto.RegisterType((*ReadResponse)(nil), "cortex.ReadResponse") + proto.RegisterType((*StreamReadResponse)(nil), "cortex.StreamReadResponse") + proto.RegisterType((*StreamChunkedSeries)(nil), "cortex.StreamChunkedSeries") + proto.RegisterType((*StreamChunk)(nil), "cortex.StreamChunk") proto.RegisterType((*QueryRequest)(nil), "cortex.QueryRequest") proto.RegisterType((*ExemplarQueryRequest)(nil), "cortex.ExemplarQueryRequest") proto.RegisterType((*QueryResponse)(nil), "cortex.QueryResponse") @@ -1610,96 +1827,110 @@ func init() { func init() { proto.RegisterFile("ingester.proto", fileDescriptor_60f6df4f3586b478) } var fileDescriptor_60f6df4f3586b478 = []byte{ - // 1416 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x58, 0xcd, 0x6f, 0xdb, 0xc6, - 0x12, 0xd7, 0x4a, 0xb2, 0x6c, 0x8d, 0x64, 0x45, 0x5e, 0xc7, 0xb6, 0xc2, 0x3c, 0xd3, 0x7e, 0x7c, - 0x48, 0x9e, 0xfa, 0x11, 0x39, 0x71, 0x5a, 0x20, 0x09, 0x5a, 0x04, 0xb6, 0xe3, 0x24, 0x6e, 0x62, - 0x3b, 0xa1, 0x9d, 0xb6, 0x28, 0x50, 0x08, 0x2b, 0x69, 0x6d, 0x13, 0x26, 0x29, 0x85, 0x5c, 0x16, - 0xf1, 0xad, 0x40, 0xef, 0x6d, 0xd1, 0x53, 0x4f, 0x05, 0x7a, 0xeb, 0xb9, 0x97, 0xde, 0x7a, 0xce, - 0x31, 0xc7, 0xa0, 0x87, 0xa0, 0x51, 0x2e, 0xed, 0x2d, 0x7f, 0x42, 0xc1, 0xdd, 0x25, 0x45, 0x52, - 0xf4, 0x47, 0x8a, 0x24, 0x27, 0x6b, 0x67, 0x7e, 0x33, 0xfb, 0xdb, 0x99, 0xd9, 0x99, 0xa5, 0xa1, - 0x62, 0xd8, 0xbb, 0xd4, 0x65, 0xd4, 0x69, 0xf4, 0x9c, 0x2e, 0xeb, 0xe2, 0x42, 0xbb, 0xeb, 0x30, - 0xfa, 0x48, 0xb9, 0xb0, 0x6b, 0xb0, 0x3d, 0xaf, 0xd5, 0x68, 0x77, 0xad, 0x85, 0xdd, 0xee, 0x6e, - 0x77, 0x81, 0xab, 0x5b, 0xde, 0x0e, 0x5f, 0xf1, 0x05, 0xff, 0x25, 0xcc, 0x94, 0x8b, 0x51, 0xb8, - 0x43, 0x76, 0x88, 0x4d, 0x16, 0x2c, 0xc3, 0x32, 0x9c, 0x85, 0xde, 0xfe, 0xae, 0xf8, 0xd5, 0x6b, - 0x89, 0xbf, 0xc2, 0x42, 0xdb, 0x00, 0xe5, 0x2e, 0x69, 0x51, 0x73, 0x83, 0x58, 0xd4, 0x5d, 0xb2, - 0x3b, 0x9f, 0x12, 0xd3, 0xa3, 0xae, 0x4e, 0x1f, 0x7a, 0xd4, 0x65, 0xf8, 0x22, 0x8c, 0x59, 0x84, - 0xb5, 0xf7, 0xa8, 0xe3, 0xd6, 0xd0, 0x7c, 0xae, 0x5e, 0x5a, 0x3c, 0xdd, 0x10, 0xcc, 0x1a, 0xdc, - 0x6a, 0x5d, 0x28, 0xf5, 0x10, 0xa5, 0xdd, 0x86, 0xb3, 0xa9, 0xfe, 0xdc, 0x5e, 0xd7, 0x76, 0x29, - 0x7e, 0x07, 0x46, 0x0c, 0x46, 0xad, 0xc0, 0xdb, 0x64, 0xcc, 0x9b, 0xc4, 0x0a, 0x84, 0x76, 0x03, - 0x4a, 0x11, 0x29, 0x9e, 0x05, 0x30, 0xfd, 0x65, 0xd3, 0x26, 0x16, 0xad, 0xa1, 0x79, 0x54, 0x2f, - 0xea, 0x45, 0x33, 0xd8, 0x0a, 0x4f, 0x43, 0xe1, 0x2b, 0x0e, 0xac, 0x65, 0xe7, 0x73, 0xf5, 0xa2, - 0x2e, 0x57, 0x9a, 0x03, 0xb3, 0x11, 0x2f, 0x2b, 0xc4, 0xe9, 0x18, 0x36, 0x31, 0x0d, 0x76, 0x10, - 0x1c, 0x71, 0x0e, 0x4a, 0x03, 0xbf, 0x82, 0x57, 0x51, 0x87, 0xd0, 0xb1, 0x1b, 0x8b, 0x41, 0xf6, - 0x44, 0x31, 0x78, 0x00, 0xea, 0x61, 0x7b, 0xca, 0x30, 0x5c, 0x8e, 0x87, 0x61, 0x76, 0x38, 0x0c, - 0x5b, 0xd4, 0x31, 0xa8, 0xbb, 0xd2, 0xf5, 0x6c, 0x16, 0x04, 0xe4, 0x19, 0x82, 0xa9, 0x54, 0xc0, - 0x71, 0xb1, 0x21, 0x80, 0x85, 0x9a, 0xc7, 0xa4, 0xe9, 0x72, 0x4b, 0x79, 0x96, 0xcb, 0x47, 0x6e, - 0x3d, 0x24, 0x5d, 0xb5, 0x99, 0x73, 0xa0, 0x57, 0xcd, 0x84, 0x58, 0x59, 0x19, 0xa6, 0xc6, 0xa1, - 0xb8, 0x0a, 0xb9, 0x7d, 0x7a, 0x20, 0x39, 0xf9, 0x3f, 0xf1, 0x69, 0x18, 0xe1, 0x3c, 0x6a, 0xd9, - 0x79, 0x54, 0xcf, 0xeb, 0x62, 0x71, 0x2d, 0x7b, 0x05, 0x69, 0x1f, 0x43, 0x49, 0xa7, 0xa4, 0x13, - 0x64, 0xa6, 0x01, 0xa3, 0x0f, 0x3d, 0xc1, 0x35, 0x51, 0x7b, 0xf7, 0x3d, 0xea, 0x04, 0x09, 0xd4, - 0x03, 0x90, 0x76, 0x1d, 0xca, 0xc2, 0x5c, 0x06, 0x79, 0x01, 0x46, 0x1d, 0xea, 0x7a, 0x26, 0x0b, - 0xec, 0xa7, 0x12, 0xf6, 0x02, 0xa7, 0x07, 0x28, 0xed, 0x47, 0x04, 0xe5, 0xa8, 0x6b, 0xfc, 0x3e, - 0x60, 0x97, 0x11, 0x87, 0x35, 0x99, 0x61, 0x51, 0x97, 0x11, 0xab, 0xd7, 0xe4, 0x39, 0x43, 0xf5, - 0x9c, 0x5e, 0xe5, 0x9a, 0xed, 0x40, 0xb1, 0xee, 0xe2, 0x3a, 0x54, 0xa9, 0xdd, 0x89, 0x63, 0xb3, - 0x1c, 0x5b, 0xa1, 0x76, 0x27, 0x8a, 0x8c, 0x96, 0x54, 0xee, 0x44, 0x25, 0xf5, 0x33, 0x82, 0xd3, - 0xab, 0x8f, 0xa8, 0xd5, 0x33, 0x89, 0xf3, 0x56, 0x28, 0x5e, 0x1a, 0xa2, 0x38, 0x95, 0x46, 0xd1, - 0x8d, 0x70, 0xbc, 0x03, 0xe3, 0xb1, 0xc0, 0xe2, 0x6b, 0x00, 0x7c, 0xa7, 0xb4, 0x1c, 0xf6, 0x5a, - 0x0d, 0x7f, 0x3b, 0x51, 0x2a, 0xcb, 0xf9, 0xc7, 0xcf, 0xe6, 0x32, 0x7a, 0x04, 0xad, 0xfd, 0x80, - 0x60, 0x92, 0x7b, 0xdb, 0x62, 0x0e, 0x25, 0x56, 0xe8, 0xf3, 0x3a, 0x94, 0xda, 0x7b, 0x9e, 0xbd, - 0x1f, 0x73, 0x3a, 0x13, 0x50, 0x1b, 0xb8, 0x5c, 0xf1, 0x41, 0xd2, 0x6f, 0xd4, 0x22, 0x41, 0x2a, - 0xfb, 0x4a, 0xa4, 0xb6, 0x60, 0x2a, 0x91, 0x84, 0xd7, 0x70, 0xd2, 0xdf, 0x11, 0xe0, 0x68, 0xfb, - 0x93, 0x89, 0x3d, 0xe6, 0x4e, 0xa7, 0xe7, 0x3d, 0xfb, 0x0a, 0x79, 0xcf, 0x1d, 0x9b, 0xf7, 0xfc, - 0x3c, 0x3a, 0x49, 0xde, 0xaf, 0xc0, 0x64, 0x8c, 0xbf, 0x8c, 0xc9, 0x7f, 0xa1, 0x1c, 0xe9, 0x3a, - 0x41, 0x67, 0x2d, 0x0d, 0x5a, 0x87, 0xab, 0xfd, 0x84, 0x60, 0x62, 0x30, 0x2d, 0xde, 0x6e, 0x49, - 0x9f, 0xe8, 0x68, 0x1f, 0xca, 0xd4, 0x48, 0x7e, 0xf2, 0x64, 0xc7, 0x8d, 0x0c, 0x0d, 0x43, 0xf5, - 0x81, 0x4b, 0x9d, 0x2d, 0x46, 0x58, 0x70, 0x2a, 0xed, 0x37, 0x04, 0x13, 0x11, 0xa1, 0x74, 0x75, - 0x2e, 0x98, 0xfc, 0x46, 0xd7, 0x6e, 0x3a, 0x84, 0x89, 0x4c, 0x23, 0x7d, 0x3c, 0x94, 0xea, 0x84, - 0x51, 0xbf, 0x18, 0x6c, 0xcf, 0x1a, 0x74, 0x6e, 0xbf, 0x71, 0x16, 0x6d, 0xcf, 0x12, 0x45, 0xe5, - 0x47, 0x8c, 0xf4, 0x8c, 0x66, 0xc2, 0x53, 0x8e, 0x7b, 0xaa, 0x92, 0x9e, 0xb1, 0x16, 0x73, 0xd6, - 0x80, 0x49, 0xc7, 0x33, 0x69, 0x12, 0x9e, 0xe7, 0xf0, 0x09, 0x5f, 0x15, 0xc3, 0x6b, 0x5f, 0xc2, - 0xa4, 0x4f, 0x7c, 0xed, 0x46, 0x9c, 0xfa, 0x0c, 0x8c, 0x7a, 0x2e, 0x75, 0x9a, 0x46, 0x47, 0x56, - 0x67, 0xc1, 0x5f, 0xae, 0x75, 0xf0, 0x05, 0xc8, 0x77, 0x08, 0x23, 0x9c, 0x66, 0x69, 0xf1, 0x4c, - 0x10, 0xe3, 0xa1, 0xc3, 0xeb, 0x1c, 0xa6, 0xdd, 0x02, 0xec, 0xab, 0xdc, 0xb8, 0xf7, 0x4b, 0x30, - 0xe2, 0xfa, 0x02, 0x79, 0x99, 0xce, 0x46, 0xbd, 0x24, 0x98, 0xe8, 0x02, 0xa9, 0xfd, 0x8a, 0x40, - 0x5d, 0xa7, 0xcc, 0x31, 0xda, 0xee, 0xcd, 0xae, 0x13, 0x4f, 0xe9, 0x1b, 0x2e, 0xad, 0x2b, 0x50, - 0x0e, 0x6a, 0xa6, 0xe9, 0x52, 0x76, 0x74, 0xc7, 0x2c, 0x05, 0xd0, 0x2d, 0xca, 0xb4, 0x3b, 0x30, - 0x77, 0x28, 0x67, 0x19, 0x8a, 0x3a, 0x14, 0x2c, 0x0e, 0x91, 0xb1, 0xa8, 0x0e, 0x1a, 0x8b, 0x30, - 0xd5, 0xa5, 0x5e, 0xab, 0xc1, 0xb4, 0x74, 0xb6, 0x4e, 0x19, 0xf1, 0xa3, 0x1b, 0x54, 0xdf, 0x26, - 0xcc, 0x0c, 0x69, 0xa4, 0xfb, 0x0f, 0x60, 0xcc, 0x92, 0x32, 0xb9, 0x41, 0x2d, 0xb9, 0x41, 0x68, - 0x13, 0x22, 0xb5, 0xbf, 0x11, 0x9c, 0x4a, 0x74, 0x5b, 0x3f, 0x5e, 0x3b, 0x4e, 0xd7, 0x6a, 0x06, - 0x6f, 0xd9, 0x41, 0x69, 0x54, 0x7c, 0xf9, 0x9a, 0x14, 0xaf, 0x75, 0xa2, 0xb5, 0x93, 0x8d, 0xd5, - 0xce, 0x0e, 0x14, 0xf8, 0x3d, 0x0a, 0x86, 0xce, 0xe4, 0x80, 0x0a, 0x0f, 0xce, 0x3d, 0x62, 0x38, - 0xcb, 0x57, 0xfd, 0x1e, 0xfa, 0xc7, 0xb3, 0xb9, 0x4b, 0x27, 0x79, 0xed, 0x0a, 0xbb, 0xa5, 0x0e, - 0xe9, 0x31, 0xea, 0xe8, 0xd2, 0x3b, 0x7e, 0x0f, 0x0a, 0x62, 0x28, 0xd4, 0xf2, 0x7c, 0x9f, 0xf1, - 0x20, 0x55, 0xd1, 0xb9, 0x21, 0x21, 0xda, 0x77, 0x08, 0x46, 0xc4, 0x09, 0xdf, 0x54, 0xfd, 0x28, - 0x30, 0x46, 0xed, 0x76, 0xb7, 0x63, 0xd8, 0xbb, 0xfc, 0xda, 0x8e, 0xe8, 0xe1, 0x1a, 0x63, 0x79, - 0x9d, 0xfc, 0xfb, 0x59, 0x96, 0x77, 0x66, 0x09, 0xc6, 0x63, 0xb5, 0xf2, 0x2f, 0x1e, 0xea, 0x4d, - 0x28, 0x47, 0x35, 0xf8, 0x1c, 0xe4, 0xd9, 0x41, 0x4f, 0xf4, 0x9f, 0xca, 0xe2, 0x44, 0x60, 0xcd, - 0xd5, 0xdb, 0x07, 0x3d, 0xaa, 0x73, 0xb5, 0xcf, 0x86, 0x0f, 0x24, 0x91, 0x36, 0xfe, 0x7b, 0xf0, - 0xa2, 0xcb, 0x71, 0xa1, 0x58, 0x68, 0xdf, 0x20, 0xa8, 0x0c, 0x2a, 0xe4, 0xa6, 0x61, 0xd2, 0xd7, - 0x51, 0x20, 0x0a, 0x8c, 0xed, 0x18, 0x26, 0xe5, 0x1c, 0xc4, 0x76, 0xe1, 0x3a, 0x2d, 0x52, 0xef, - 0x7e, 0x02, 0xc5, 0xf0, 0x08, 0xb8, 0x08, 0x23, 0xab, 0xf7, 0x1f, 0x2c, 0xdd, 0xad, 0x66, 0xf0, - 0x38, 0x14, 0x37, 0x36, 0xb7, 0x9b, 0x62, 0x89, 0xf0, 0x29, 0x28, 0xe9, 0xab, 0xb7, 0x56, 0x3f, - 0x6f, 0xae, 0x2f, 0x6d, 0xaf, 0xdc, 0xae, 0x66, 0x31, 0x86, 0x8a, 0x10, 0x6c, 0x6c, 0x4a, 0x59, - 0x6e, 0xf1, 0xdb, 0x51, 0x18, 0x0b, 0x38, 0xe2, 0xab, 0x90, 0xbf, 0xe7, 0xb9, 0x7b, 0x78, 0x7a, - 0x50, 0xa1, 0x9f, 0x39, 0x06, 0xa3, 0xf2, 0xc6, 0x29, 0x33, 0x43, 0x72, 0x71, 0xdf, 0xb4, 0x0c, - 0xbe, 0x01, 0xa5, 0xc8, 0xd3, 0x06, 0xa7, 0x3e, 0x6b, 0x95, 0xb3, 0x31, 0x69, 0xfc, 0x15, 0xa4, - 0x65, 0x2e, 0x22, 0xbc, 0x09, 0x15, 0xae, 0x0a, 0x5e, 0x24, 0x2e, 0xfe, 0x4f, 0x60, 0x92, 0xf6, - 0x52, 0x54, 0x66, 0x0f, 0xd1, 0x86, 0xb4, 0x6e, 0xc7, 0x3f, 0xb8, 0x94, 0xb4, 0x6f, 0xb3, 0x24, - 0xb9, 0x94, 0xc1, 0xaf, 0x65, 0xf0, 0x2a, 0xc0, 0x60, 0x6c, 0xe2, 0x33, 0x31, 0x70, 0x74, 0xd4, - 0x2b, 0x4a, 0x9a, 0x2a, 0x74, 0xb3, 0x0c, 0xc5, 0x70, 0x68, 0xe0, 0x5a, 0xca, 0x1c, 0x11, 0x4e, - 0x0e, 0x9f, 0x30, 0x5a, 0x06, 0xdf, 0x84, 0xf2, 0x92, 0x69, 0x9e, 0xc4, 0x8d, 0x12, 0xd5, 0xb8, - 0x49, 0x3f, 0x66, 0xd8, 0x40, 0x93, 0x7d, 0x1a, 0x9f, 0x0f, 0xef, 0xca, 0x91, 0xc3, 0x47, 0xf9, - 0xff, 0xb1, 0xb8, 0x70, 0xb7, 0x6d, 0x38, 0x95, 0x68, 0xd7, 0x58, 0x4d, 0x58, 0x27, 0x3a, 0xbc, - 0x32, 0x77, 0xa8, 0x3e, 0xf4, 0xda, 0x92, 0x0f, 0xb5, 0xf8, 0xb7, 0x39, 0xd6, 0x86, 0x93, 0x90, - 0xfc, 0x47, 0x80, 0xf2, 0xbf, 0x23, 0x31, 0x91, 0xaa, 0xdc, 0x87, 0xe9, 0xf4, 0x6f, 0x5f, 0x7c, - 0x2e, 0xa5, 0x66, 0x86, 0xbf, 0xc7, 0x95, 0xf3, 0xc7, 0xc1, 0x06, 0x9b, 0x2d, 0x7f, 0xf4, 0xe4, - 0xb9, 0x9a, 0x79, 0xfa, 0x5c, 0xcd, 0xbc, 0x7c, 0xae, 0xa2, 0xaf, 0xfb, 0x2a, 0xfa, 0xa5, 0xaf, - 0xa2, 0xc7, 0x7d, 0x15, 0x3d, 0xe9, 0xab, 0xe8, 0xcf, 0xbe, 0x8a, 0xfe, 0xea, 0xab, 0x99, 0x97, - 0x7d, 0x15, 0x7d, 0xff, 0x42, 0xcd, 0x3c, 0x79, 0xa1, 0x66, 0x9e, 0xbe, 0x50, 0x33, 0x5f, 0x14, - 0xda, 0xa6, 0x41, 0x6d, 0xd6, 0x2a, 0xf0, 0xff, 0x80, 0x5c, 0xfe, 0x27, 0x00, 0x00, 0xff, 0xff, - 0x5c, 0xeb, 0x47, 0xa9, 0x7c, 0x11, 0x00, 0x00, + // 1640 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x58, 0xcd, 0x6f, 0xdb, 0x46, + 0x16, 0xd7, 0x48, 0xb2, 0x6c, 0x3d, 0xd9, 0x8a, 0x3c, 0x8a, 0x6d, 0x85, 0x59, 0xd3, 0x5a, 0x2e, + 0x92, 0xd5, 0xee, 0x26, 0xf2, 0x47, 0xb2, 0x40, 0x12, 0x14, 0x08, 0x64, 0x5b, 0x89, 0x5d, 0x47, + 0x72, 0x42, 0xd9, 0x8d, 0x51, 0xa0, 0x20, 0x28, 0x69, 0x6c, 0x13, 0x16, 0x29, 0x85, 0xa4, 0x0a, + 0xfb, 0x56, 0xa0, 0xf7, 0xb6, 0xe8, 0xa9, 0xa7, 0x02, 0xbd, 0xf5, 0x58, 0x14, 0x28, 0x7a, 0xeb, + 0x39, 0x97, 0x02, 0x39, 0x06, 0x3d, 0x04, 0x8d, 0x73, 0x69, 0x6f, 0xf9, 0x13, 0x0a, 0xce, 0x0c, + 0x29, 0x92, 0xa2, 0x3f, 0x52, 0x24, 0x39, 0x49, 0xf3, 0xde, 0x6f, 0x7e, 0xf3, 0xbe, 0x38, 0xef, + 0x91, 0x90, 0xd5, 0x8c, 0x3d, 0x62, 0xd9, 0xc4, 0x2c, 0xf7, 0xcc, 0xae, 0xdd, 0xc5, 0xa9, 0x56, + 0xd7, 0xb4, 0xc9, 0xa1, 0x70, 0x7d, 0x4f, 0xb3, 0xf7, 0xfb, 0xcd, 0x72, 0xab, 0xab, 0xcf, 0xef, + 0x75, 0xf7, 0xba, 0xf3, 0x54, 0xdd, 0xec, 0xef, 0xd2, 0x15, 0x5d, 0xd0, 0x7f, 0x6c, 0x9b, 0xb0, + 0xe0, 0x87, 0x9b, 0xea, 0xae, 0x6a, 0xa8, 0xf3, 0xba, 0xa6, 0x6b, 0xe6, 0x7c, 0xef, 0x60, 0x8f, + 0xfd, 0xeb, 0x35, 0xd9, 0x2f, 0xdb, 0x21, 0xd5, 0x41, 0x78, 0xa0, 0x36, 0x49, 0xa7, 0xae, 0xea, + 0xc4, 0xaa, 0x18, 0xed, 0x8f, 0xd4, 0x4e, 0x9f, 0x58, 0x32, 0x79, 0xd2, 0x27, 0x96, 0x8d, 0x17, + 0x60, 0x4c, 0x57, 0xed, 0xd6, 0x3e, 0x31, 0xad, 0x02, 0x2a, 0x26, 0x4a, 0x99, 0xa5, 0x8b, 0x65, + 0x66, 0x59, 0x99, 0xee, 0xaa, 0x31, 0xa5, 0xec, 0xa1, 0xa4, 0x35, 0xb8, 0x1c, 0xc9, 0x67, 0xf5, + 0xba, 0x86, 0x45, 0xf0, 0x7f, 0x60, 0x44, 0xb3, 0x89, 0xee, 0xb2, 0xe5, 0x03, 0x6c, 0x1c, 0xcb, + 0x10, 0xd2, 0x2a, 0x64, 0x7c, 0x52, 0x3c, 0x0b, 0xd0, 0x71, 0x96, 0x8a, 0xa1, 0xea, 0xa4, 0x80, + 0x8a, 0xa8, 0x94, 0x96, 0xd3, 0x1d, 0xf7, 0x28, 0x3c, 0x0d, 0xa9, 0x4f, 0x29, 0xb0, 0x10, 0x2f, + 0x26, 0x4a, 0x69, 0x99, 0xaf, 0x24, 0x13, 0x66, 0x7d, 0x2c, 0x2b, 0xaa, 0xd9, 0xd6, 0x0c, 0xb5, + 0xa3, 0xd9, 0x47, 0xae, 0x8b, 0x73, 0x90, 0x19, 0xf0, 0x32, 0xbb, 0xd2, 0x32, 0x78, 0xc4, 0x56, + 0x20, 0x06, 0xf1, 0x73, 0xc5, 0x60, 0x1b, 0xc4, 0x93, 0xce, 0xe4, 0x61, 0xb8, 0x11, 0x0c, 0xc3, + 0xec, 0x70, 0x18, 0x1a, 0xc4, 0xd4, 0x88, 0xb5, 0xd2, 0xed, 0x1b, 0xb6, 0x1b, 0x90, 0x17, 0x08, + 0xa6, 0x22, 0x01, 0x67, 0xc5, 0x46, 0x05, 0xcc, 0xd4, 0x34, 0x26, 0x8a, 0x45, 0x77, 0x72, 0x5f, + 0x6e, 0x9c, 0x7a, 0xf4, 0x90, 0xb4, 0x6a, 0xd8, 0xe6, 0x91, 0x9c, 0xeb, 0x84, 0xc4, 0xc2, 0xca, + 0xb0, 0x69, 0x14, 0x8a, 0x73, 0x90, 0x38, 0x20, 0x47, 0xdc, 0x26, 0xe7, 0x2f, 0xbe, 0x08, 0x23, + 0xd4, 0x8e, 0x42, 0xbc, 0x88, 0x4a, 0x49, 0x99, 0x2d, 0xee, 0xc4, 0x6f, 0x21, 0xe9, 0x57, 0x04, + 0x19, 0x99, 0xa8, 0x6d, 0x37, 0x35, 0x65, 0x18, 0x7d, 0xd2, 0x67, 0xc6, 0x86, 0x8a, 0xef, 0x51, + 0x9f, 0x98, 0x6e, 0x06, 0x65, 0x17, 0x84, 0x77, 0x60, 0x46, 0x6d, 0xb5, 0x48, 0xcf, 0x26, 0x6d, + 0xc5, 0xe4, 0xa1, 0x56, 0xec, 0xa3, 0x1e, 0x77, 0x36, 0xbb, 0x54, 0x74, 0xf7, 0xfb, 0x4e, 0x29, + 0xbb, 0x49, 0xd9, 0x3a, 0xea, 0x11, 0x79, 0xca, 0x25, 0xf0, 0x4b, 0x2d, 0xe9, 0x26, 0x8c, 0xfb, + 0x05, 0x38, 0x03, 0xa3, 0x8d, 0x4a, 0xed, 0xe1, 0x83, 0x6a, 0x23, 0x17, 0xc3, 0x33, 0x90, 0x6f, + 0x6c, 0xc9, 0xd5, 0x4a, 0xad, 0xba, 0xaa, 0xec, 0x6c, 0xca, 0xca, 0xca, 0xda, 0x76, 0x7d, 0xa3, + 0x91, 0x43, 0xd2, 0x5d, 0x67, 0x97, 0xea, 0x51, 0xe1, 0x79, 0x18, 0x35, 0x89, 0xd5, 0xef, 0xd8, + 0xae, 0x3f, 0x53, 0x21, 0x7f, 0x18, 0x4e, 0x76, 0x51, 0xd2, 0x11, 0xe0, 0x86, 0x6d, 0x12, 0x55, + 0x0f, 0xd0, 0x2c, 0x43, 0xb6, 0xb5, 0xdf, 0x37, 0x0e, 0x48, 0xdb, 0x4d, 0x25, 0x63, 0xbb, 0xec, + 0xb2, 0xb1, 0x3d, 0x2b, 0x0c, 0xc3, 0x92, 0x21, 0x4f, 0xb4, 0xfc, 0x4b, 0xa7, 0xea, 0x9d, 0xa8, + 0x1d, 0x29, 0x9a, 0xd1, 0x26, 0x87, 0x34, 0x15, 0x09, 0x19, 0xa8, 0x68, 0xdd, 0x91, 0x48, 0x3f, + 0x20, 0xc8, 0x47, 0xf0, 0xe0, 0x5d, 0x48, 0xd1, 0xe4, 0x87, 0x9f, 0xe0, 0x5e, 0x93, 0xd5, 0xca, + 0x43, 0x55, 0x33, 0x97, 0x6f, 0x3f, 0x7d, 0x31, 0x17, 0xfb, 0xed, 0xc5, 0xdc, 0xe2, 0x79, 0xae, + 0x23, 0xb6, 0xaf, 0xd2, 0x56, 0x7b, 0x36, 0x31, 0x65, 0xce, 0x8e, 0x17, 0x21, 0x45, 0x2d, 0x76, + 0xeb, 0x34, 0x1f, 0xe1, 0xdc, 0x72, 0xd2, 0x39, 0x47, 0xe6, 0x40, 0xe9, 0x27, 0x04, 0x19, 0x9f, + 0x16, 0x8b, 0x90, 0xd1, 0x35, 0x43, 0xb1, 0x35, 0x9d, 0x28, 0xf4, 0x51, 0x73, 0x7c, 0x4c, 0xeb, + 0x9a, 0xb1, 0xa5, 0xe9, 0xa4, 0x66, 0x51, 0xbd, 0x7a, 0xe8, 0xe9, 0xe3, 0x5c, 0xaf, 0x1e, 0x72, + 0xfd, 0x02, 0x24, 0x9d, 0xe2, 0x29, 0x24, 0x8a, 0xa8, 0x94, 0x5d, 0xfa, 0x47, 0x84, 0x01, 0xe5, + 0xaa, 0xd1, 0xea, 0xb6, 0x35, 0x63, 0x4f, 0xa6, 0x48, 0x8c, 0x21, 0xd9, 0x56, 0x6d, 0xb5, 0x90, + 0x2c, 0xa2, 0xd2, 0xb8, 0x4c, 0xff, 0x4b, 0x45, 0x18, 0x73, 0x51, 0x4e, 0xd9, 0x6c, 0xd7, 0x37, + 0xea, 0x9b, 0x8f, 0xeb, 0xb9, 0x18, 0x1e, 0x85, 0xc4, 0xce, 0xa6, 0x9c, 0x43, 0xd2, 0x37, 0x08, + 0xc6, 0xfd, 0x05, 0x8d, 0xaf, 0x01, 0xb6, 0x6c, 0xd5, 0xb4, 0xa9, 0x69, 0x96, 0xad, 0xea, 0xbd, + 0x81, 0xfd, 0x39, 0xaa, 0xd9, 0x72, 0x15, 0x35, 0x0b, 0x97, 0x20, 0x47, 0x8c, 0x76, 0x10, 0xcb, + 0x7c, 0xc9, 0x12, 0xa3, 0xed, 0x47, 0xfa, 0x6f, 0xb2, 0xc4, 0xb9, 0x6e, 0xb2, 0xef, 0x10, 0x5c, + 0xac, 0x1e, 0x12, 0xbd, 0xd7, 0x51, 0xcd, 0xf7, 0x62, 0xe2, 0xe2, 0x90, 0x89, 0x53, 0x51, 0x26, + 0x5a, 0x3e, 0x1b, 0x37, 0x60, 0x22, 0xf0, 0xf8, 0xe0, 0x3b, 0x00, 0xf4, 0xa4, 0xa8, 0x9b, 0xa3, + 0xd7, 0x2c, 0x3b, 0xc7, 0xb1, 0x62, 0xe6, 0xf5, 0xe3, 0x43, 0x4b, 0x5f, 0x23, 0xc8, 0x53, 0x36, + 0xf7, 0xb9, 0xe3, 0x9c, 0x77, 0x21, 0xc3, 0xaa, 0xcc, 0x4f, 0x3a, 0xe3, 0x9a, 0x36, 0xa0, 0xf4, + 0xd7, 0xa5, 0x7f, 0x47, 0xc8, 0xa8, 0xf8, 0x1b, 0x19, 0xd5, 0x80, 0xa9, 0x50, 0x12, 0xde, 0x82, + 0xa7, 0xbf, 0x20, 0xc0, 0xfe, 0xae, 0xcb, 0x13, 0x7b, 0x46, 0x2b, 0x89, 0xce, 0x7b, 0xfc, 0x0d, + 0xf2, 0x9e, 0x38, 0x33, 0xef, 0xce, 0xd3, 0x73, 0x8e, 0xbc, 0xdf, 0x82, 0x7c, 0xc0, 0x7e, 0x1e, + 0x93, 0x7f, 0xc2, 0xb8, 0xaf, 0xd9, 0xb9, 0x0d, 0x3d, 0x33, 0xe8, 0x58, 0x96, 0xf4, 0x2d, 0x82, + 0xc9, 0xc1, 0x90, 0xf2, 0x7e, 0x4b, 0xfa, 0x5c, 0xae, 0xfd, 0x9f, 0xa7, 0x86, 0xdb, 0xc7, 0x3d, + 0x3b, 0x6b, 0x52, 0x91, 0x30, 0xe4, 0xb6, 0x2d, 0x62, 0x36, 0x6c, 0xd5, 0x76, 0xbd, 0x92, 0x7e, + 0x46, 0x30, 0xe9, 0x13, 0x72, 0xaa, 0x2b, 0xee, 0xc0, 0xa9, 0x75, 0x0d, 0xc5, 0x54, 0x6d, 0x96, + 0x69, 0x24, 0x4f, 0x78, 0x52, 0x59, 0xb5, 0x89, 0x53, 0x0c, 0x46, 0x5f, 0x1f, 0x0c, 0x0c, 0x4e, + 0xbf, 0x4e, 0x1b, 0x7d, 0x9d, 0xf7, 0x82, 0x6b, 0x80, 0xd5, 0x9e, 0xa6, 0x84, 0x98, 0x12, 0x94, + 0x29, 0xa7, 0xf6, 0xb4, 0xf5, 0x00, 0x59, 0x19, 0xf2, 0x66, 0xbf, 0x43, 0xc2, 0xf0, 0x24, 0x85, + 0x4f, 0x3a, 0xaa, 0x00, 0x5e, 0xfa, 0x04, 0xf2, 0x8e, 0xe1, 0xeb, 0xab, 0x41, 0xd3, 0x67, 0x60, + 0xb4, 0x6f, 0x11, 0x53, 0xd1, 0xda, 0xbc, 0x3a, 0x53, 0xce, 0x72, 0xbd, 0x8d, 0xaf, 0xf3, 0xcb, + 0x37, 0x4e, 0x63, 0x7c, 0xc9, 0x8d, 0xf1, 0x90, 0xf3, 0xfc, 0x5e, 0xbe, 0x0f, 0xd8, 0x51, 0x59, + 0x41, 0xf6, 0x45, 0x18, 0xb1, 0x1c, 0x41, 0xb8, 0xa5, 0x46, 0x58, 0x22, 0x33, 0xa4, 0xf4, 0x23, + 0x02, 0xb1, 0x46, 0x6c, 0x53, 0x6b, 0x59, 0xf7, 0xba, 0x66, 0x30, 0xa5, 0xef, 0xb8, 0xb4, 0x6e, + 0xc1, 0xb8, 0x5b, 0x33, 0x8a, 0x45, 0xec, 0xd3, 0x6f, 0xcc, 0x8c, 0x0b, 0x6d, 0x10, 0x5b, 0xda, + 0x80, 0xb9, 0x13, 0x6d, 0xe6, 0xa1, 0x28, 0x41, 0x4a, 0xa7, 0x10, 0x1e, 0x8b, 0xdc, 0xe0, 0x62, + 0x61, 0x5b, 0x65, 0xae, 0x97, 0x0a, 0x30, 0xcd, 0xc9, 0x6a, 0xc4, 0x56, 0x9d, 0xe8, 0xba, 0xd5, + 0xb7, 0x09, 0x33, 0x43, 0x1a, 0x4e, 0x7f, 0x13, 0xc6, 0x74, 0x2e, 0xe3, 0x07, 0x14, 0xc2, 0x07, + 0x78, 0x7b, 0x3c, 0xa4, 0xf4, 0x27, 0x82, 0x0b, 0xa1, 0xdb, 0xd6, 0x89, 0xd7, 0xae, 0xd9, 0xd5, + 0x15, 0xf7, 0x15, 0x6a, 0x50, 0x1a, 0x59, 0x47, 0xbe, 0xce, 0xc5, 0xeb, 0x6d, 0x7f, 0xed, 0xc4, + 0x03, 0xb5, 0x33, 0x98, 0x6a, 0x12, 0xef, 0x74, 0xaa, 0xf9, 0x9f, 0x37, 0xd5, 0x24, 0xe9, 0x39, + 0x13, 0x6e, 0xaa, 0xa2, 0xe6, 0x99, 0x2f, 0x11, 0x8c, 0x30, 0x0f, 0xdf, 0x55, 0xfd, 0x08, 0x30, + 0x46, 0xf8, 0x6c, 0x42, 0x1f, 0xdb, 0x11, 0xd9, 0x5b, 0x47, 0xce, 0x32, 0x15, 0x98, 0x08, 0xd4, + 0xca, 0xdf, 0x78, 0x3f, 0x54, 0x60, 0xdc, 0xaf, 0xc1, 0x57, 0xf8, 0x90, 0x85, 0xe8, 0x90, 0x35, + 0xe9, 0xee, 0xa6, 0x6a, 0x3a, 0x91, 0x7b, 0x93, 0x15, 0x6d, 0x48, 0x2c, 0x6d, 0xf4, 0xff, 0xe0, + 0x45, 0x22, 0x41, 0x85, 0x6c, 0x21, 0x7d, 0x8e, 0x20, 0x3b, 0xa8, 0x90, 0x7b, 0x5a, 0x87, 0xbc, + 0x8d, 0x02, 0x11, 0x60, 0x6c, 0x57, 0xeb, 0x10, 0x6a, 0x03, 0x3b, 0xce, 0x5b, 0x47, 0x45, 0xea, + 0xbf, 0x1f, 0x42, 0xda, 0x73, 0x01, 0xa7, 0x61, 0xa4, 0xfa, 0x68, 0xbb, 0xf2, 0x20, 0x17, 0xc3, + 0x13, 0x90, 0xae, 0x6f, 0x6e, 0x29, 0x6c, 0x89, 0xf0, 0x05, 0xc8, 0xc8, 0xd5, 0xfb, 0xd5, 0x1d, + 0xa5, 0x56, 0xd9, 0x5a, 0x59, 0xcb, 0xc5, 0x31, 0x86, 0x2c, 0x13, 0xd4, 0x37, 0xb9, 0x2c, 0xb1, + 0xf4, 0xc5, 0x28, 0x8c, 0xb9, 0x36, 0xe2, 0xdb, 0x90, 0x7c, 0xd8, 0xb7, 0xf6, 0xf1, 0xf4, 0xa0, + 0x42, 0x1f, 0x9b, 0x9a, 0x4d, 0xf8, 0x13, 0x27, 0xcc, 0x0c, 0xc9, 0xd9, 0xf3, 0x26, 0xc5, 0xf0, + 0x2a, 0x64, 0x7c, 0xa3, 0x0d, 0x8e, 0x7c, 0x99, 0x12, 0x2e, 0x07, 0xa4, 0xc1, 0x29, 0x48, 0x8a, + 0x2d, 0x20, 0xbc, 0x09, 0x59, 0xaa, 0x72, 0x27, 0x12, 0x0b, 0x7b, 0x93, 0x71, 0xd4, 0xa4, 0x28, + 0xcc, 0x9e, 0xa0, 0xf5, 0xcc, 0x5a, 0x0b, 0xbe, 0xe7, 0x0b, 0x51, 0x9f, 0x04, 0xc2, 0xc6, 0x45, + 0x34, 0x7e, 0x29, 0x86, 0xab, 0x00, 0x83, 0xb6, 0x89, 0x2f, 0x05, 0xc0, 0xfe, 0x56, 0x2f, 0x08, + 0x51, 0x2a, 0x8f, 0x66, 0x19, 0xd2, 0x5e, 0xd3, 0xc0, 0x85, 0x88, 0x3e, 0xc2, 0x48, 0x4e, 0xee, + 0x30, 0x52, 0x0c, 0xdf, 0x83, 0xf1, 0x4a, 0xa7, 0x73, 0x1e, 0x1a, 0xc1, 0xaf, 0xb1, 0xc2, 0x3c, + 0x1d, 0xef, 0x02, 0x0d, 0xdf, 0xd3, 0xf8, 0xaa, 0xf7, 0xac, 0x9c, 0xda, 0x7c, 0x84, 0x7f, 0x9f, + 0x89, 0xf3, 0x4e, 0xdb, 0x82, 0x0b, 0xa1, 0xeb, 0x1a, 0x8b, 0xa1, 0xdd, 0xa1, 0x1b, 0x5e, 0x98, + 0x3b, 0x51, 0xef, 0xb1, 0x36, 0xf9, 0xa0, 0x16, 0xfc, 0x24, 0x84, 0xa5, 0xe1, 0x24, 0x84, 0xbf, + 0x3f, 0x09, 0xff, 0x3a, 0x15, 0xe3, 0xab, 0xca, 0x03, 0x98, 0x8e, 0xfe, 0xe4, 0x82, 0xaf, 0x44, + 0xd4, 0xcc, 0xf0, 0x67, 0x20, 0xe1, 0xea, 0x59, 0xb0, 0xc1, 0x61, 0xcb, 0x1f, 0x3c, 0x7b, 0x29, + 0xc6, 0x9e, 0xbf, 0x14, 0x63, 0xaf, 0x5f, 0x8a, 0xe8, 0xb3, 0x63, 0x11, 0x7d, 0x7f, 0x2c, 0xa2, + 0xa7, 0xc7, 0x22, 0x7a, 0x76, 0x2c, 0xa2, 0xdf, 0x8f, 0x45, 0xf4, 0xc7, 0xb1, 0x18, 0x7b, 0x7d, + 0x2c, 0xa2, 0xaf, 0x5e, 0x89, 0xb1, 0x67, 0xaf, 0xc4, 0xd8, 0xf3, 0x57, 0x62, 0xec, 0xe3, 0x54, + 0xab, 0xa3, 0x11, 0xc3, 0x6e, 0xa6, 0xe8, 0x87, 0xb7, 0x1b, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, + 0x2f, 0xe4, 0x86, 0xaf, 0xf3, 0x13, 0x00, 0x00, } func (x MatchType) String() string { @@ -1709,6 +1940,20 @@ func (x MatchType) String() string { } return strconv.Itoa(int(x)) } +func (x ReadRequest_ResponseType) String() string { + s, ok := ReadRequest_ResponseType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (x StreamChunk_Encoding) String() string { + s, ok := StreamChunk_Encoding_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} func (this *LabelNamesAndValuesRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -1924,6 +2169,14 @@ func (this *ReadRequest) Equal(that interface{}) bool { return false } } + if len(this.AcceptedResponseTypes) != len(that1.AcceptedResponseTypes) { + return false + } + for i := range this.AcceptedResponseTypes { + if this.AcceptedResponseTypes[i] != that1.AcceptedResponseTypes[i] { + return false + } + } return true } func (this *ReadResponse) Equal(that interface{}) bool { @@ -1955,14 +2208,14 @@ func (this *ReadResponse) Equal(that interface{}) bool { } return true } -func (this *QueryRequest) Equal(that interface{}) bool { +func (this *StreamReadResponse) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*QueryRequest) + that1, ok := that.(*StreamReadResponse) if !ok { - that2, ok := that.(QueryRequest) + that2, ok := that.(StreamReadResponse) if ok { that1 = &that2 } else { @@ -1974,30 +2227,27 @@ func (this *QueryRequest) Equal(that interface{}) bool { } else if this == nil { return false } - if this.StartTimestampMs != that1.StartTimestampMs { - return false - } - if this.EndTimestampMs != that1.EndTimestampMs { - return false - } - if len(this.Matchers) != len(that1.Matchers) { + if len(this.ChunkedSeries) != len(that1.ChunkedSeries) { return false } - for i := range this.Matchers { - if !this.Matchers[i].Equal(that1.Matchers[i]) { + for i := range this.ChunkedSeries { + if !this.ChunkedSeries[i].Equal(that1.ChunkedSeries[i]) { return false } } + if this.QueryIndex != that1.QueryIndex { + return false + } return true } -func (this *ExemplarQueryRequest) Equal(that interface{}) bool { +func (this *StreamChunkedSeries) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*ExemplarQueryRequest) + that1, ok := that.(*StreamChunkedSeries) if !ok { - that2, ok := that.(ExemplarQueryRequest) + that2, ok := that.(StreamChunkedSeries) if ok { that1 = &that2 } else { @@ -2009,30 +2259,32 @@ func (this *ExemplarQueryRequest) Equal(that interface{}) bool { } else if this == nil { return false } - if this.StartTimestampMs != that1.StartTimestampMs { + if len(this.Labels) != len(that1.Labels) { return false } - if this.EndTimestampMs != that1.EndTimestampMs { - return false + for i := range this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } } - if len(this.Matchers) != len(that1.Matchers) { + if len(this.Chunks) != len(that1.Chunks) { return false } - for i := range this.Matchers { - if !this.Matchers[i].Equal(that1.Matchers[i]) { + for i := range this.Chunks { + if !this.Chunks[i].Equal(&that1.Chunks[i]) { return false } } return true } -func (this *QueryResponse) Equal(that interface{}) bool { +func (this *StreamChunk) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*QueryResponse) + that1, ok := that.(*StreamChunk) if !ok { - that2, ok := that.(QueryResponse) + that2, ok := that.(StreamChunk) if ok { that1 = &that2 } else { @@ -2044,24 +2296,28 @@ func (this *QueryResponse) Equal(that interface{}) bool { } else if this == nil { return false } - if len(this.Timeseries) != len(that1.Timeseries) { + if this.MinTimeMs != that1.MinTimeMs { return false } - for i := range this.Timeseries { - if !this.Timeseries[i].Equal(&that1.Timeseries[i]) { - return false - } + if this.MaxTimeMs != that1.MaxTimeMs { + return false + } + if this.Type != that1.Type { + return false + } + if !bytes.Equal(this.Data, that1.Data) { + return false } return true } -func (this *QueryStreamResponse) Equal(that interface{}) bool { +func (this *QueryRequest) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*QueryStreamResponse) + that1, ok := that.(*QueryRequest) if !ok { - that2, ok := that.(QueryStreamResponse) + that2, ok := that.(QueryRequest) if ok { that1 = &that2 } else { @@ -2073,32 +2329,131 @@ func (this *QueryStreamResponse) Equal(that interface{}) bool { } else if this == nil { return false } - if len(this.Chunkseries) != len(that1.Chunkseries) { + if this.StartTimestampMs != that1.StartTimestampMs { return false } - for i := range this.Chunkseries { - if !this.Chunkseries[i].Equal(&that1.Chunkseries[i]) { - return false - } + if this.EndTimestampMs != that1.EndTimestampMs { + return false } - if len(this.Timeseries) != len(that1.Timeseries) { + if len(this.Matchers) != len(that1.Matchers) { return false } - for i := range this.Timeseries { - if !this.Timeseries[i].Equal(&that1.Timeseries[i]) { + for i := range this.Matchers { + if !this.Matchers[i].Equal(that1.Matchers[i]) { return false } } return true } -func (this *ExemplarQueryResponse) Equal(that interface{}) bool { +func (this *ExemplarQueryRequest) Equal(that interface{}) bool { if that == nil { return this == nil } - that1, ok := that.(*ExemplarQueryResponse) + that1, ok := that.(*ExemplarQueryRequest) if !ok { - that2, ok := that.(ExemplarQueryResponse) + that2, ok := that.(ExemplarQueryRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.StartTimestampMs != that1.StartTimestampMs { + return false + } + if this.EndTimestampMs != that1.EndTimestampMs { + return false + } + if len(this.Matchers) != len(that1.Matchers) { + return false + } + for i := range this.Matchers { + if !this.Matchers[i].Equal(that1.Matchers[i]) { + return false + } + } + return true +} +func (this *QueryResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*QueryResponse) + if !ok { + that2, ok := that.(QueryResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Timeseries) != len(that1.Timeseries) { + return false + } + for i := range this.Timeseries { + if !this.Timeseries[i].Equal(&that1.Timeseries[i]) { + return false + } + } + return true +} +func (this *QueryStreamResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*QueryStreamResponse) + if !ok { + that2, ok := that.(QueryStreamResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Chunkseries) != len(that1.Chunkseries) { + return false + } + for i := range this.Chunkseries { + if !this.Chunkseries[i].Equal(&that1.Chunkseries[i]) { + return false + } + } + if len(this.Timeseries) != len(that1.Timeseries) { + return false + } + for i := range this.Timeseries { + if !this.Timeseries[i].Equal(&that1.Timeseries[i]) { + return false + } + } + return true +} +func (this *ExemplarQueryResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ExemplarQueryResponse) + if !ok { + that2, ok := that.(ExemplarQueryResponse) if ok { that1 = &that2 } else { @@ -2720,11 +3075,12 @@ func (this *ReadRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 6) s = append(s, "&client.ReadRequest{") if this.Queries != nil { s = append(s, "Queries: "+fmt.Sprintf("%#v", this.Queries)+",\n") } + s = append(s, "AcceptedResponseTypes: "+fmt.Sprintf("%#v", this.AcceptedResponseTypes)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -2740,6 +3096,49 @@ func (this *ReadResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *StreamReadResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&client.StreamReadResponse{") + if this.ChunkedSeries != nil { + s = append(s, "ChunkedSeries: "+fmt.Sprintf("%#v", this.ChunkedSeries)+",\n") + } + s = append(s, "QueryIndex: "+fmt.Sprintf("%#v", this.QueryIndex)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *StreamChunkedSeries) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&client.StreamChunkedSeries{") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + if this.Chunks != nil { + vs := make([]*StreamChunk, len(this.Chunks)) + for i := range vs { + vs[i] = &this.Chunks[i] + } + s = append(s, "Chunks: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *StreamChunk) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&client.StreamChunk{") + s = append(s, "MinTimeMs: "+fmt.Sprintf("%#v", this.MinTimeMs)+",\n") + s = append(s, "MaxTimeMs: "+fmt.Sprintf("%#v", this.MaxTimeMs)+",\n") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *QueryRequest) GoString() string { if this == nil { return "nil" @@ -3839,6 +4238,24 @@ func (m *ReadRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.AcceptedResponseTypes) > 0 { + dAtA2 := make([]byte, len(m.AcceptedResponseTypes)*10) + var j1 int + for _, num := range m.AcceptedResponseTypes { + for num >= 1<<7 { + dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j1++ + } + dAtA2[j1] = uint8(num) + j1++ + } + i -= j1 + copy(dAtA[i:], dAtA2[:j1]) + i = encodeVarintIngester(dAtA, i, uint64(j1)) + i-- + dAtA[i] = 0x12 + } if len(m.Queries) > 0 { for iNdEx := len(m.Queries) - 1; iNdEx >= 0; iNdEx-- { { @@ -3893,7 +4310,7 @@ func (m *ReadResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *QueryRequest) Marshal() (dAtA []byte, err error) { +func (m *StreamReadResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -3903,20 +4320,25 @@ func (m *QueryRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *StreamReadResponse) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *StreamReadResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Matchers) > 0 { - for iNdEx := len(m.Matchers) - 1; iNdEx >= 0; iNdEx-- { + if m.QueryIndex != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.QueryIndex)) + i-- + dAtA[i] = 0x10 + } + if len(m.ChunkedSeries) > 0 { + for iNdEx := len(m.ChunkedSeries) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Matchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.ChunkedSeries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -3924,23 +4346,13 @@ func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintIngester(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x1a + dAtA[i] = 0xa } } - if m.EndTimestampMs != 0 { - i = encodeVarintIngester(dAtA, i, uint64(m.EndTimestampMs)) - i-- - dAtA[i] = 0x10 - } - if m.StartTimestampMs != 0 { - i = encodeVarintIngester(dAtA, i, uint64(m.StartTimestampMs)) - i-- - dAtA[i] = 0x8 - } return len(dAtA) - i, nil } -func (m *ExemplarQueryRequest) Marshal() (dAtA []byte, err error) { +func (m *StreamChunkedSeries) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -3950,20 +4362,20 @@ func (m *ExemplarQueryRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *ExemplarQueryRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *StreamChunkedSeries) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *ExemplarQueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *StreamChunkedSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Matchers) > 0 { - for iNdEx := len(m.Matchers) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Chunks) > 0 { + for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Matchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -3971,23 +4383,72 @@ func (m *ExemplarQueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintIngester(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x1a + dAtA[i] = 0x12 } } - if m.EndTimestampMs != 0 { - i = encodeVarintIngester(dAtA, i, uint64(m.EndTimestampMs)) + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Labels[iNdEx].Size() + i -= size + if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintIngester(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *StreamChunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamChunk) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamChunk) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = encodeVarintIngester(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x22 + } + if m.Type != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x18 + } + if m.MaxTimeMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.MaxTimeMs)) i-- dAtA[i] = 0x10 } - if m.StartTimestampMs != 0 { - i = encodeVarintIngester(dAtA, i, uint64(m.StartTimestampMs)) + if m.MinTimeMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.MinTimeMs)) i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil } -func (m *QueryResponse) Marshal() (dAtA []byte, err error) { +func (m *QueryRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -3997,20 +4458,20 @@ func (m *QueryResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Timeseries) > 0 { - for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Matchers) > 0 { + for iNdEx := len(m.Matchers) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Matchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -4018,13 +4479,23 @@ func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintIngester(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0xa + dAtA[i] = 0x1a } } + if m.EndTimestampMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.EndTimestampMs)) + i-- + dAtA[i] = 0x10 + } + if m.StartTimestampMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.StartTimestampMs)) + i-- + dAtA[i] = 0x8 + } return len(dAtA) - i, nil } -func (m *QueryStreamResponse) Marshal() (dAtA []byte, err error) { +func (m *ExemplarQueryRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -4034,20 +4505,20 @@ func (m *QueryStreamResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *QueryStreamResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *ExemplarQueryRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *QueryStreamResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *ExemplarQueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Timeseries) > 0 { - for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Matchers) > 0 { + for iNdEx := len(m.Matchers) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Matchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -4055,7 +4526,91 @@ func (m *QueryStreamResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintIngester(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x12 + dAtA[i] = 0x1a + } + } + if m.EndTimestampMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.EndTimestampMs)) + i-- + dAtA[i] = 0x10 + } + if m.StartTimestampMs != 0 { + i = encodeVarintIngester(dAtA, i, uint64(m.StartTimestampMs)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *QueryResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintIngester(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *QueryStreamResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryStreamResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryStreamResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintIngester(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 } } if len(m.Chunkseries) > 0 { @@ -4934,6 +5489,13 @@ func (m *ReadRequest) Size() (n int) { n += 1 + l + sovIngester(uint64(l)) } } + if len(m.AcceptedResponseTypes) > 0 { + l = 0 + for _, e := range m.AcceptedResponseTypes { + l += sovIngester(uint64(e)) + } + n += 1 + sovIngester(uint64(l)) + l + } return n } @@ -4952,6 +5514,67 @@ func (m *ReadResponse) Size() (n int) { return n } +func (m *StreamReadResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ChunkedSeries) > 0 { + for _, e := range m.ChunkedSeries { + l = e.Size() + n += 1 + l + sovIngester(uint64(l)) + } + } + if m.QueryIndex != 0 { + n += 1 + sovIngester(uint64(m.QueryIndex)) + } + return n +} + +func (m *StreamChunkedSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovIngester(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovIngester(uint64(l)) + } + } + return n +} + +func (m *StreamChunk) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.MinTimeMs != 0 { + n += 1 + sovIngester(uint64(m.MinTimeMs)) + } + if m.MaxTimeMs != 0 { + n += 1 + sovIngester(uint64(m.MaxTimeMs)) + } + if m.Type != 0 { + n += 1 + sovIngester(uint64(m.Type)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + sovIngester(uint64(l)) + } + return n +} + func (m *QueryRequest) Size() (n int) { if m == nil { return 0 @@ -5460,6 +6083,7 @@ func (this *ReadRequest) String() string { repeatedStringForQueries += "}" s := strings.Join([]string{`&ReadRequest{`, `Queries:` + repeatedStringForQueries + `,`, + `AcceptedResponseTypes:` + fmt.Sprintf("%v", this.AcceptedResponseTypes) + `,`, `}`, }, "") return s @@ -5479,6 +6103,51 @@ func (this *ReadResponse) String() string { }, "") return s } +func (this *StreamReadResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForChunkedSeries := "[]*StreamChunkedSeries{" + for _, f := range this.ChunkedSeries { + repeatedStringForChunkedSeries += strings.Replace(f.String(), "StreamChunkedSeries", "StreamChunkedSeries", 1) + "," + } + repeatedStringForChunkedSeries += "}" + s := strings.Join([]string{`&StreamReadResponse{`, + `ChunkedSeries:` + repeatedStringForChunkedSeries + `,`, + `QueryIndex:` + fmt.Sprintf("%v", this.QueryIndex) + `,`, + `}`, + }, "") + return s +} +func (this *StreamChunkedSeries) String() string { + if this == nil { + return "nil" + } + repeatedStringForChunks := "[]StreamChunk{" + for _, f := range this.Chunks { + repeatedStringForChunks += strings.Replace(strings.Replace(f.String(), "StreamChunk", "StreamChunk", 1), `&`, ``, 1) + "," + } + repeatedStringForChunks += "}" + s := strings.Join([]string{`&StreamChunkedSeries{`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `Chunks:` + repeatedStringForChunks + `,`, + `}`, + }, "") + return s +} +func (this *StreamChunk) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&StreamChunk{`, + `MinTimeMs:` + fmt.Sprintf("%v", this.MinTimeMs) + `,`, + `MaxTimeMs:` + fmt.Sprintf("%v", this.MaxTimeMs) + `,`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `Data:` + fmt.Sprintf("%v", this.Data) + `,`, + `}`, + }, "") + return s +} func (this *QueryRequest) String() string { if this == nil { return "nil" @@ -6550,6 +7219,75 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType == 0 { + var v ReadRequest_ResponseType + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= ReadRequest_ResponseType(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.AcceptedResponseTypes = append(m.AcceptedResponseTypes, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthIngester + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthIngester + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + if elementCount != 0 && len(m.AcceptedResponseTypes) == 0 { + m.AcceptedResponseTypes = make([]ReadRequest_ResponseType, 0, elementCount) + } + for iNdEx < postIndex { + var v ReadRequest_ResponseType + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= ReadRequest_ResponseType(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.AcceptedResponseTypes = append(m.AcceptedResponseTypes, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field AcceptedResponseTypes", wireType) + } default: iNdEx = preIndex skippy, err := skipIngester(dAtA[iNdEx:]) @@ -6661,6 +7399,377 @@ func (m *ReadResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *StreamReadResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamReadResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamReadResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ChunkedSeries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthIngester + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthIngester + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ChunkedSeries = append(m.ChunkedSeries, &StreamChunkedSeries{}) + if err := m.ChunkedSeries[len(m.ChunkedSeries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field QueryIndex", wireType) + } + m.QueryIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.QueryIndex |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipIngester(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StreamChunkedSeries) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamChunkedSeries: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamChunkedSeries: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthIngester + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthIngester + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, github_com_grafana_mimir_pkg_mimirpb.LabelAdapter{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthIngester + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthIngester + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, StreamChunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipIngester(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StreamChunk) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamChunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamChunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MinTimeMs", wireType) + } + m.MinTimeMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MinTimeMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTimeMs", wireType) + } + m.MaxTimeMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxTimeMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= StreamChunk_Encoding(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIngester + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthIngester + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthIngester + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipIngester(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthIngester + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *QueryRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ingester/client/ingester.proto b/pkg/ingester/client/ingester.proto index 55375d666c2..d381f31ea92 100644 --- a/pkg/ingester/client/ingester.proto +++ b/pkg/ingester/client/ingester.proto @@ -67,12 +67,41 @@ message LabelValueSeriesCount { message ReadRequest { repeated QueryRequest queries = 1; + + enum ResponseType { + SAMPLES = 0; + STREAMED_XOR_CHUNKS = 1; + } + repeated ResponseType accepted_response_types = 2; } message ReadResponse { repeated QueryResponse results = 1; } +message StreamReadResponse { + repeated StreamChunkedSeries chunked_series = 1; + + int64 query_index = 2; +} + +message StreamChunkedSeries { + repeated cortexpb.LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/grafana/mimir/pkg/mimirpb.LabelAdapter"]; + repeated StreamChunk chunks = 2 [(gogoproto.nullable) = false]; +} + +message StreamChunk { + int64 min_time_ms = 1; + int64 max_time_ms = 2; + + enum Encoding { + UNKNOWN = 0; + XOR = 1; + } + Encoding type = 3; + bytes data = 4; +} + message QueryRequest { int64 start_timestamp_ms = 1; int64 end_timestamp_ms = 2; diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index bc520453818..59263923ba3 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -129,8 +129,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, engine } -// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a -// Queryable with a ChunkQueryable stub, that errors once it get's called. +// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a Queryable. func NewSampleAndChunkQueryable(q storage.Queryable) storage.SampleAndChunkQueryable { return &sampleAndChunkQueryable{q} } @@ -140,7 +139,19 @@ type sampleAndChunkQueryable struct { } func (q *sampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { - return nil, errors.New("ChunkQuerier not implemented") + qr, err := q.Queryable.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &chunkQuerier{qr}, nil +} + +type chunkQuerier struct { + storage.Querier +} + +func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + return storage.NewSeriesSetToChunkSet(q.Querier.Select(sortSeries, hints, matchers...)) } // QueryableWithFilter extends Queryable interface with `UseQueryable` filtering function. diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index 0012ce901ee..b83126fcc2d 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -6,11 +6,16 @@ package querier import ( + "context" + "io" "net/http" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" + prom_remote "github.com/prometheus/prometheus/storage/remote" "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" @@ -18,66 +23,160 @@ import ( util_log "github.com/grafana/mimir/pkg/util/log" ) -// Queries are a set of matchers with time ranges - should not get into megabytes -const maxRemoteReadQuerySize = 1024 * 1024 +const ( + // Queries are a set of matchers with time ranges - should not get into megabytes + maxRemoteReadQuerySize = 1024 * 1024 + + // Maximum number of bytes in frame when using streaming remote read. + // Google's recommendation is to keep protobuf message not larger than 1MB. + // https://developers.google.com/protocol-buffers/docs/techniques#large-data + maxRemoteReadFrameBytes = 1024 * 1024 +) // RemoteReadHandler handles Prometheus remote read requests. -func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler { +func RemoteReadHandler(q storage.SampleAndChunkQueryable, logger log.Logger) http.Handler { + return remoteReadHandler(q, maxRemoteReadFrameBytes, logger) +} + +func remoteReadHandler(q storage.SampleAndChunkQueryable, maxBytesInFrame int, lg log.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() var req client.ReadRequest - logger := util_log.WithContext(r.Context(), logger) + logger := util_log.WithContext(r.Context(), lg) if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, nil, &req, util.RawSnappy); err != nil { level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } - // Fetch samples for all queries in parallel. - resp := client.ReadResponse{ - Results: make([]*client.QueryResponse, len(req.Queries)), + respType, err := negotiateResponseType(req.AcceptedResponseTypes) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - errors := make(chan error) - for i, qr := range req.Queries { - go func(i int, qr *client.QueryRequest) { - from, to, matchers, err := client.FromQueryRequest(qr) - if err != nil { - errors <- err - return - } - - querier, err := q.Querier(ctx, int64(from), int64(to)) - if err != nil { - errors <- err - return - } - - params := &storage.SelectHints{ - Start: int64(from), - End: int64(to), - } - seriesSet := querier.Select(false, params, matchers...) - resp.Results[i], err = seriesSetToQueryResponse(seriesSet) - errors <- err - }(i, qr) + + switch respType { + case client.STREAMED_XOR_CHUNKS: + remoteReadStreamedXORChunks(ctx, q, w, &req, maxBytesInFrame, logger) + default: + remoteReadSamples(ctx, q, w, &req, logger) } + }) +} - var lastErr error - for range req.Queries { - err := <-errors +func remoteReadSamples( + ctx context.Context, + q storage.Queryable, + w http.ResponseWriter, + req *client.ReadRequest, + logger log.Logger, +) { + resp := client.ReadResponse{ + Results: make([]*client.QueryResponse, len(req.Queries)), + } + // Fetch samples for all queries in parallel. + errCh := make(chan error) + + for i, qr := range req.Queries { + go func(i int, qr *client.QueryRequest) { + from, to, matchers, err := client.FromQueryRequest(qr) + if err != nil { + errCh <- err + return + } + + querier, err := q.Querier(ctx, int64(from), int64(to)) if err != nil { - lastErr = err + errCh <- err + return + } + + params := &storage.SelectHints{ + Start: int64(from), + End: int64(to), } + seriesSet := querier.Select(false, params, matchers...) + resp.Results[i], err = seriesSetToQueryResponse(seriesSet) + errCh <- err + }(i, qr) + } + + var lastErr error + for range req.Queries { + err := <-errCh + if err != nil { + lastErr = err } - if lastErr != nil { - http.Error(w, lastErr.Error(), http.StatusBadRequest) + } + if lastErr != nil { + http.Error(w, lastErr.Error(), http.StatusBadRequest) + return + } + w.Header().Add("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + if err := util.SerializeProtoResponse(w, &resp, util.RawSnappy); err != nil { + level.Error(logger).Log("msg", "error sending remote read response", "err", err) + } +} + +func remoteReadStreamedXORChunks( + ctx context.Context, + q storage.ChunkQueryable, + w http.ResponseWriter, + req *client.ReadRequest, + maxBytesInFrame int, + logger log.Logger, +) { + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + + for i, qr := range req.Queries { + if err := processReadStreamedQueryRequest(ctx, i, qr, q, w, f, maxBytesInFrame); err != nil { + level.Error(logger).Log("msg", "error streaming remote read response", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.Header().Add("Content-Type", "application/x-protobuf") - if err := util.SerializeProtoResponse(w, &resp, util.RawSnappy); err != nil { - level.Error(logger).Log("msg", "error sending remote read response", "err", err) - } - }) + } + w.WriteHeader(http.StatusOK) +} + +func processReadStreamedQueryRequest( + ctx context.Context, + idx int, + queryReq *client.QueryRequest, + q storage.ChunkQueryable, + w http.ResponseWriter, + f http.Flusher, + maxBytesInFrame int, +) error { + from, to, matchers, err := client.FromQueryRequest(queryReq) + if err != nil { + return err + } + + querier, err := q.ChunkQuerier(ctx, int64(from), int64(to)) + if err != nil { + return err + } + + params := &storage.SelectHints{ + Start: int64(from), + End: int64(to), + } + + return streamChunkedReadResponses( + prom_remote.NewChunkedWriter(w, f), + // The streaming API has to provide the series sorted. + querier.Select(true, params, matchers...), + idx, + maxBytesInFrame, + ) } func seriesSetToQueryResponse(s storage.SeriesSet) (*client.QueryResponse, error) { @@ -105,3 +204,86 @@ func seriesSetToQueryResponse(s storage.SeriesSet) (*client.QueryResponse, error return result, s.Err() } + +func negotiateResponseType(accepted []client.ReadRequest_ResponseType) (client.ReadRequest_ResponseType, error) { + if len(accepted) == 0 { + return client.SAMPLES, nil + } + + supported := map[client.ReadRequest_ResponseType]struct{}{ + client.SAMPLES: {}, + client.STREAMED_XOR_CHUNKS: {}, + } + + for _, resType := range accepted { + if _, ok := supported[resType]; ok { + return resType, nil + } + } + return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported) +} + +func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, queryIndex, maxBytesInFrame int) error { + var ( + chks []client.StreamChunk + lbls []mimirpb.LabelAdapter + ) + + for ss.Next() { + series := ss.At() + iter := series.Iterator() + lbls = mimirpb.FromLabelsToLabelAdapters(series.Labels()) + + frameBytesLeft := maxBytesInFrame + for _, lbl := range lbls { + frameBytesLeft -= lbl.Size() + } + + isNext := iter.Next() + + for isNext { + chk := iter.At() + + if chk.Chunk == nil { + return errors.Errorf("found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) + } + + // Cut the chunk. + chks = append(chks, client.StreamChunk{ + MinTimeMs: chk.MinTime, + MaxTimeMs: chk.MaxTime, + Type: client.StreamChunk_Encoding(chk.Chunk.Encoding()), + Data: chk.Chunk.Bytes(), + }) + frameBytesLeft -= chks[len(chks)-1].Size() + + // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. + isNext = iter.Next() + if frameBytesLeft > 0 && isNext { + continue + } + + b, err := proto.Marshal(&client.StreamReadResponse{ + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: lbls, + Chunks: chks, + }, + }, + QueryIndex: int64(queryIndex), + }) + if err != nil { + return errors.Wrap(err, "marshal client.StreamReadResponse") + } + + if _, err := stream.Write(b); err != nil { + return errors.Wrap(err, "write to stream") + } + chks = chks[:0] + } + if err := iter.Err(); err != nil { + return err + } + } + return ss.Err() +} diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index 0cf64695044..99ceccb65bc 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -8,7 +8,7 @@ package querier import ( "bytes" "context" - "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -20,6 +20,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + prom_remote "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/ingester/client" @@ -27,22 +29,61 @@ import ( "github.com/grafana/mimir/pkg/storage/series" ) -func TestRemoteReadHandler(t *testing.T) { - q := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return mockQuerier{ - matrix: model.Matrix{ - { - Metric: model.Metric{"foo": "bar"}, - Values: []model.SamplePair{ - {Timestamp: 0, Value: 0}, - {Timestamp: 1, Value: 1}, - {Timestamp: 2, Value: 2}, - {Timestamp: 3, Value: 3}, +type mockSampleAndChunkQueryable struct { + queryableFn func(ctx context.Context, mint, maxt int64) (storage.Querier, error) + chunkQueryableFn func(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) +} + +func (m mockSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return m.queryableFn(ctx, mint, maxt) +} + +func (m mockSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + return m.chunkQueryableFn(ctx, mint, maxt) +} + +type mockQuerier struct { + storage.Querier + matrix model.Matrix +} + +func (m mockQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + if sp == nil { + panic("mockQuerier: select params must be set") + } + return series.MatrixToSeriesSet(m.matrix) +} + +type mockChunkQuerier struct { + storage.ChunkQuerier + matrix model.Matrix +} + +func (m mockChunkQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + if sp == nil { + panic("mockChunkQuerier: select params must be set") + } + return storage.NewSeriesSetToChunkSet(series.MatrixToSeriesSet(m.matrix)) +} + +func TestSampledRemoteRead(t *testing.T) { + q := &mockSampleAndChunkQueryable{ + queryableFn: func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 0, Value: 0}, + {Timestamp: 1, Value: 1}, + {Timestamp: 2, Value: 2}, + {Timestamp: 3, Value: 3}, + }, }, }, - }, - }, nil - }) + }, nil + }, + } handler := RemoteReadHandler(q, log.NewNopLogger()) requestBody, err := proto.Marshal(&client.ReadRequest{ @@ -52,7 +93,7 @@ func TestRemoteReadHandler(t *testing.T) { }) require.NoError(t, err) requestBody = snappy.Encode(nil, requestBody) - request, err := http.NewRequest("GET", "/query", bytes.NewReader(requestBody)) + request, err := http.NewRequest(http.MethodPost, "/api/v1/read", bytes.NewReader(requestBody)) require.NoError(t, err) request.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") @@ -91,25 +132,185 @@ func TestRemoteReadHandler(t *testing.T) { require.Equal(t, expected, response) } -type mockQuerier struct { - matrix model.Matrix -} +func TestStreamedRemoteRead(t *testing.T) { + tcs := map[string]struct { + samples []model.SamplePair + expectedResults []*client.StreamReadResponse + }{ + "with 120 samples, we expect 1 frame with 1 chunk": { + samples: getNSamples(120), + expectedResults: []*client.StreamReadResponse{ + { + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Chunks: []client.StreamChunk{ + { + MinTimeMs: 0, + MaxTimeMs: 119, + Type: client.XOR, + Data: getIndexedXORChunk(0, 120), + }, + }, + }, + }, + QueryIndex: 0, + }, + }, + }, + "with 121 samples, we expect 1 frame with 2 chunks": { + samples: getNSamples(121), + expectedResults: []*client.StreamReadResponse{ + { + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Chunks: []client.StreamChunk{ + { + MinTimeMs: 0, + MaxTimeMs: 119, + Type: client.XOR, + Data: getIndexedXORChunk(0, 121), + }, + { + MinTimeMs: 120, + MaxTimeMs: 120, + Type: client.XOR, + Data: getIndexedXORChunk(1, 121), + }, + }, + }, + }, + QueryIndex: 0, + }, + }, + }, + "with 241 samples, we expect 1 frame with 2 chunks, and 1 frame with 1 chunk due to frame limit": { + samples: getNSamples(241), + expectedResults: []*client.StreamReadResponse{ + { + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Chunks: []client.StreamChunk{ + { + MinTimeMs: 0, + MaxTimeMs: 119, + Type: client.XOR, + Data: getIndexedXORChunk(0, 241), + }, + { + MinTimeMs: 120, + MaxTimeMs: 239, + Type: client.XOR, + Data: getIndexedXORChunk(1, 241), + }, + }, + }, + }, + QueryIndex: 0, + }, + { + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Chunks: []client.StreamChunk{ + { + MinTimeMs: 240, + MaxTimeMs: 240, + Type: client.XOR, + Data: getIndexedXORChunk(2, 241), + }, + }, + }, + }, + QueryIndex: 0, + }, + }, + }, + } + for tn, tc := range tcs { + t.Run(tn, func(t *testing.T) { + q := &mockSampleAndChunkQueryable{ + chunkQueryableFn: func(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + return mockChunkQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"foo": "bar"}, + Values: tc.samples, + }, + }, + }, nil + }, + } + // Labelset has 10 bytes. Full frame in test data has roughly 160 bytes. This allows us to have at max 2 frames in this test. + maxBytesInFrame := 10 + 160*2 -func (m mockQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - if sp == nil { - panic(fmt.Errorf("select params must be set")) + handler := remoteReadHandler(q, maxBytesInFrame, log.NewNopLogger()) + + requestBody, err := proto.Marshal(&client.ReadRequest{ + Queries: []*client.QueryRequest{ + {StartTimestampMs: 0, EndTimestampMs: 10}, + }, + AcceptedResponseTypes: []client.ReadRequest_ResponseType{client.STREAMED_XOR_CHUNKS}, + }) + require.NoError(t, err) + requestBody = snappy.Encode(nil, requestBody) + request, err := http.NewRequest(http.MethodPost, "/api/v1/read", bytes.NewReader(requestBody)) + require.NoError(t, err) + request.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, request) + + require.Equal(t, 200, recorder.Result().StatusCode) + require.Equal(t, []string{"application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"}, recorder.Result().Header["Content-Type"]) + + stream := prom_remote.NewChunkedReader(recorder.Result().Body, prom_remote.DefaultChunkedReadLimit, nil) + + i := 0 + for { + var res client.StreamReadResponse + err := stream.NextProto(&res) + if err == io.EOF { + break + } + require.NoError(t, err) + + if len(tc.expectedResults) < i+1 { + require.Fail(t, "unexpected result message") + } + require.Equal(t, tc.expectedResults[i], &res) + i++ + } + }) } - return series.MatrixToSeriesSet(m.matrix) } -func (m mockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, nil +func getNSamples(n int) []model.SamplePair { + var retVal []model.SamplePair + for i := 0; i < n; i++ { + retVal = append(retVal, model.SamplePair{ + Timestamp: model.Time(i), + Value: model.SampleValue(i), + }) + } + return retVal } -func (m mockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, nil -} +func getIndexedXORChunk(idx, samplesCount int) []byte { + const samplesPerChunk = 120 -func (mockQuerier) Close() error { - return nil + enc := chunkenc.NewXORChunk() + ap, _ := enc.Appender() + + baseIdx := idx * samplesPerChunk + for i := 0; i < samplesPerChunk; i++ { + j := baseIdx + i + if j >= samplesCount { + break + } + ap.Append(int64(j), float64(j)) + } + return enc.Bytes() }