From 34dfec2461631ca4a103a8567a157ea182c0a532 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Fri, 25 Jun 2021 15:30:11 -0400 Subject: [PATCH 1/7] Add per-user query metrics for series and bytes returned Add stats included in query responses from the querier and distributor for measuring the number of series and bytes included in successful queries. These stats are emitted per-user as summaries from the query frontends. These stats are picked to add visibility into the same resources limited as part of #4179 and #4216. Fixes #4259 Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 2 +- pkg/distributor/query.go | 5 ++ pkg/frontend/transport/handler.go | 32 ++++++- pkg/frontend/transport/handler_test.go | 63 +++++++++++++ pkg/querier/blocks_store_queryable.go | 24 ++--- pkg/querier/stats/stats.go | 34 +++++++ pkg/querier/stats/stats.pb.go | 117 +++++++++++++++++++++---- pkg/querier/stats/stats.proto | 4 + pkg/querier/stats/stats_test.go | 91 +++++++++++++++++++ 9 files changed, 343 insertions(+), 29 deletions(-) create mode 100644 pkg/querier/stats/stats_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ad847de93f..06e391bafc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 - +* [FEATURE] Query Frontend: Add `cortex_query_series` and `cortex_query_bytes` per-user histograms to expose the number of series and bytes returned by queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 * [CHANGE] Update Go version to 1.16.6. #4362 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. 
#4345 diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8619a07125..7e6d7d23bf 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -13,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -282,6 +283,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) + reqStats = stats.FromContext(ctx) ) // Fetch samples from multiple ingesters @@ -383,6 +385,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri resp.Timeseries = append(resp.Timeseries, series) } + reqStats.AddSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) + reqStats.AddBytes(uint64(resp.ChunksSize())) + return resp, nil } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 435a022748..f41489899d 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -60,6 +60,8 @@ type Handler struct { // Metrics. querySeconds *prometheus.CounterVec + querySeries *prometheus.SummaryVec + queryBytes *prometheus.SummaryVec activeUsers *util.ActiveUsersCleanupService } @@ -77,8 +79,27 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Total amount of wall clock time spend processing queries.", }, []string{"user"}) + // Empty objectives for these summaries on purpose since they can't be aggregate + // and so we are just using them for the convenience of sum and count metrics. No + // histograms here since the cardinality from the number of buckets required to + // understand query responses is prohibitively expensive. + + h.querySeries = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ + Name: "cortex_query_series", + Help: "Number of series returned by successful queries.", + Objectives: map[float64]float64{}, + }, []string{"user"}) + + h.queryBytes = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ + Name: "cortex_query_bytes", + Help: "Number of bytes returned by successful queries.", + Objectives: map[float64]float64{}, + }, []string{"user"}) + h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) + h.querySeries.DeleteLabelValues(user) + h.queryBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -165,9 +186,14 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer return } userID := tenant.JoinTenantIDs(tenantIDs) + wallTime := stats.LoadWallTime() + numSeries := stats.LoadSeries() + numBytes := stats.LoadBytes() // Track stats. - f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds()) + f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) + f.querySeries.WithLabelValues(userID).Observe(float64(numSeries)) + f.queryBytes.WithLabelValues(userID).Observe(float64(numBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. 
@@ -177,7 +203,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "method", r.Method, "path", r.URL.Path, "response_time", queryResponseTime, - "query_wall_time_seconds", stats.LoadWallTime().Seconds(), + "query_wall_time_seconds", wallTime.Seconds(), + "query_series", numSeries, + "query_bytes", numBytes, }, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 136f187993..f9e478e9ae 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -2,15 +2,28 @@ package transport import ( "context" + "io" "net/http" "net/http/httptest" + "strings" "testing" + "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" ) +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} + func TestWriteError(t *testing.T) { for _, test := range []struct { status int @@ -28,3 +41,53 @@ func TestWriteError(t *testing.T) { }) } } + +func TestHandler_ServeHTTP(t *testing.T) { + for _, tt := range []struct { + name string + cfg HandlerConfig + expectedMetrics int + }{ + { + name: "test handler with stats enabled", + cfg: HandlerConfig{QueryStatsEnabled: true}, + expectedMetrics: 3, + }, + { + name: "test handler with stats disabled", + cfg: HandlerConfig{QueryStatsEnabled: false}, + expectedMetrics: 0, + }, + } { + t.Run(tt.name, func(t *testing.T) { + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("{}")), + }, nil + }) + + reg := prometheus.NewPedanticRegistry() + handler := NewHandler(tt.cfg, roundTripper, log.NewNopLogger(), reg) + + ctx := user.InjectOrgID(context.Background(), "12345") + req := httptest.NewRequest("GET", "/", nil) + req = req.WithContext(ctx) + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + _, _ = io.ReadAll(resp.Body) + require.Equal(t, resp.Code, http.StatusOK) + + count, err := promtest.GatherAndCount( + reg, + "cortex_query_seconds_total", + "cortex_query_series", + "cortex_query_bytes", + ) + + assert.NoError(t, err) + assert.Equal(t, tt.expectedMetrics, count) + }) + } +} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 98cb61e140..8411546b65 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -565,6 +566,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( numChunks = atomic.NewInt32(0) spanLog = spanlogger.FromContext(ctx) queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) + reqStats = stats.FromContext(ctx) ) // Concurrently fetch series from all clients. 
@@ -626,10 +628,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) } } - chunksSize := 0 - for _, c := range s.Chunks { - chunksSize += c.Size() - } + chunksSize := countChunkBytes(s) if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { return validation.LimitError(chunkBytesLimitErr.Error()) } @@ -657,10 +656,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } } + numSeries := len(mySeries) + chunkBytes := countChunkBytes(mySeries...) + + reqStats.AddSeries(uint64(numSeries)) + reqStats.AddBytes(uint64(chunkBytes)) + level.Debug(spanLog).Log("msg", "received series from store-gateway", "instance", c.RemoteAddress(), - "num series", len(mySeries), - "bytes series", countSeriesBytes(mySeries), + "num series", numSeries, + "bytes series", chunkBytes, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) @@ -944,12 +949,11 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { return res, nil } -func countSeriesBytes(series []*storepb.Series) (count uint64) { +// countChunkBytes returns the size of the chunks making up the provided series in bytes +func countChunkBytes(series ...*storepb.Series) (count int) { for _, s := range series { for _, c := range s.Chunks { - if c.Raw != nil { - count += uint64(len(c.Raw.Data)) - } + count += c.Size() } } diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 05a7de5347..9b528369ef 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -54,6 +54,38 @@ func (s *Stats) LoadWallTime() time.Duration { return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime))) } +func (s *Stats) AddSeries(series uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.NumSeries, series) +} + +func (s *Stats) LoadSeries() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.NumSeries) +} + +func (s *Stats) AddBytes(bytes uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.NumBytes, bytes) +} + +func (s *Stats) LoadBytes() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.NumBytes) +} + // Merge the provide Stats into this one. func (s *Stats) Merge(other *Stats) { if s == nil || other == nil { @@ -61,6 +93,8 @@ func (s *Stats) Merge(other *Stats) { } s.AddWallTime(other.LoadWallTime()) + s.AddSeries(other.LoadSeries()) + s.AddBytes(other.LoadBytes()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index b9ec9a49ba..34481eb4fa 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -32,6 +32,10 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Stats struct { // The sum of all wall time spent in the querier to execute the query. 
WallTime time.Duration `protobuf:"bytes,1,opt,name=wall_time,json=wallTime,proto3,stdduration" json:"wall_time"` + // The number of series included in the query response + NumSeries uint64 `protobuf:"varint,2,opt,name=num_series,json=numSeries,proto3" json:"num_series,omitempty"` + // The number of bytes of the chunks included in the query response + NumBytes uint64 `protobuf:"varint,3,opt,name=num_bytes,json=numBytes,proto3" json:"num_bytes,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -73,6 +77,20 @@ func (m *Stats) GetWallTime() time.Duration { return 0 } +func (m *Stats) GetNumSeries() uint64 { + if m != nil { + return m.NumSeries + } + return 0 +} + +func (m *Stats) GetNumBytes() uint64 { + if m != nil { + return m.NumBytes + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") } @@ -80,21 +98,24 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 213 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0x2e, 0x49, 0x2c, - 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x73, 0xa4, 0x74, 0xd3, 0x33, 0x4b, - 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, 0xb2, 0x49, - 0xa5, 0x69, 0x60, 0x1e, 0x98, 0x03, 0x66, 0x41, 0x74, 0x49, 0xc9, 0xa5, 0xe7, 0xe7, 0xa7, 0xe7, - 0xa4, 0x22, 0x54, 0xa5, 0x94, 0x16, 0x25, 0x96, 0x64, 0xe6, 0xe7, 0x41, 0xe4, 0x95, 0x3c, 0xb9, - 0x58, 0x83, 0x41, 0xe6, 0x0a, 0x39, 0x70, 0x71, 0x96, 0x27, 0xe6, 0xe4, 0xc4, 0x97, 0x64, 0xe6, - 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x1b, 0x49, 0xea, 0x41, 0x34, 0xeb, 0xc1, 0x34, 0xeb, - 0xb9, 0x40, 0x35, 0x3b, 0x71, 0x9c, 0xb8, 0x27, 0xcf, 0x30, 0xe3, 0xbe, 0x3c, 0x63, 0x10, 0x07, - 0x48, 0x57, 0x48, 0x66, 0x6e, 0xaa, 0x93, 0xf5, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, - 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, - 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, - 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, - 0x88, 0xb7, 0x92, 0xd8, 0xc0, 0x76, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2d, 0xc4, 0x26, - 0x5d, 0xf3, 0x00, 0x00, 0x00, + // 263 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0xb1, 0x4e, 0xc3, 0x30, + 0x10, 0x86, 0x7d, 0x40, 0x51, 0xeb, 0x6e, 0x99, 0x42, 0x11, 0xd7, 0x8a, 0xa9, 0x0b, 0xae, 0x04, + 0x23, 0x0b, 0x8a, 0x78, 0x82, 0x96, 0x89, 0x25, 0x4a, 0xc0, 0x18, 0x4b, 0x71, 0x8c, 0x62, 0x5b, + 0x88, 0x8d, 0x85, 0x9d, 0x91, 0x47, 0xe0, 0x51, 0x3a, 0x66, 0xec, 0x04, 0xc4, 0x59, 0x18, 0xfb, + 0x08, 0x28, 0x0e, 0x88, 0xed, 0xbe, 0xfb, 0xef, 0x3b, 0x9d, 0x8e, 0x8e, 0x8d, 0xcd, 0xac, 0x61, + 0x0f, 0x95, 0xb6, 0x3a, 0x1a, 0x04, 0x98, 0x9c, 0x08, 0x69, 0xef, 0x5d, 0xce, 0x6e, 0xb4, 0x5a, + 0x08, 0x2d, 0xf4, 0x22, 0xa4, 0xb9, 0xbb, 0x0b, 0x14, 0x20, 0x54, 0xbd, 0x35, 0x41, 0xa1, 0xb5, + 0x28, 0xf8, 0xff, 0xd4, 0xad, 0xab, 0x32, 0x2b, 0x75, 0xd9, 0xe7, 0xc7, 0x2f, 0x40, 0x07, 0xab, + 0x6e, 0x71, 0x74, 0x41, 0x47, 0x8f, 0x59, 0x51, 0xa4, 0x56, 0x2a, 0x1e, 0xc3, 0x0c, 0xe6, 0xe3, + 0xd3, 0x03, 0xd6, 0xdb, 0xec, 0xcf, 0x66, 0x97, 0xbf, 0x76, 0x32, 0x5c, 0x7f, 0x4c, 0xc9, 0xdb, + 0xe7, 0x14, 0x96, 0xc3, 0xce, 0xba, 0x92, 0x8a, 0x47, 0x47, 0x94, 0x96, 0x4e, 0xa5, 0x86, 0x57, + 0x92, 0x9b, 0x78, 0x67, 0x06, 0xf3, 0xbd, 0xe5, 0xa8, 0x74, 0x6a, 0x15, 
0x1a, 0xd1, 0x21, 0xed, + 0x20, 0xcd, 0x9f, 0x2c, 0x37, 0xf1, 0x6e, 0x48, 0x87, 0xa5, 0x53, 0x49, 0xc7, 0xc9, 0x79, 0xdd, + 0x20, 0xd9, 0x34, 0x48, 0xb6, 0x0d, 0xc2, 0xb3, 0x47, 0x78, 0xf7, 0x08, 0x6b, 0x8f, 0x50, 0x7b, + 0x84, 0x2f, 0x8f, 0xf0, 0xed, 0x91, 0x6c, 0x3d, 0xc2, 0x6b, 0x8b, 0xa4, 0x6e, 0x91, 0x6c, 0x5a, + 0x24, 0xd7, 0xfd, 0x4f, 0xf2, 0xfd, 0x70, 0xdf, 0xd9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xda, + 0xa1, 0xb8, 0x33, 0x30, 0x01, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -119,15 +140,23 @@ func (this *Stats) Equal(that interface{}) bool { if this.WallTime != that1.WallTime { return false } + if this.NumSeries != that1.NumSeries { + return false + } + if this.NumBytes != that1.NumBytes { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 7) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") + s = append(s, "NumSeries: "+fmt.Sprintf("%#v", this.NumSeries)+",\n") + s = append(s, "NumBytes: "+fmt.Sprintf("%#v", this.NumBytes)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -159,6 +188,16 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.NumBytes != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.NumBytes)) + i-- + dAtA[i] = 0x18 + } + if m.NumSeries != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.NumSeries)) + i-- + dAtA[i] = 0x10 + } n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) if err1 != nil { return 0, err1 @@ -189,6 +228,12 @@ func (m *Stats) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime) n += 1 + l + sovStats(uint64(l)) + if m.NumSeries != 0 { + n += 1 + sovStats(uint64(m.NumSeries)) + } + if m.NumBytes != 0 { + n += 1 + sovStats(uint64(m.NumBytes)) + } return n } @@ -204,6 +249,8 @@ func (this *Stats) String() string { } s := strings.Join([]string{`&Stats{`, `WallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.WallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, + `NumSeries:` + fmt.Sprintf("%v", this.NumSeries) + `,`, + `NumBytes:` + fmt.Sprintf("%v", this.NumBytes) + `,`, `}`, }, "") return s @@ -278,6 +325,44 @@ func (m *Stats) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NumSeries", wireType) + } + m.NumSeries = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NumSeries |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NumBytes", wireType) + } + m.NumBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NumBytes |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 3ec55448af..9e824bf712 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -13,4 +13,8 @@ option (gogoproto.unmarshaler_all) = true; message Stats { // The sum of all wall time 
spent in the querier to execute the query. google.protobuf.Duration wall_time = 1 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + // The number of series included in the query response + uint64 num_series = 2; + // The number of bytes of the chunks included in the query response + uint64 num_bytes = 3; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go new file mode 100644 index 0000000000..43f4b2c61d --- /dev/null +++ b/pkg/querier/stats/stats_test.go @@ -0,0 +1,91 @@ +package stats + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStats_WallTime(t *testing.T) { + t.Run("add and load wall time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddWallTime(time.Second) + stats.AddWallTime(time.Second) + + assert.Equal(t, 2*time.Second, stats.LoadWallTime()) + }) + + t.Run("add and load wall time nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddWallTime(time.Second) + + assert.Equal(t, time.Duration(0), stats.LoadWallTime()) + }) +} + +func TestStats_Series(t *testing.T) { + t.Run("add and load series", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddSeries(100) + stats.AddSeries(50) + + assert.Equal(t, uint64(150), stats.LoadSeries()) + }) + + t.Run("add and load series nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddSeries(50) + + assert.Equal(t, uint64(0), stats.LoadSeries()) + }) +} + +func TestStats_Bytes(t *testing.T) { + t.Run("add and load bytes", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddBytes(4096) + stats.AddBytes(4096) + + assert.Equal(t, uint64(8192), stats.LoadBytes()) + }) + + t.Run("add and load bytes nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddBytes(1024) + + assert.Equal(t, uint64(0), stats.LoadBytes()) + }) +} + +func TestStats_Merge(t *testing.T) { + t.Run("merge two stats objects", func(t *testing.T) { + stats1 := &Stats{} + stats1.AddWallTime(time.Millisecond) + stats1.AddSeries(50) + stats1.AddBytes(42) + + stats2 := &Stats{} + stats2.AddWallTime(time.Second) + stats2.AddSeries(60) + stats2.AddBytes(100) + + stats1.Merge(stats2) + + assert.Equal(t, 1001*time.Millisecond, stats1.LoadWallTime()) + assert.Equal(t, uint64(110), stats1.LoadSeries()) + assert.Equal(t, uint64(142), stats1.LoadBytes()) + }) + + t.Run("merge two nil stats objects", func(t *testing.T) { + var stats1 *Stats + var stats2 *Stats + + stats1.Merge(stats2) + + assert.Equal(t, time.Duration(0), stats1.LoadWallTime()) + assert.Equal(t, uint64(0), stats1.LoadSeries()) + assert.Equal(t, uint64(0), stats1.LoadBytes()) + }) +} From 92e8eadc2e9c7243cb496bb69d6d8019a14d52e6 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Tue, 13 Jul 2021 16:25:48 -0400 Subject: [PATCH 2/7] Formatting fix Signed-off-by: Nick Pillitteri --- pkg/frontend/transport/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index f41489899d..de273eb224 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -85,14 +85,14 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge // understand query responses is prohibitively expensive. 
h.querySeries = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_series", - Help: "Number of series returned by successful queries.", + Name: "cortex_query_series", + Help: "Number of series returned by successful queries.", Objectives: map[float64]float64{}, }, []string{"user"}) h.queryBytes = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_bytes", - Help: "Number of bytes returned by successful queries.", + Name: "cortex_query_bytes", + Help: "Number of bytes returned by successful queries.", Objectives: map[float64]float64{}, }, []string{"user"}) From 94248c2f8d4b6a7a380b242ac06241ba41c0205d Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Tue, 13 Jul 2021 16:36:29 -0400 Subject: [PATCH 3/7] Fix changelog to match actual changes Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06e391bafc..e2698d26c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 -* [FEATURE] Query Frontend: Add `cortex_query_series` and `cortex_query_bytes` per-user histograms to expose the number of series and bytes returned by queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 +* [FEATURE] Query Frontend: Add `cortex_query_series` and `cortex_query_bytes` per-user metrics to expose the number of series and bytes returned by queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 * [CHANGE] Update Go version to 1.16.6. #4362 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345 From 9f419619b975c9b317ca782c5c96ba9f001fe363 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Wed, 14 Jul 2021 09:12:31 -0400 Subject: [PATCH 4/7] Typo Signed-off-by: Nick Pillitteri --- pkg/frontend/transport/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index de273eb224..529b531989 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -79,7 +79,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Total amount of wall clock time spend processing queries.", }, []string{"user"}) - // Empty objectives for these summaries on purpose since they can't be aggregate + // Empty objectives for these summaries on purpose since they can't be aggregated // and so we are just using them for the convenience of sum and count metrics. No // histograms here since the cardinality from the number of buckets required to // understand query responses is prohibitively expensive. 
From edb788098db3f078821f699ebe52952bf7afb145 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Fri, 16 Jul 2021 09:34:57 -0400 Subject: [PATCH 5/7] Code review changes, rename things for clarity Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 2 +- pkg/distributor/query.go | 4 +- pkg/frontend/transport/handler.go | 16 ++--- pkg/frontend/transport/handler_test.go | 4 +- pkg/querier/blocks_store_queryable.go | 8 +-- pkg/querier/stats/stats.go | 20 +++--- pkg/querier/stats/stats.pb.go | 93 +++++++++++++------------- pkg/querier/stats/stats.proto | 8 +-- pkg/querier/stats/stats_test.go | 36 +++++----- 9 files changed, 96 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e2698d26c0..7358351fab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 -* [FEATURE] Query Frontend: Add `cortex_query_series` and `cortex_query_bytes` per-user metrics to expose the number of series and bytes returned by queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 +* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_per_query` and `cortex_query_fetched_chunks_bytes_per_query` per-user summaries to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 * [CHANGE] Update Go version to 1.16.6. #4362 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345 diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 7e6d7d23bf..87b45164dc 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -385,8 +385,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri resp.Timeseries = append(resp.Timeseries, series) } - reqStats.AddSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) - reqStats.AddBytes(uint64(resp.ChunksSize())) + reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) + reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize())) return resp, nil } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 529b531989..932c3f7abb 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -85,14 +85,14 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge // understand query responses is prohibitively expensive. 
h.querySeries = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_series", - Help: "Number of series returned by successful queries.", + Name: "cortex_query_fetched_series_per_query", + Help: "Number of series fetched to execute a query.", Objectives: map[float64]float64{}, }, []string{"user"}) h.queryBytes = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_bytes", - Help: "Number of bytes returned by successful queries.", + Name: "cortex_query_fetched_chunks_bytes_per_query", + Help: "Size of all chunks fetched to execute a query in bytes.", Objectives: map[float64]float64{}, }, []string{"user"}) @@ -187,8 +187,8 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer } userID := tenant.JoinTenantIDs(tenantIDs) wallTime := stats.LoadWallTime() - numSeries := stats.LoadSeries() - numBytes := stats.LoadBytes() + numSeries := stats.LoadFetchedSeries() + numBytes := stats.LoadFetchedChunkBytes() // Track stats. f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) @@ -204,8 +204,8 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "path", r.URL.Path, "response_time", queryResponseTime, "query_wall_time_seconds", wallTime.Seconds(), - "query_series", numSeries, - "query_bytes", numBytes, + "fetched_series_count", numSeries, + "fetched_chunk_bytes", numBytes, }, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index f9e478e9ae..2039fddf67 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -82,8 +82,8 @@ func TestHandler_ServeHTTP(t *testing.T) { count, err := promtest.GatherAndCount( reg, "cortex_query_seconds_total", - "cortex_query_series", - "cortex_query_bytes", + "cortex_query_fetched_series_per_query", + "cortex_query_fetched_chunks_bytes_per_query", ) assert.NoError(t, err) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 8411546b65..02c50b5b9d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -659,13 +659,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( numSeries := len(mySeries) chunkBytes := countChunkBytes(mySeries...) 
- reqStats.AddSeries(uint64(numSeries)) - reqStats.AddBytes(uint64(chunkBytes)) + reqStats.AddFetchedSeries(uint64(numSeries)) + reqStats.AddFetchedChunkBytes(uint64(chunkBytes)) level.Debug(spanLog).Log("msg", "received series from store-gateway", "instance", c.RemoteAddress(), - "num series", numSeries, - "bytes series", chunkBytes, + "fetched series", numSeries, + "fetched chunk bytes", chunkBytes, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 9b528369ef..1a39b32069 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -54,36 +54,36 @@ func (s *Stats) LoadWallTime() time.Duration { return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime))) } -func (s *Stats) AddSeries(series uint64) { +func (s *Stats) AddFetchedSeries(series uint64) { if s == nil { return } - atomic.AddUint64(&s.NumSeries, series) + atomic.AddUint64(&s.FetchedSeriesCount, series) } -func (s *Stats) LoadSeries() uint64 { +func (s *Stats) LoadFetchedSeries() uint64 { if s == nil { return 0 } - return atomic.LoadUint64(&s.NumSeries) + return atomic.LoadUint64(&s.FetchedSeriesCount) } -func (s *Stats) AddBytes(bytes uint64) { +func (s *Stats) AddFetchedChunkBytes(bytes uint64) { if s == nil { return } - atomic.AddUint64(&s.NumBytes, bytes) + atomic.AddUint64(&s.FetchedChunkBytes, bytes) } -func (s *Stats) LoadBytes() uint64 { +func (s *Stats) LoadFetchedChunkBytes() uint64 { if s == nil { return 0 } - return atomic.LoadUint64(&s.NumBytes) + return atomic.LoadUint64(&s.FetchedChunkBytes) } // Merge the provide Stats into this one. @@ -93,8 +93,8 @@ func (s *Stats) Merge(other *Stats) { } s.AddWallTime(other.LoadWallTime()) - s.AddSeries(other.LoadSeries()) - s.AddBytes(other.LoadBytes()) + s.AddFetchedSeries(other.LoadFetchedSeries()) + s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 34481eb4fa..9fd4affc1f 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -32,10 +32,10 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Stats struct { // The sum of all wall time spent in the querier to execute the query. 
WallTime time.Duration `protobuf:"bytes,1,opt,name=wall_time,json=wallTime,proto3,stdduration" json:"wall_time"` - // The number of series included in the query response - NumSeries uint64 `protobuf:"varint,2,opt,name=num_series,json=numSeries,proto3" json:"num_series,omitempty"` - // The number of bytes of the chunks included in the query response - NumBytes uint64 `protobuf:"varint,3,opt,name=num_bytes,json=numBytes,proto3" json:"num_bytes,omitempty"` + // The number of series fetched for the query + FetchedSeriesCount uint64 `protobuf:"varint,2,opt,name=fetched_series_count,json=fetchedSeriesCount,proto3" json:"fetched_series_count,omitempty"` + // The number of bytes of the chunks fetched for the query + FetchedChunkBytes uint64 `protobuf:"varint,3,opt,name=fetched_chunk_bytes,json=fetchedChunkBytes,proto3" json:"fetched_chunk_bytes,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -77,16 +77,16 @@ func (m *Stats) GetWallTime() time.Duration { return 0 } -func (m *Stats) GetNumSeries() uint64 { +func (m *Stats) GetFetchedSeriesCount() uint64 { if m != nil { - return m.NumSeries + return m.FetchedSeriesCount } return 0 } -func (m *Stats) GetNumBytes() uint64 { +func (m *Stats) GetFetchedChunkBytes() uint64 { if m != nil { - return m.NumBytes + return m.FetchedChunkBytes } return 0 } @@ -98,24 +98,25 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 263 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0xb1, 0x4e, 0xc3, 0x30, - 0x10, 0x86, 0x7d, 0x40, 0x51, 0xeb, 0x6e, 0x99, 0x42, 0x11, 0xd7, 0x8a, 0xa9, 0x0b, 0xae, 0x04, - 0x23, 0x0b, 0x8a, 0x78, 0x82, 0x96, 0x89, 0x25, 0x4a, 0xc0, 0x18, 0x4b, 0x71, 0x8c, 0x62, 0x5b, - 0x88, 0x8d, 0x85, 0x9d, 0x91, 0x47, 0xe0, 0x51, 0x3a, 0x66, 0xec, 0x04, 0xc4, 0x59, 0x18, 0xfb, - 0x08, 0x28, 0x0e, 0x88, 0xed, 0xbe, 0xfb, 0xef, 0x3b, 0x9d, 0x8e, 0x8e, 0x8d, 0xcd, 0xac, 0x61, - 0x0f, 0x95, 0xb6, 0x3a, 0x1a, 0x04, 0x98, 0x9c, 0x08, 0x69, 0xef, 0x5d, 0xce, 0x6e, 0xb4, 0x5a, - 0x08, 0x2d, 0xf4, 0x22, 0xa4, 0xb9, 0xbb, 0x0b, 0x14, 0x20, 0x54, 0xbd, 0x35, 0x41, 0xa1, 0xb5, - 0x28, 0xf8, 0xff, 0xd4, 0xad, 0xab, 0x32, 0x2b, 0x75, 0xd9, 0xe7, 0xc7, 0x2f, 0x40, 0x07, 0xab, - 0x6e, 0x71, 0x74, 0x41, 0x47, 0x8f, 0x59, 0x51, 0xa4, 0x56, 0x2a, 0x1e, 0xc3, 0x0c, 0xe6, 0xe3, - 0xd3, 0x03, 0xd6, 0xdb, 0xec, 0xcf, 0x66, 0x97, 0xbf, 0x76, 0x32, 0x5c, 0x7f, 0x4c, 0xc9, 0xdb, - 0xe7, 0x14, 0x96, 0xc3, 0xce, 0xba, 0x92, 0x8a, 0x47, 0x47, 0x94, 0x96, 0x4e, 0xa5, 0x86, 0x57, - 0x92, 0x9b, 0x78, 0x67, 0x06, 0xf3, 0xbd, 0xe5, 0xa8, 0x74, 0x6a, 0x15, 0x1a, 0xd1, 0x21, 0xed, - 0x20, 0xcd, 0x9f, 0x2c, 0x37, 0xf1, 0x6e, 0x48, 0x87, 0xa5, 0x53, 0x49, 0xc7, 0xc9, 0x79, 0xdd, - 0x20, 0xd9, 0x34, 0x48, 0xb6, 0x0d, 0xc2, 0xb3, 0x47, 0x78, 0xf7, 0x08, 0x6b, 0x8f, 0x50, 0x7b, - 0x84, 0x2f, 0x8f, 0xf0, 0xed, 0x91, 0x6c, 0x3d, 0xc2, 0x6b, 0x8b, 0xa4, 0x6e, 0x91, 0x6c, 0x5a, - 0x24, 0xd7, 0xfd, 0x4f, 0xf2, 0xfd, 0x70, 0xdf, 0xd9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xda, - 0xa1, 0xb8, 0x33, 0x30, 0x01, 0x00, 0x00, + // 281 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0xd0, 0xb1, 0x4e, 0x83, 0x40, + 0x1c, 0xc7, 0xf1, 0xfb, 0xab, 0x35, 0x95, 0x4e, 0xa2, 0x03, 0x76, 0xf8, 0xb7, 0x71, 0xea, 0xe2, + 0xd5, 0xe8, 0xe8, 0x62, 0xa8, 0x4f, 0xd0, 0x3a, 0xb9, 0x10, 0xa0, 0x57, 0x20, 0x02, 0x67, 0xe0, + 0x2e, 0xc6, 0xcd, 0x47, 0x70, 0xf4, 0x11, 0x4c, 0x7c, 0x91, 0x8e, 0x8c, 
0x9d, 0x54, 0x8e, 0xc5, + 0xb1, 0x8f, 0x60, 0xee, 0xa0, 0x71, 0xe3, 0x97, 0x0f, 0xdf, 0x4b, 0xee, 0xac, 0x41, 0x29, 0x7c, + 0x51, 0xd2, 0xa7, 0x82, 0x0b, 0x6e, 0xf7, 0xcc, 0x18, 0x5e, 0x44, 0x89, 0x88, 0x65, 0x40, 0x43, + 0x9e, 0x4d, 0x23, 0x1e, 0xf1, 0xa9, 0xd1, 0x40, 0xae, 0xcc, 0x32, 0xc3, 0x7c, 0xb5, 0xd5, 0x10, + 0x23, 0xce, 0xa3, 0x94, 0xfd, 0xff, 0xb5, 0x94, 0x85, 0x2f, 0x12, 0x9e, 0xb7, 0x7e, 0xfe, 0x09, + 0x56, 0x6f, 0xa1, 0x0f, 0xb6, 0x6f, 0xad, 0xa3, 0x67, 0x3f, 0x4d, 0x3d, 0x91, 0x64, 0xcc, 0x81, + 0x31, 0x4c, 0x06, 0x57, 0x67, 0xb4, 0xad, 0xe9, 0xae, 0xa6, 0x77, 0x5d, 0xed, 0xf6, 0xd7, 0x5f, + 0x23, 0xf2, 0xfe, 0x3d, 0x82, 0x79, 0x5f, 0x57, 0xf7, 0x49, 0xc6, 0xec, 0x4b, 0xeb, 0x74, 0xc5, + 0x44, 0x18, 0xb3, 0xa5, 0x57, 0xb2, 0x22, 0x61, 0xa5, 0x17, 0x72, 0x99, 0x0b, 0x67, 0x6f, 0x0c, + 0x93, 0x83, 0xb9, 0xdd, 0xd9, 0xc2, 0xd0, 0x4c, 0x8b, 0x4d, 0xad, 0x93, 0x5d, 0x11, 0xc6, 0x32, + 0x7f, 0xf4, 0x82, 0x17, 0xc1, 0x4a, 0x67, 0xdf, 0x04, 0xc7, 0x1d, 0xcd, 0xb4, 0xb8, 0x1a, 0xdc, + 0x9b, 0xaa, 0x46, 0xb2, 0xa9, 0x91, 0x6c, 0x6b, 0x84, 0x57, 0x85, 0xf0, 0xa1, 0x10, 0xd6, 0x0a, + 0xa1, 0x52, 0x08, 0x3f, 0x0a, 0xe1, 0x57, 0x21, 0xd9, 0x2a, 0x84, 0xb7, 0x06, 0x49, 0xd5, 0x20, + 0xd9, 0x34, 0x48, 0x1e, 0xda, 0x97, 0x0b, 0x0e, 0xcd, 0x2d, 0xae, 0xff, 0x02, 0x00, 0x00, 0xff, + 0xff, 0x9d, 0xf1, 0x86, 0xb8, 0x56, 0x01, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -140,10 +141,10 @@ func (this *Stats) Equal(that interface{}) bool { if this.WallTime != that1.WallTime { return false } - if this.NumSeries != that1.NumSeries { + if this.FetchedSeriesCount != that1.FetchedSeriesCount { return false } - if this.NumBytes != that1.NumBytes { + if this.FetchedChunkBytes != that1.FetchedChunkBytes { return false } return true @@ -155,8 +156,8 @@ func (this *Stats) GoString() string { s := make([]string, 0, 7) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") - s = append(s, "NumSeries: "+fmt.Sprintf("%#v", this.NumSeries)+",\n") - s = append(s, "NumBytes: "+fmt.Sprintf("%#v", this.NumBytes)+",\n") + s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") + s = append(s, "FetchedChunkBytes: "+fmt.Sprintf("%#v", this.FetchedChunkBytes)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -188,13 +189,13 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.NumBytes != 0 { - i = encodeVarintStats(dAtA, i, uint64(m.NumBytes)) + if m.FetchedChunkBytes != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedChunkBytes)) i-- dAtA[i] = 0x18 } - if m.NumSeries != 0 { - i = encodeVarintStats(dAtA, i, uint64(m.NumSeries)) + if m.FetchedSeriesCount != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedSeriesCount)) i-- dAtA[i] = 0x10 } @@ -228,11 +229,11 @@ func (m *Stats) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime) n += 1 + l + sovStats(uint64(l)) - if m.NumSeries != 0 { - n += 1 + sovStats(uint64(m.NumSeries)) + if m.FetchedSeriesCount != 0 { + n += 1 + sovStats(uint64(m.FetchedSeriesCount)) } - if m.NumBytes != 0 { - n += 1 + sovStats(uint64(m.NumBytes)) + if m.FetchedChunkBytes != 0 { + n += 1 + sovStats(uint64(m.FetchedChunkBytes)) } return n } @@ -249,8 +250,8 @@ func (this *Stats) String() string { } s := strings.Join([]string{`&Stats{`, `WallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.WallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, - `NumSeries:` + fmt.Sprintf("%v", 
this.NumSeries) + `,`, - `NumBytes:` + fmt.Sprintf("%v", this.NumBytes) + `,`, + `FetchedSeriesCount:` + fmt.Sprintf("%v", this.FetchedSeriesCount) + `,`, + `FetchedChunkBytes:` + fmt.Sprintf("%v", this.FetchedChunkBytes) + `,`, `}`, }, "") return s @@ -327,9 +328,9 @@ func (m *Stats) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NumSeries", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field FetchedSeriesCount", wireType) } - m.NumSeries = 0 + m.FetchedSeriesCount = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowStats @@ -339,16 +340,16 @@ func (m *Stats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NumSeries |= uint64(b&0x7F) << shift + m.FetchedSeriesCount |= uint64(b&0x7F) << shift if b < 0x80 { break } } case 3: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NumBytes", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field FetchedChunkBytes", wireType) } - m.NumBytes = 0 + m.FetchedChunkBytes = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowStats @@ -358,7 +359,7 @@ func (m *Stats) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.NumBytes |= uint64(b&0x7F) << shift + m.FetchedChunkBytes |= uint64(b&0x7F) << shift if b < 0x80 { break } diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 9e824bf712..765dd99582 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -13,8 +13,8 @@ option (gogoproto.unmarshaler_all) = true; message Stats { // The sum of all wall time spent in the querier to execute the query. google.protobuf.Duration wall_time = 1 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; - // The number of series included in the query response - uint64 num_series = 2; - // The number of bytes of the chunks included in the query response - uint64 num_bytes = 3; + // The number of series fetched for the query + uint64 fetched_series_count = 2; + // The number of bytes of the chunks fetched for the query + uint64 fetched_chunk_bytes = 3; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index 43f4b2c61d..edbf48a996 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -28,34 +28,34 @@ func TestStats_WallTime(t *testing.T) { func TestStats_Series(t *testing.T) { t.Run("add and load series", func(t *testing.T) { stats, _ := ContextWithEmptyStats(context.Background()) - stats.AddSeries(100) - stats.AddSeries(50) + stats.AddFetchedSeries(100) + stats.AddFetchedSeries(50) - assert.Equal(t, uint64(150), stats.LoadSeries()) + assert.Equal(t, uint64(150), stats.LoadFetchedSeries()) }) t.Run("add and load series nil receiver", func(t *testing.T) { var stats *Stats - stats.AddSeries(50) + stats.AddFetchedSeries(50) - assert.Equal(t, uint64(0), stats.LoadSeries()) + assert.Equal(t, uint64(0), stats.LoadFetchedSeries()) }) } func TestStats_Bytes(t *testing.T) { t.Run("add and load bytes", func(t *testing.T) { stats, _ := ContextWithEmptyStats(context.Background()) - stats.AddBytes(4096) - stats.AddBytes(4096) + stats.AddFetchedChunkBytes(4096) + stats.AddFetchedChunkBytes(4096) - assert.Equal(t, uint64(8192), stats.LoadBytes()) + assert.Equal(t, uint64(8192), stats.LoadFetchedChunkBytes()) }) t.Run("add and load bytes nil receiver", func(t *testing.T) { var stats *Stats - stats.AddBytes(1024) + stats.AddFetchedChunkBytes(1024) - 
assert.Equal(t, uint64(0), stats.LoadBytes()) + assert.Equal(t, uint64(0), stats.LoadFetchedChunkBytes()) }) } @@ -63,19 +63,19 @@ func TestStats_Merge(t *testing.T) { t.Run("merge two stats objects", func(t *testing.T) { stats1 := &Stats{} stats1.AddWallTime(time.Millisecond) - stats1.AddSeries(50) - stats1.AddBytes(42) + stats1.AddFetchedSeries(50) + stats1.AddFetchedChunkBytes(42) stats2 := &Stats{} stats2.AddWallTime(time.Second) - stats2.AddSeries(60) - stats2.AddBytes(100) + stats2.AddFetchedSeries(60) + stats2.AddFetchedChunkBytes(100) stats1.Merge(stats2) assert.Equal(t, 1001*time.Millisecond, stats1.LoadWallTime()) - assert.Equal(t, uint64(110), stats1.LoadSeries()) - assert.Equal(t, uint64(142), stats1.LoadBytes()) + assert.Equal(t, uint64(110), stats1.LoadFetchedSeries()) + assert.Equal(t, uint64(142), stats1.LoadFetchedChunkBytes()) }) t.Run("merge two nil stats objects", func(t *testing.T) { @@ -85,7 +85,7 @@ func TestStats_Merge(t *testing.T) { stats1.Merge(stats2) assert.Equal(t, time.Duration(0), stats1.LoadWallTime()) - assert.Equal(t, uint64(0), stats1.LoadSeries()) - assert.Equal(t, uint64(0), stats1.LoadBytes()) + assert.Equal(t, uint64(0), stats1.LoadFetchedSeries()) + assert.Equal(t, uint64(0), stats1.LoadFetchedChunkBytes()) }) } From e6c9d26d2a435dc480c330576b87e52dead79777 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri <56quarters@users.noreply.github.com> Date: Fri, 16 Jul 2021 10:57:05 -0400 Subject: [PATCH 6/7] Apply suggestions from code review Co-authored-by: Marco Pracucci Signed-off-by: Nick Pillitteri --- pkg/frontend/transport/handler.go | 2 +- pkg/querier/stats/stats_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 932c3f7abb..901dbb7ed2 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -205,7 +205,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "response_time", queryResponseTime, "query_wall_time_seconds", wallTime.Seconds(), "fetched_series_count", numSeries, - "fetched_chunk_bytes", numBytes, + "fetched_chunks_bytes", numBytes, }, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) 
diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index edbf48a996..f8a23c96c3 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -25,7 +25,7 @@ func TestStats_WallTime(t *testing.T) { }) } -func TestStats_Series(t *testing.T) { +func TestStats_AddFetchedSeries(t *testing.T) { t.Run("add and load series", func(t *testing.T) { stats, _ := ContextWithEmptyStats(context.Background()) stats.AddFetchedSeries(100) @@ -42,7 +42,7 @@ func TestStats_Series(t *testing.T) { }) } -func TestStats_Bytes(t *testing.T) { +func TestStats_AddFetchedChunkBytes(t *testing.T) { t.Run("add and load bytes", func(t *testing.T) { stats, _ := ContextWithEmptyStats(context.Background()) stats.AddFetchedChunkBytes(4096) From c048c7b7165469b6bacf07c79b0601169684baa8 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Tue, 20 Jul 2021 09:54:21 -0400 Subject: [PATCH 7/7] Code review changes, remove superfluous summaries Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 2 +- pkg/frontend/transport/handler.go | 27 ++++++++++---------------- pkg/frontend/transport/handler_test.go | 4 ++-- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7358351fab..1032efaf68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 -* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_per_query` and `cortex_query_fetched_chunks_bytes_per_query` per-user summaries to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 +* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_total` and `cortex_query_fetched_chunks_bytes_total` per-user counters to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 * [CHANGE] Update Go version to 1.16.6. #4362 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345 diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 901dbb7ed2..af87e4fe30 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -60,8 +60,8 @@ type Handler struct { // Metrics. 
querySeconds *prometheus.CounterVec - querySeries *prometheus.SummaryVec - queryBytes *prometheus.SummaryVec + querySeries *prometheus.CounterVec + queryBytes *prometheus.CounterVec activeUsers *util.ActiveUsersCleanupService } @@ -79,21 +79,14 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Total amount of wall clock time spend processing queries.", }, []string{"user"}) - // Empty objectives for these summaries on purpose since they can't be aggregated - // and so we are just using them for the convenience of sum and count metrics. No - // histograms here since the cardinality from the number of buckets required to - // understand query responses is prohibitively expensive. - - h.querySeries = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_fetched_series_per_query", - Help: "Number of series fetched to execute a query.", - Objectives: map[float64]float64{}, + h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_series_total", + Help: "Number of series fetched to execute a query.", }, []string{"user"}) - h.queryBytes = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{ - Name: "cortex_query_fetched_chunks_bytes_per_query", - Help: "Size of all chunks fetched to execute a query in bytes.", - Objectives: map[float64]float64{}, + h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_chunks_bytes_total", + Help: "Size of all chunks fetched to execute a query in bytes.", }, []string{"user"}) h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { @@ -192,8 +185,8 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer // Track stats. f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) - f.querySeries.WithLabelValues(userID).Observe(float64(numSeries)) - f.queryBytes.WithLabelValues(userID).Observe(float64(numBytes)) + f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) + f.queryBytes.WithLabelValues(userID).Add(float64(numBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 2039fddf67..8553d9fe21 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -82,8 +82,8 @@ func TestHandler_ServeHTTP(t *testing.T) { count, err := promtest.GatherAndCount( reg, "cortex_query_seconds_total", - "cortex_query_fetched_series_per_query", - "cortex_query_fetched_chunks_bytes_per_query", + "cortex_query_fetched_series_total", + "cortex_query_fetched_chunks_bytes_total", ) assert.NoError(t, err)
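
A minimal end-to-end sketch of the Stats plumbing this series adds, based only on the accessors visible in the diffs (`stats.ContextWithEmptyStats`, `stats.FromContext`, `AddFetchedSeries`/`AddFetchedChunkBytes` and their `Load` counterparts). It assumes the query frontend attaches an empty Stats object to the request context when `-frontend.query-stats-enabled` is set; here the call is made directly, as the unit tests in this series do.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/querier/stats"
)

func main() {
	// Attach an empty Stats object to the request context. The query frontend
	// is assumed to do this when -frontend.query-stats-enabled is set; here we
	// call it directly, as the stats unit tests in this series do.
	reqStats, ctx := stats.ContextWithEmptyStats(context.Background())

	// Downstream components (the distributor and the blocks store querier in
	// this series) pull the same Stats out of the context and record what they
	// fetched while serving the query.
	downstream := stats.FromContext(ctx)
	downstream.AddFetchedSeries(150)
	downstream.AddFetchedChunkBytes(64 * 1024)

	// When the query completes, the frontend handler reads the totals back and
	// adds them to the cortex_query_fetched_series_total and
	// cortex_query_fetched_chunks_bytes_total per-user counters.
	fmt.Println(reqStats.LoadFetchedSeries())     // 150
	fmt.Println(reqStats.LoadFetchedChunkBytes()) // 65536
}
```

Since the final commit replaces the per-user summaries with plain counters, per-query distributions are only visible through the `fetched_series_count` and `fetched_chunks_bytes` fields of the query stats log line emitted by the frontend handler.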