From d3ba83df3e9ab8aed362d68dbd9419da61254a00 Mon Sep 17 00:00:00 2001
From: Christian Haudum <christian.haudum@gmail.com>
Date: Mon, 2 Sep 2024 10:02:11 +0200
Subject: [PATCH] feat(blooms): Add bloom planner and bloom builder to
 `backend` target (#13997)

Previously, the bloom compactor component was part of the `backend` target in Simple Scalable Deployment (SSD) mode. The bloom compactor was removed (https://github.com/grafana/loki/pull/13969) in favour of the bloom planner and bloom builder, and was therefore also removed from the `backend` target.

This PR adds the planner and builder components to the `backend` target so that it can continue building blooms when bloom building is enabled.

The planner needs to run as a singleton: even when multiple replicas of the `backend` target are deployed, only one instance may create tasks for the builders.
This is achieved by leader election through the index gateway ring that already exists in the `backend` target. The planner leader is the instance that owns the leader key, and builders connect to the leader to pull tasks.
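For illustration only, here is a minimal sketch (not part of this patch) of how a backend replica could wire the new `RingWatcher` onto the index gateway ring to determine leadership and resolve the planner address. The helper `watchPlannerLeader` and its package are hypothetical; the `RingWatcher` API is the one added in `pkg/bloombuild/common/ringwatcher.go`.

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
	"github.com/grafana/loki/v3/pkg/util/ring"
)

// watchPlannerLeader is a hypothetical helper showing the intended use of the
// RingWatcher added in this patch: hook into the existing index gateway ring,
// check whether this replica owns the leader key, and resolve the leader
// address that builders dial instead of the static planner address.
func watchPlannerLeader(ctx context.Context, rm *ring.RingManager, logger log.Logger) error {
	watcher := common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
	if err := services.StartAndAwaitRunning(ctx, watcher); err != nil {
		return fmt.Errorf("error starting ring watcher: %w", err)
	}
	defer func() {
		_ = services.StopAndAwaitTerminated(context.Background(), watcher)
	}()

	// Only the replica that owns RingKeyOfLeader plans tasks.
	if watcher.IsLeader() {
		fmt.Println("this replica is the planner leader")
	}

	// Builders connect to the leader to pull tasks.
	addr, err := watcher.GetLeaderAddress()
	if err != nil {
		return fmt.Errorf("no leader resolved yet: %w", err)
	}
	fmt.Println("planner leader address:", addr)
	return nil
}
```

In the actual patch this wiring happens inside the planner and builder constructors, which receive the index gateway `RingManager` from the `backend` target's module setup in `pkg/loki/modules.go`.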

----

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
(cherry picked from commit bf60455c8e52b87774df9ca90232b4c72d72e46b)
---
 .../operations/query-acceleration-blooms.md   |   4 +-
 pkg/bloombuild/builder/builder.go             |  42 ++++++-
 pkg/bloombuild/builder/builder_test.go        |   2 +-
 pkg/bloombuild/common/ringwatcher.go          | 119 ++++++++++++++++++
 pkg/bloombuild/planner/planner.go             |  48 ++++++-
 pkg/bloombuild/planner/planner_test.go        |   2 +-
 pkg/loki/loki.go                              |   2 +-
 pkg/loki/modules.go                           |  18 +++
 8 files changed, 222 insertions(+), 15 deletions(-)
 create mode 100644 pkg/bloombuild/common/ringwatcher.go

diff --git a/docs/sources/operations/query-acceleration-blooms.md b/docs/sources/operations/query-acceleration-blooms.md
index 1d18a0350c4b2..5eccc45cf2023 100644
--- a/docs/sources/operations/query-acceleration-blooms.md
+++ b/docs/sources/operations/query-acceleration-blooms.md
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
 {{< /admonition >}}
 
 To start building and using blooms you need to:
-- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
-- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
+- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
+- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
 - Enable blooms building and filtering for each tenant individually, or for all of them by default.
 
 ```yaml
diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go
index 672ce24f0f403..6cc2ecfa32f61 100644
--- a/pkg/bloombuild/builder/builder.go
+++ b/pkg/bloombuild/builder/builder.go
@@ -30,6 +30,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
 type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
 	chunkLoader ChunkLoader
 
 	client protos.PlannerForBuilderClient
+
+	// used only in SSD mode, where a single planner among the backend replicas creates the tasks;
+	// nil when the builder runs in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -59,6 +64,7 @@ func New(
 	bloomStore bloomshipper.Store,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Builder, error) {
 	utillog.WarnExperimentalUse("Bloom Builder", logger)
 
@@ -82,11 +88,20 @@ func New(
 		logger:      logger,
 	}
 
+	if rm != nil {
+		b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+	}
+
 	b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
 	return b, nil
 }
 
-func (b *Builder) starting(_ context.Context) error {
+func (b *Builder) starting(ctx context.Context) error {
+	if b.ringWatcher != nil {
+		if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
+			return fmt.Errorf("error starting builder subservices: %w", err)
+		}
+	}
 	b.metrics.running.Set(1)
 	return nil
 }
@@ -94,6 +109,12 @@ func (b *Builder) starting(_ context.Context) error {
 func (b *Builder) stopping(_ error) error {
 	defer b.metrics.running.Set(0)
 
+	if b.ringWatcher != nil {
+		if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
+			return fmt.Errorf("error stopping builder subservices: %w", err)
+		}
+	}
+
 	if b.client != nil {
 		// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
 		// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
 	return nil
 }
 
-func (b *Builder) connectAndBuild(
-	ctx context.Context,
-) error {
+func (b *Builder) plannerAddress() string {
+	if b.ringWatcher == nil {
+		return b.cfg.PlannerAddress
+	}
+
+	addr, err := b.ringWatcher.GetLeaderAddress()
+	if err != nil {
+		return b.cfg.PlannerAddress
+	}
+
+	return addr
+}
+
+func (b *Builder) connectAndBuild(ctx context.Context) error {
 	opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
 	if err != nil {
 		return fmt.Errorf("failed to create grpc dial options: %w", err)
 	}
 
 	// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
-	conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
+	conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
 	if err != nil {
 		return fmt.Errorf("failed to dial bloom planner: %w", err)
 	}
diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go
index 38ed1b1930fd9..6197c6209974f 100644
--- a/pkg/bloombuild/builder/builder_test.go
+++ b/pkg/bloombuild/builder/builder_test.go
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	}
 	flagext.DefaultValues(&cfg.GrpcConfig)
 
-	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
+	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		err = services.StopAndAwaitTerminated(context.Background(), builder)
diff --git a/pkg/bloombuild/common/ringwatcher.go b/pkg/bloombuild/common/ringwatcher.go
new file mode 100644
index 0000000000000..f5045354d2493
--- /dev/null
+++ b/pkg/bloombuild/common/ringwatcher.go
@@ -0,0 +1,119 @@
+package common
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/ring"
+	"github.com/grafana/dskit/services"
+)
+
+const (
+	RingKeyOfLeader = 0xffff
+)
+
+type RingWatcher struct {
+	services.Service
+	id           string
+	ring         *ring.Ring
+	leader       *ring.InstanceDesc
+	lookupPeriod time.Duration
+	logger       log.Logger
+}
+
+// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
+// The leader instance is the instance that owns the key `RingKeyOfLeader`.
+// It provides functions to get the leader's address and to check whether a given instance in the ring is the leader.
+// The bloom planner and bloom builder use this ring watcher to hook into the index gateway ring when they run as
+// part of the `backend` target of the Simple Scalable Deployment (SSD).
+// It should not be used by components outside of the bloombuild package.
+func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
+	w := &RingWatcher{
+		id:           id,
+		ring:         ring,
+		lookupPeriod: lookupPeriod,
+		logger:       logger,
+	}
+	w.Service = services.NewBasicService(nil, w.updateLoop, nil)
+	return w
+}
+
+func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
+	syncTicker := time.NewTicker(time.Second)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-syncTicker.C:
+			w.lookupAddresses()
+			if w.leader != nil {
+				return nil
+			}
+		}
+	}
+}
+
+func (w *RingWatcher) updateLoop(ctx context.Context) error {
+	_ = w.waitForInitialLeader(ctx)
+
+	syncTicker := time.NewTicker(w.lookupPeriod)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-syncTicker.C:
+			w.lookupAddresses()
+		}
+	}
+}
+
+func (w *RingWatcher) lookupAddresses() {
+	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+	rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
+	if err != nil {
+		level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
+		w.leader = nil
+		return
+	}
+
+	for i := range rs.Instances {
+		inst := rs.Instances[i]
+		state, err := w.ring.GetInstanceState(inst.Id)
+		if err != nil || state != ring.ACTIVE {
+			return
+		}
+		tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
+		if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
+			if w.leader == nil || w.leader.Id != inst.Id {
+				level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
+			}
+			w.leader = &inst
+			return
+		}
+	}
+
+	w.leader = nil
+}
+
+func (w *RingWatcher) IsLeader() bool {
+	return w.IsInstanceLeader(w.id)
+}
+
+func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
+	res := w.leader != nil && w.leader.Id == instanceID
+	level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
+	return res
+}
+
+func (w *RingWatcher) GetLeaderAddress() (string, error) {
+	if w.leader == nil {
+		return "", ring.ErrEmptyRing
+	}
+	return w.leader.Addr, nil
+}
diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go
index f65fdf59c9acb..f66748f1832b8 100644
--- a/pkg/bloombuild/planner/planner.go
+++ b/pkg/bloombuild/planner/planner.go
@@ -27,9 +27,13 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	"github.com/grafana/loki/v3/pkg/util"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
-var errPlannerIsNotRunning = errors.New("planner is not running")
+var (
+	errPlannerIsNotRunning = errors.New("planner is not running")
+	errPlannerIsNotLeader  = errors.New("planner is not leader")
+)
 
 type Planner struct {
 	services.Service
@@ -52,6 +56,10 @@ type Planner struct {
 
 	metrics *Metrics
 	logger  log.Logger
+
+	// used only in SSD mode, where a single planner among the backend replicas creates the tasks;
+	// nil when the planner runs in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -63,6 +71,7 @@ func New(
 	bloomStore bloomshipper.StoreBase,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Planner, error) {
 	utillog.WarnExperimentalUse("Bloom Planner", logger)
 
@@ -101,6 +110,12 @@ func New(
 	)
 
 	svcs := []services.Service{p.tasksQueue, p.activeUsers}
+
+	if rm != nil {
+		p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+		svcs = append(svcs, p.ringWatcher)
+	}
+
 	p.subservices, err = services.NewManager(svcs...)
 	if err != nil {
 		return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@ func New(
 	return p, nil
 }
 
+func (p *Planner) isLeader() bool {
+	if p.ringWatcher == nil {
+		// when the planner runs as a standalone service in microservice mode, there is no ringWatcher,
+		// so we can safely assume that the planner is a singleton
+		return true
+	}
+	return p.ringWatcher.IsLeader()
+}
+
 func (p *Planner) starting(ctx context.Context) (err error) {
 	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
 		return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
 func (p *Planner) running(ctx context.Context) error {
 	go p.trackInflightRequests(ctx)
 
-	// run once at beginning
-	if err := p.runOne(ctx); err != nil {
-		level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
-	}
+	// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode
+	initialPlanningTimer := time.NewTimer(time.Minute)
+	defer initialPlanningTimer.Stop()
 
 	planningTicker := time.NewTicker(p.cfg.PlanningInterval)
 	defer planningTicker.Stop()
@@ -154,6 +177,12 @@ func (p *Planner) running(ctx context.Context) error {
 			level.Debug(p.logger).Log("msg", "planner context done")
 			return nil
 
+		case <-initialPlanningTimer.C:
+			level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
+			if err := p.runOne(ctx); err != nil {
+				level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
+			}
+
 		case <-planningTicker.C:
 			level.Info(p.logger).Log("msg", "starting bloom build iteration")
 			if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
 }
 
 func (p *Planner) runOne(ctx context.Context) error {
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	var (
 		wg     sync.WaitGroup
 		start  = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 
 	builderID := resp.GetBuilderID()
 	logger := log.With(p.logger, "builder", builderID)
+
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	level.Debug(logger).Log("msg", "builder connected")
 
 	p.tasksQueue.RegisterConsumerConnection(builderID)
diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go
index b44436860cd88..32f8d5798a7f2 100644
--- a/pkg/bloombuild/planner/planner_test.go
+++ b/pkg/bloombuild/planner/planner_test.go
@@ -532,7 +532,7 @@ func createPlanner(
 	bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	require.NoError(t, err)
 
-	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
+	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
 	require.NoError(t, err)
 
 	return planner
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 84282974e7337..fa1b6cbfe4105 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {
 
 		Read:    {QueryFrontend, Querier},
 		Write:   {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
-		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
+		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
 
 		All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
 	}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 4410525b377a7..bbdaf85e7b7d0 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -1708,6 +1708,10 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
 
 	logger := log.With(util_log.Logger, "component", "bloom-planner")
 
+	if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
+		level.Info(logger).Log("msg", "initializing bloom planner in ring mode as part of backend target")
+	}
+
 	p, err := planner.New(
 		t.Cfg.BloomBuild.Planner,
 		t.Overrides,
@@ -1717,6 +1721,11 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
 		t.BloomStore,
 		logger,
 		prometheus.DefaultRegisterer,
+		// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
+		// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
+		// part of the backend target. The planner creates a watcher service that regularly checks which replica is
+		// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
+		t.indexGatewayRingManager,
 	)
 	if err != nil {
 		return nil, err
@@ -1733,6 +1742,10 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
 
 	logger := log.With(util_log.Logger, "component", "bloom-builder")
 
+	if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
+		level.Info(logger).Log("msg", "initializing bloom builder in ring mode as part of backend target")
+	}
+
 	return builder.New(
 		t.Cfg.BloomBuild.Builder,
 		t.Overrides,
@@ -1743,6 +1756,11 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
 		t.BloomStore,
 		logger,
 		prometheus.DefaultRegisterer,
+		// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
+		// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
+		// part of the backend target. The planner creates a watcher service that regularly checks which replica is
+		// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
+		t.indexGatewayRingManager,
 	)
 }