
Commit e2d6b70

chore: Add chunk count from GetChunkRef call to stats context (#14636)

This PR improves the observability of the GetChunkRef call, especially when bloom filters are enabled.

Signed-off-by: Christian Haudum <[email protected]>

1 parent c018a49

File tree

7 files changed: +293 -237 lines

pkg/indexgateway/gateway.go (+7 -13)

@@ -22,7 +22,6 @@ import (
 	iter "github.com/grafana/loki/v3/pkg/iter/v2"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
-	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
@@ -236,6 +235,9 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
 	}
 
 	initialChunkCount := len(result.Refs)
+	result.Stats.TotalChunks = int64(initialChunkCount)
+	result.Stats.PostFilterChunks = int64(initialChunkCount) // populate early for error reponses
+
 	defer func() {
 		if err == nil {
 			g.metrics.preFilterChunks.WithLabelValues(routeChunkRefs).Observe(float64(initialChunkCount))
@@ -262,6 +264,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
 
 	result.Refs = chunkRefs
 	level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs))
+	result.Stats.PostFilterChunks = int64(len(result.Refs))
 	return result, nil
 }
 
@@ -486,16 +489,7 @@ func (g *Gateway) boundedShards(
 	g.metrics.preFilterChunks.WithLabelValues(routeShards).Observe(float64(ct))
 	g.metrics.postFilterChunks.WithLabelValues(routeShards).Observe(float64(len(filtered)))
 
-	statistics := stats.Result{
-		Index: stats.Index{
-			TotalChunks:      int64(ct),
-			PostFilterChunks: int64(len(filtered)),
-		},
-	}
-
-	resp := &logproto.ShardsResponse{
-		Statistics: statistics,
-	}
+	resp := &logproto.ShardsResponse{}
 
 	// Edge case: if there are no chunks after filtering, we still need to return a single shard
 	if len(filtered) == 0 {
@@ -530,8 +524,8 @@ func (g *Gateway) boundedShards(
 		ms := syntax.MatchersExpr{Mts: p.Matchers}
 		level.Debug(logger).Log(
 			"msg", "send shards response",
-			"total_chunks", statistics.Index.TotalChunks,
-			"post_filter_chunks", statistics.Index.PostFilterChunks,
+			"total_chunks", ct,
+			"post_filter_chunks", len(filtered),
 			"shards", len(resp.Shards),
 			"query", req.Query,
 			"target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(),

pkg/logproto/logproto.pb.go (+252 -193)

Generated file; diff not rendered.

pkg/logproto/logproto.proto (+1 -0)

@@ -352,6 +352,7 @@ message GetChunkRefRequest {
 
 message GetChunkRefResponse {
   repeated ChunkRef refs = 1;
+  stats.Index stats = 2 [(gogoproto.nullable) = false];
 }
 
 message GetSeriesRequest {
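Because the new field is marked (gogoproto.nullable) = false, the generated Go struct carries the stats by value rather than as a pointer, which is why the gateway can write result.Stats.TotalChunks without a nil check. Roughly, and as an assumed shape only (the real definition is the machine-generated one in pkg/logproto/logproto.pb.go above):

// Assumed shape of the generated message; struct tags omitted.
type GetChunkRefResponse struct {
	Refs  []*ChunkRef
	Stats stats.Index // non-nullable: embedded by value, always safe to write to
}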

pkg/logqlmodel/stats/context.go (+13 -0)

@@ -103,6 +103,11 @@ func (c *Context) Store() Store {
 	return c.store
 }
 
+// Index returns the index statistics accumulated so far.
+func (c *Context) Index() Index {
+	return c.index
+}
+
 // Caches returns the cache statistics accumulated so far.
 func (c *Context) Caches() Caches {
 	return Caches{
@@ -402,6 +407,14 @@ func (c *Context) AddChunksRef(i int64) {
 	atomic.AddInt64(&c.store.TotalChunksRef, i)
 }
 
+func (c *Context) AddIndexTotalChunkRefs(i int64) {
+	atomic.AddInt64(&c.index.TotalChunks, i)
+}
+
+func (c *Context) AddIndexPostFilterChunkRefs(i int64) {
+	atomic.AddInt64(&c.index.PostFilterChunks, i)
+}
+
 // AddCacheEntriesFound counts the number of cache entries requested and found
 func (c *Context) AddCacheEntriesFound(t CacheType, i int) {
 	stats := c.getCacheStatsByType(t)
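A short usage sketch for the new accessors. FromContext is the same helper the index gateway client uses below; NewContext is assumed to be the package's existing constructor that attaches a stats Context to a context.Context. The adders use atomic increments, so concurrent index lookups can safely record into the same context. Import paths follow the diff.

package example

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
)

func recordAndReadIndexStats() {
	// Attach a stats collection context to the request context.
	statsCtx, ctx := stats.NewContext(context.Background())

	// Deeper in the call chain (e.g. the index gateway client), the new
	// adders accumulate the chunk-ref counts returned by GetChunkRef.
	stats.FromContext(ctx).AddIndexTotalChunkRefs(150)
	stats.FromContext(ctx).AddIndexPostFilterChunkRefs(30)

	// The new Index() accessor exposes what has been accumulated so far.
	idx := statsCtx.Index()
	fmt.Printf("total_chunks=%d post_filter_chunks=%d\n", idx.TotalChunks, idx.PostFilterChunks)
}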

pkg/storage/async_store.go (+1 -8)

@@ -372,15 +372,8 @@ func mergeShardsFromIngestersAndStore(
 
 	shards := sharding.LinearShards(int(totalBytes/targetBytesPerShard), totalBytes)
 
-	// increment the total chunks by the number seen from ingesters
-	// NB(owen-d): this isn't perfect as it mixes signals a bit by joining
-	// store chunks which _could_ possibly be filtered with ingester chunks which can't,
-	// but it's still directionally helpful
-	updatedStats := storeResp.Statistics
-	updatedStats.Index.TotalChunks += int64(statsResp.Chunks)
 	return &logproto.ShardsResponse{
-		Shards:     shards,
-		Statistics: updatedStats,
+		Shards: shards,
 		// explicitly nil chunkgroups when we've changed the shards+included chunkrefs from ingesters
 		ChunkGroups: nil,
 	}

pkg/storage/async_store_test.go (+14 -23)

@@ -6,15 +6,12 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
-
-	"github.com/grafana/loki/v3/pkg/logproto"
-	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
-
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
+	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
 	"github.com/grafana/loki/v3/pkg/storage/config"
@@ -374,14 +371,9 @@ func TestMergeShardsFromIngestersAndStore(t *testing.T) {
 	}
 
 	// creates n shards with bytesPerShard * n bytes and chks chunks
-	mkShards := func(n int, bytesPerShard uint64, chks int64) logproto.ShardsResponse {
+	mkShards := func(n int, bytesPerShard uint64) logproto.ShardsResponse {
 		return logproto.ShardsResponse{
 			Shards: sharding.LinearShards(n, bytesPerShard*uint64(n)),
-			Statistics: stats.Result{
-				Index: stats.Index{
-					TotalChunks: chks,
-				},
-			},
 		}
 	}
 
@@ -396,32 +388,32 @@ func TestMergeShardsFromIngestersAndStore(t *testing.T) {
 		{
 			desc:     "zero bytes returns one full shard",
 			ingester: mkStats(0, 0),
-			store:    mkShards(0, 0, 0),
-			exp:      mkShards(1, 0, 0),
+			store:    mkShards(0, 0),
+			exp:      mkShards(1, 0),
 		},
 		{
 			desc:     "zero ingester bytes honors store",
 			ingester: mkStats(0, 0),
-			store:    mkShards(10, uint64(targetBytesPerShard), 10),
-			exp:      mkShards(10, uint64(targetBytesPerShard), 10),
+			store:    mkShards(10, uint64(targetBytesPerShard)),
+			exp:      mkShards(10, uint64(targetBytesPerShard)),
		},
 		{
 			desc:     "zero store bytes honors ingester",
 			ingester: mkStats(uint64(targetBytesPerShard*10), 10),
-			store:    mkShards(0, 0, 0),
-			exp:      mkShards(10, uint64(targetBytesPerShard), 10),
+			store:    mkShards(0, 0),
+			exp:      mkShards(10, uint64(targetBytesPerShard)),
 		},
 		{
 			desc:     "ingester bytes below threshold ignored",
-			ingester: mkStats(uint64(targetBytesPerShard*2), 10),   // 2 shards worth from ingesters
-			store:    mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
-			exp:      mkShards(10, uint64(targetBytesPerShard), 10), // use the store's resp
+			ingester: mkStats(uint64(targetBytesPerShard*2), 10), // 2 shards worth from ingesters
+			store:    mkShards(10, uint64(targetBytesPerShard)),  // 10 shards worth from store
+			exp:      mkShards(10, uint64(targetBytesPerShard)),  // use the store's resp
 		},
 		{
 			desc:     "ingester bytes above threshold recreate shards",
-			ingester: mkStats(uint64(targetBytesPerShard*4), 10),   // 4 shards worth from ingesters
-			store:    mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
-			exp:      mkShards(14, uint64(targetBytesPerShard), 20), // regenerate 14 shards
+			ingester: mkStats(uint64(targetBytesPerShard*4), 10), // 4 shards worth from ingesters
+			store:    mkShards(10, uint64(targetBytesPerShard)),  // 10 shards worth from store
+			exp:      mkShards(14, uint64(targetBytesPerShard)),  // regenerate 14 shards
 		},
 	} {
 
@@ -434,7 +426,6 @@ func TestMergeShardsFromIngestersAndStore(t *testing.T) {
 			)
 			require.Equal(t, tc.exp.Statistics, got.Statistics)
 			require.Equal(t, tc.exp.ChunkGroups, got.ChunkGroups)
-			require.Equal(t, tc.exp.Statistics.Index.TotalChunks, got.Statistics.Index.TotalChunks)
 			for i, shard := range tc.exp.Shards {
 				require.Equal(t, shard, got.Shards[i], "shard %d", i)
 			}

pkg/storage/stores/series/series_index_gateway_store.go (+5 -0)

@@ -11,6 +11,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	statscontext "github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
@@ -58,6 +59,10 @@ func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, _ string, fr
 		result[i] = *ref
 	}
 
+	statsCtx := statscontext.FromContext(ctx)
+	statsCtx.AddIndexTotalChunkRefs(response.Stats.TotalChunks)
+	statsCtx.AddIndexPostFilterChunkRefs(response.Stats.PostFilterChunks)
+
 	return result, nil
 }
 
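With the client recording both counters into the query's stats context, any component that holds that context can surface them. Below is a hypothetical helper (not part of this commit) that emits the two new counters on a go-kit logger, mirroring the debug line in boundedShards; import paths follow the diff.

package example

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
)

// logIndexChunkStats is a hypothetical helper: it reads the counters recorded by
// AddIndexTotalChunkRefs / AddIndexPostFilterChunkRefs via the new Index() accessor
// and logs them for observability.
func logIndexChunkStats(logger log.Logger, statsCtx *stats.Context) {
	idx := statsCtx.Index()
	level.Debug(logger).Log(
		"msg", "chunk refs from index gateway",
		"total_chunks", idx.TotalChunks,
		"post_filter_chunks", idx.PostFilterChunks,
	)
}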
