@@ -14,6 +14,7 @@ import (
14
14
ringclient "github.com/grafana/dskit/ring/client"
15
15
"github.com/pkg/errors"
16
16
"github.com/prometheus/client_golang/prometheus"
17
+ "go.uber.org/atomic"
17
18
"golang.org/x/exp/slices"
18
19
"google.golang.org/grpc"
19
20
"google.golang.org/grpc/health/grpc_health_v1"
@@ -238,20 +239,8 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
238
239
}
239
240
}
240
241
241
- if len (servers ) > 0 {
242
- // cache locality score (higher is better):
243
- // `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances),
244
- // but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of
245
- // `1-2/num_instances` -> `1`, where the former represents slight
246
- // overlap on instances to the left and right of the range.
247
- pctKeyspace := float64 (lastFp - firstFp ) / float64 (math .MaxUint64 )
248
- pctInstances := float64 (len (servers )) / float64 (max (1 , len (c .pool .Addrs ())))
249
- cacheLocalityScore := pctKeyspace / pctInstances
250
- c .metrics .cacheLocalityScore .Observe (cacheLocalityScore )
251
- }
252
-
253
242
results := make ([][]* logproto.GroupedChunkRefs , len (servers ))
254
- count := 0
243
+ count := atomic . NewInt64 ( 0 )
255
244
err := concurrency .ForEachJob (ctx , len (servers ), len (servers ), func (ctx context.Context , i int ) error {
256
245
rs := servers [i ]
257
246
@@ -269,10 +258,24 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
269
258
}
270
259
resp , err := client .FilterChunkRefs (ctx , req )
271
260
if err != nil {
272
- return err
261
+ // We don't want a single bloom-gw failure to fail the entire query,
262
+ // so instrument & move on
263
+ level .Error (c .logger ).Log (
264
+ "msg" , "filter failed for instance, skipping" ,
265
+ "addr" , rs .addr ,
266
+ "series" , len (rs .groups ),
267
+ "blocks" , len (rs .blocks ),
268
+ "err" , err ,
269
+ )
270
+ // filter none of the results on failed request
271
+ c .metrics .clientRequests .WithLabelValues (typeError ).Inc ()
272
+ results [i ] = rs .groups
273
+ } else {
274
+ c .metrics .clientRequests .WithLabelValues (typeSuccess ).Inc ()
275
+ results [i ] = resp .ChunkRefs
273
276
}
274
- results [ i ] = resp . ChunkRefs
275
- count += len (resp . ChunkRefs )
277
+
278
+ count . Add ( int64 ( len (results [ i ])) )
276
279
return nil
277
280
})
278
281
})
@@ -281,7 +284,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
281
284
return nil , err
282
285
}
283
286
284
- buf := make ([]* logproto.GroupedChunkRefs , 0 , count )
287
+ buf := make ([]* logproto.GroupedChunkRefs , 0 , int ( count . Load ()) )
285
288
return mergeSeries (results , buf )
286
289
}
287
290
0 commit comments