Skip to content

Commit b101b28

Browse files
committed
Suppress error from resolving server addresses for blocks
Instead of returning the error, `FilterChunks` returns the full, unfiltered list of chunks. This makes sure that queries do not fail if no bloom gateway instances are available. Signed-off-by: Christian Haudum <[email protected]>
1 parent d451e23 commit b101b28

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

pkg/bloomgateway/client.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,20 @@ type Client interface {
116116
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
117117
}
118118

119+
// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
120+
// It does only expose functions that are used by the GatewayClient
121+
// and is required to mock the JumpHashClientPool in tests.
122+
type clientPool interface {
123+
GetClientFor(string) (ringclient.PoolClient, error)
124+
Addr(string) (string, error)
125+
Stop()
126+
}
127+
119128
type GatewayClient struct {
120129
cfg ClientConfig
121130
logger log.Logger
122131
metrics *clientMetrics
123-
pool *JumpHashClientPool
132+
pool clientPool
124133
dnsProvider *discovery.DNS
125134
}
126135

@@ -211,8 +220,15 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
211220
servers := make([]addrWithGroups, 0, len(blocks))
212221
for _, blockWithSeries := range blocks {
213222
addr, err := c.pool.Addr(blockWithSeries.block.String())
223+
224+
// the client should return the full, unfiltered list of chunks instead of an error
214225
if err != nil {
215-
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
226+
level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", blockWithSeries.block, "err", err)
227+
var series [][]*logproto.GroupedChunkRefs
228+
for i := range blocks {
229+
series = append(series, blocks[i].series)
230+
}
231+
return mergeSeries(series, nil)
216232
}
217233

218234
if idx, found := pos[addr]; found {

pkg/bloomgateway/client_test.go

+40-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/go-kit/log"
88
"github.com/grafana/dskit/flagext"
9+
"github.com/pkg/errors"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/common/model"
1112
"github.com/stretchr/testify/require"
@@ -16,16 +17,24 @@ import (
1617
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1718
)
1819

20+
type errorMockPool struct {
21+
*JumpHashClientPool
22+
}
23+
24+
func (p *errorMockPool) Addr(_ string) (string, error) {
25+
return "", errors.New("no server found")
26+
}
27+
1928
func TestBloomGatewayClient(t *testing.T) {
2029
logger := log.NewNopLogger()
21-
reg := prometheus.NewRegistry()
2230

2331
limits := newLimits()
2432

2533
cfg := ClientConfig{}
2634
flagext.DefaultValues(&cfg)
2735

2836
t.Run("FilterChunks returns response", func(t *testing.T) {
37+
reg := prometheus.NewRegistry()
2938
c, err := NewClient(cfg, limits, reg, logger, nil, false)
3039
require.NoError(t, err)
3140
expr, err := syntax.ParseExpr(`{foo="bar"}`)
@@ -34,6 +43,33 @@ func TestBloomGatewayClient(t *testing.T) {
3443
require.NoError(t, err)
3544
require.Equal(t, 0, len(res))
3645
})
46+
47+
t.Run("pool error is suppressed and returns full list of chunks", func(t *testing.T) {
48+
reg := prometheus.NewRegistry()
49+
c, err := NewClient(cfg, limits, reg, logger, nil, false)
50+
require.NoError(t, err)
51+
c.pool = &errorMockPool{}
52+
53+
expected := []*logproto.GroupedChunkRefs{
54+
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1)}},
55+
{Fingerprint: 0x9f, Refs: []*logproto.ShortRef{shortRef(0, 1, 2)}},
56+
{Fingerprint: 0xa0, Refs: []*logproto.ShortRef{shortRef(0, 1, 3)}},
57+
{Fingerprint: 0xff, Refs: []*logproto.ShortRef{shortRef(0, 1, 4)}},
58+
}
59+
60+
blocks := []blockWithSeries{
61+
{block: mkBlockRef(0x00, 0x9f), series: expected[0:2]},
62+
{block: mkBlockRef(0xa0, 0xff), series: expected[2:4]},
63+
}
64+
expr, err := syntax.ParseExpr(`{foo="bar"}`)
65+
require.NoError(t, err)
66+
67+
res, err := c.FilterChunks(context.Background(), "tenant", bloomshipper.NewInterval(0, 0), blocks, plan.QueryPlan{AST: expr})
68+
require.NoError(t, err)
69+
require.Equal(t, 4, len(res))
70+
71+
require.Equal(t, expected, res)
72+
})
3773
}
3874

3975
func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
@@ -94,3 +130,6 @@ func TestGatewayClient_MergeChunkSets(t *testing.T) {
94130
result := mergeChunkSets(inp1, inp2)
95131
require.Equal(t, expected, result)
96132
}
133+
134+
func TestGatewayClient_FilterChunks(t *testing.T) {
135+
}

0 commit comments

Comments
 (0)