
Commit d3ba83d

chaudum authored and grafana-delivery-bot[bot] committed
feat(blooms): Add bloom planner and bloom builder to backend target (#13997)
Previously, the bloom compactor component was part of the `backend` target in the Simple Scalable Deployment (SSD) mode. However, the bloom compactor was removed (#13969) in favour of the planner and builder, and was therefore also removed from the backend target. This PR adds the planner and builder components to the backend target so it can continue building blooms if enabled.

The planner needs to run as a singleton: even when multiple replicas of the backend target are deployed, only one instance may create tasks for the builders. This is achieved by leader election through the already existing index gateway ring in the backend target. The planner leader is determined by ownership of the leader key, and builders connect to the planner leader to pull tasks.

----

Signed-off-by: Christian Haudum <[email protected]>
(cherry picked from commit bf60455)
1 parent 6d3efa0 commit d3ba83d
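The election mechanism is compact enough to sketch up front. The idea, condensed from the new `ringwatcher.go` below: every backend replica asks the index gateway ring which instance owns a fixed sentinel token, and only that owner plans. The helper name `electLeader` is hypothetical; the dskit calls are the ones the diff actually uses.

```go
package leaderelection

import (
	"fmt"

	"github.com/grafana/dskit/ring"
)

// sentinelKey is an arbitrary fixed token (RingKeyOfLeader in the diff below);
// whichever ring member owns it is treated as the planner leader.
const sentinelKey = 0xffff

// electLeader is a hypothetical, condensed version of RingWatcher.lookupAddresses:
// look up the replication set for the sentinel key and take the first owner.
func electLeader(r ring.ReadRing) (*ring.InstanceDesc, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	rs, err := r.Get(sentinelKey, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
	if err != nil {
		return nil, fmt.Errorf("failed to get replication set for leader key: %w", err)
	}
	if len(rs.Instances) == 0 {
		return nil, ring.ErrEmptyRing
	}
	// The real implementation additionally checks instance state and token
	// ownership; for this sketch, the first instance in the set stands in.
	return &rs.Instances[0], nil
}
```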

File tree

8 files changed: +222 −15 lines

docs/sources/operations/query-acceleration-blooms.md (+2 −2)

```diff
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
 {{< /admonition >}}
 
 To start building and using blooms you need to:
-- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
-- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
+- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
+- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
 - Enable blooms building and filtering for each tenant individually, or for all of them by default.
 
 ```yaml
```
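For orientation, enabling the feature end to end would look roughly like the sketch below. This is an assumption-laden illustration, not the truncated example from the docs page: the block and limit names (`bloom_build`, `bloom_gateway`, `bloom_creation_enabled`, `bloom_gateway_enable_filtering`) match the config references above to the best of my knowledge, but the values are placeholders.

```yaml
# Illustrative sketch only -- consult the Bloom Build and Bloom Gateway
# config references above for the authoritative keys and defaults.
bloom_build:
  enabled: true        # turns on the planner/builder components

bloom_gateway:
  enabled: true        # turns on bloom filtering at query time

limits_config:
  # enable for all tenants by default (can also be set per tenant)
  bloom_creation_enabled: true
  bloom_gateway_enable_filtering: true
```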

pkg/bloombuild/builder/builder.go (+37 −5)

```diff
@@ -30,6 +30,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
 type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
 	chunkLoader ChunkLoader
 
 	client protos.PlannerForBuilderClient
+
+	// used only in SSD mode where a single planner of the backend replicas needs to create tasks
+	// therefore is nil when planner is run in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -59,6 +64,7 @@ func New(
 	bloomStore bloomshipper.Store,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Builder, error) {
 	utillog.WarnExperimentalUse("Bloom Builder", logger)
 
@@ -82,18 +88,33 @@ func New(
 		logger: logger,
 	}
 
+	if rm != nil {
+		b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+	}
+
 	b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
 	return b, nil
 }
 
-func (b *Builder) starting(_ context.Context) error {
+func (b *Builder) starting(ctx context.Context) error {
+	if b.ringWatcher != nil {
+		if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
+			return fmt.Errorf("error starting builder subservices: %w", err)
+		}
+	}
 	b.metrics.running.Set(1)
 	return nil
 }
 
 func (b *Builder) stopping(_ error) error {
 	defer b.metrics.running.Set(0)
 
+	if b.ringWatcher != nil {
+		if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
+			return fmt.Errorf("error stopping builder subservices: %w", err)
+		}
+	}
+
 	if b.client != nil {
 		// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
 		// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
 	return nil
 }
 
-func (b *Builder) connectAndBuild(
-	ctx context.Context,
-) error {
+func (b *Builder) plannerAddress() string {
+	if b.ringWatcher == nil {
+		return b.cfg.PlannerAddress
+	}
+
+	addr, err := b.ringWatcher.GetLeaderAddress()
+	if err != nil {
+		return b.cfg.PlannerAddress
+	}
+
+	return addr
+}
+
+func (b *Builder) connectAndBuild(ctx context.Context) error {
 	opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
 	if err != nil {
 		return fmt.Errorf("failed to create grpc dial options: %w", err)
 	}
 
 	// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
-	conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
+	conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
 	if err != nil {
 		return fmt.Errorf("failed to dial bloom planner: %w", err)
 	}
```
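A detail worth calling out in `plannerAddress()`: the builder falls back to the statically configured `PlannerAddress` both in microservice mode (no ring watcher) and when the leader lookup fails, so a transiently empty or unconsolidated ring degrades to the configured endpoint rather than failing the dial outright.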

pkg/bloombuild/builder/builder_test.go (+1 −1)

```diff
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	}
 	flagext.DefaultValues(&cfg.GrpcConfig)
 
-	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
+	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		err = services.StopAndAwaitTerminated(context.Background(), builder)
```

pkg/bloombuild/common/ringwatcher.go (new file, +119)

```go
package common

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
)

const (
	RingKeyOfLeader = 0xffff
)

type RingWatcher struct {
	services.Service
	id           string
	ring         *ring.Ring
	leader       *ring.InstanceDesc
	lookupPeriod time.Duration
	logger       log.Logger
}

// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
// The leader instance is the instance that owns the key `RingKeyOfLeader`.
// It provides functions to get the leader's address, and to check whether a given instance in the ring is leader.
// Bloom planner and bloom builder use this ring watcher to hook into the index gateway ring when they are run as
// part of the `backend` target of the Simple Scalable Deployment (SSD).
// It should not be used for any other components outside of the bloombuild package.
func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
	w := &RingWatcher{
		id:           id,
		ring:         ring,
		lookupPeriod: lookupPeriod,
		logger:       logger,
	}
	w.Service = services.NewBasicService(nil, w.updateLoop, nil)
	return w
}

func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-syncTicker.C:
			w.lookupAddresses()
			if w.leader != nil {
				return nil
			}
		}
	}
}

func (w *RingWatcher) updateLoop(ctx context.Context) error {
	_ = w.waitForInitialLeader(ctx)

	syncTicker := time.NewTicker(w.lookupPeriod)
	defer syncTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-syncTicker.C:
			w.lookupAddresses()
		}
	}
}

func (w *RingWatcher) lookupAddresses() {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
	if err != nil {
		level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
		w.leader = nil
		return
	}

	for i := range rs.Instances {
		inst := rs.Instances[i]
		state, err := w.ring.GetInstanceState(inst.Id)
		if err != nil || state != ring.ACTIVE {
			return
		}
		tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
		if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
			if w.leader == nil || w.leader.Id != inst.Id {
				level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
			}
			w.leader = &inst
			return
		}
	}

	w.leader = nil
}

func (w *RingWatcher) IsLeader() bool {
	return w.IsInstanceLeader(w.id)
}

func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
	res := w.leader != nil && w.leader.Id == instanceID
	level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
	return res
}

func (w *RingWatcher) GetLeaderAddress() (string, error) {
	if w.leader == nil {
		return "", ring.ErrEmptyRing
	}
	return w.leader.Addr, nil
}
```
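The intended wiring, per the planner diff that follows: the `backend` target hands its existing index gateway ring to `NewRingWatcher` and gates planning on `IsLeader()`. A minimal usage sketch, assuming an already-initialized `*ring.Ring` and instance ID; the `runIfLeader` helper and its periodic re-check are illustrative, not part of the commit:

```go
package leaderelection

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
)

// runIfLeader starts a RingWatcher on the given ring and periodically runs
// `work`, but only while this instance owns the leader key.
func runIfLeader(ctx context.Context, instanceID string, r *ring.Ring, logger log.Logger, work func(context.Context) error) error {
	watcher := common.NewRingWatcher(instanceID, r, time.Minute, logger)
	if err := services.StartAndAwaitRunning(ctx, watcher); err != nil {
		return err
	}
	defer services.StopAndAwaitTerminated(context.Background(), watcher) //nolint:errcheck

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if !watcher.IsLeader() {
				continue // another backend replica owns the leader key
			}
			if err := work(ctx); err != nil {
				return err
			}
		}
	}
}
```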

pkg/bloombuild/planner/planner.go (+43 −5)

```diff
@@ -27,9 +27,13 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	"github.com/grafana/loki/v3/pkg/util"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
-var errPlannerIsNotRunning = errors.New("planner is not running")
+var (
+	errPlannerIsNotRunning = errors.New("planner is not running")
+	errPlannerIsNotLeader  = errors.New("planner is not leader")
+)
 
 type Planner struct {
 	services.Service
@@ -52,6 +56,10 @@ type Planner struct {
 
 	metrics *Metrics
 	logger  log.Logger
+
+	// used only in SSD mode where a single planner of the backend replicas needs to create tasks
+	// therefore is nil when planner is run in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -63,6 +71,7 @@ func New(
 	bloomStore bloomshipper.StoreBase,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Planner, error) {
 	utillog.WarnExperimentalUse("Bloom Planner", logger)
 
@@ -101,6 +110,12 @@ func New(
 	)
 
 	svcs := []services.Service{p.tasksQueue, p.activeUsers}
+
+	if rm != nil {
+		p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+		svcs = append(svcs, p.ringWatcher)
+	}
+
 	p.subservices, err = services.NewManager(svcs...)
 	if err != nil {
 		return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@ func New(
 	return p, nil
 }
 
+func (p *Planner) isLeader() bool {
+	if p.ringWatcher == nil {
+		// when the planner runs as standalone service in microservice mode, then there is no ringWatcher
+		// therefore we can safely assume that the planner is a singleton
+		return true
+	}
+	return p.ringWatcher.IsLeader()
+}
+
 func (p *Planner) starting(ctx context.Context) (err error) {
 	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
 		return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
 func (p *Planner) running(ctx context.Context) error {
 	go p.trackInflightRequests(ctx)
 
-	// run once at beginning
-	if err := p.runOne(ctx); err != nil {
-		level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
-	}
+	// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode
+	initialPlanningTimer := time.NewTimer(time.Minute)
+	defer initialPlanningTimer.Stop()
 
 	planningTicker := time.NewTicker(p.cfg.PlanningInterval)
 	defer planningTicker.Stop()
@@ -154,6 +177,12 @@ func (p *Planner) running(ctx context.Context) error {
 		level.Debug(p.logger).Log("msg", "planner context done")
 		return nil
 
+	case <-initialPlanningTimer.C:
+		level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
+		if err := p.runOne(ctx); err != nil {
+			level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
+		}
+
 	case <-planningTicker.C:
 		level.Info(p.logger).Log("msg", "starting bloom build iteration")
 		if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
 }
 
 func (p *Planner) runOne(ctx context.Context) error {
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	var (
 		wg    sync.WaitGroup
 		start = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 
 	builderID := resp.GetBuilderID()
 	logger := log.With(p.logger, "builder", builderID)
+
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	level.Debug(logger).Log("msg", "builder connected")
 
 	p.tasksQueue.RegisterConsumerConnection(builderID)
```
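Note that leadership is checked in two places: `runOne` stops a non-leader replica from planning, and the check in `BuilderLoop` rejects builders that happened to connect to a non-leader. Since `connectAndBuild` dials `b.plannerAddress()` on every attempt, a builder turned away with `errPlannerIsNotLeader` resolves the current leader through the ring watcher when it reconnects.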

pkg/bloombuild/planner/planner_test.go (+1 −1)

```diff
@@ -532,7 +532,7 @@ func createPlanner(
 	bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	require.NoError(t, err)
 
-	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
+	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
 	require.NoError(t, err)
 
 	return planner
```

pkg/loki/loki.go (+1 −1)

```diff
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {
 
 		Read:    {QueryFrontend, Querier},
 		Write:   {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
-		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
+		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
 
 		All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
 	}
```
