diff --git a/docs/sources/operations/query-acceleration-blooms.md b/docs/sources/operations/query-acceleration-blooms.md
index 1d18a0350c4b2..5eccc45cf2023 100644
--- a/docs/sources/operations/query-acceleration-blooms.md
+++ b/docs/sources/operations/query-acceleration-blooms.md
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
 {{< /admonition >}}
 
 To start building and using blooms you need to:
-- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
-- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
+- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
+- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
 - Enable blooms building and filtering for each tenant individually, or for all of them by default.
 
 ```yaml
diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go
index 672ce24f0f403..6cc2ecfa32f61 100644
--- a/pkg/bloombuild/builder/builder.go
+++ b/pkg/bloombuild/builder/builder.go
@@ -30,6 +30,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
 type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
 
 	chunkLoader ChunkLoader
 	client      protos.PlannerForBuilderClient
+
+	// used only in SSD mode, where the builder resolves the leader planner among the backend replicas via the ring;
+	// it is nil when the builder runs in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -59,6 +64,7 @@ func New(
 	bloomStore bloomshipper.Store,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Builder, error) {
 	utillog.WarnExperimentalUse("Bloom Builder", logger)
 
@@ -82,11 +88,20 @@ func New(
 		logger:  logger,
 	}
 
+	if rm != nil {
+		b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+	}
+
 	b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
 	return b, nil
 }
 
-func (b *Builder) starting(_ context.Context) error {
+func (b *Builder) starting(ctx context.Context) error {
+	if b.ringWatcher != nil {
+		if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
+			return fmt.Errorf("error starting builder subservices: %w", err)
+		}
+	}
 	b.metrics.running.Set(1)
 	return nil
 }
@@ -94,6 +109,12 @@ func (b *Builder) starting(_ context.Context) error {
 func (b *Builder) stopping(_ error) error {
 	defer b.metrics.running.Set(0)
 
+	if b.ringWatcher != nil {
+		if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
+			return fmt.Errorf("error stopping builder subservices: %w", err)
+		}
+	}
+
 	if b.client != nil {
 		// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
 		// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
 	return nil
 }
 
-func (b *Builder) connectAndBuild(
-	ctx context.Context,
-) error {
+func (b *Builder) plannerAddress() string {
+	if b.ringWatcher == nil {
+		return b.cfg.PlannerAddress
+	}
+
+	addr, err := b.ringWatcher.GetLeaderAddress()
+	if err != nil {
+		return b.cfg.PlannerAddress
+	}
+
+	return addr
+}
+
+func (b *Builder) connectAndBuild(ctx context.Context) error {
 	opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
 	if err != nil {
 		return fmt.Errorf("failed to create grpc dial options: %w", err)
 	}
 
 	// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
-	conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
+	conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
 	if err != nil {
 		return fmt.Errorf("failed to dial bloom planner: %w", err)
 	}
diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go
index 38ed1b1930fd9..6197c6209974f 100644
--- a/pkg/bloombuild/builder/builder_test.go
+++ b/pkg/bloombuild/builder/builder_test.go
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	}
 	flagext.DefaultValues(&cfg.GrpcConfig)
 
-	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
+	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		err = services.StopAndAwaitTerminated(context.Background(), builder)
diff --git a/pkg/bloombuild/common/ringwatcher.go b/pkg/bloombuild/common/ringwatcher.go
new file mode 100644
index 0000000000000..f5045354d2493
--- /dev/null
+++ b/pkg/bloombuild/common/ringwatcher.go
@@ -0,0 +1,119 @@
+package common
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/ring"
+	"github.com/grafana/dskit/services"
+)
+
+const (
+	RingKeyOfLeader = 0xffff
+)
+
+type RingWatcher struct {
+	services.Service
+	id           string
+	ring         *ring.Ring
+	leader       *ring.InstanceDesc
+	lookupPeriod time.Duration
+	logger       log.Logger
+}
+
+// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
+// The leader instance is the instance that owns the key `RingKeyOfLeader`.
+// It provides functions to get the leader's address, and to check whether a given instance in the ring is the leader.
+// The bloom planner and bloom builder use this ring watcher to hook into the index gateway ring when they are run as
+// part of the `backend` target of the Simple Scalable Deployment (SSD).
+// It should not be used for any other components outside of the bloombuild package.
+func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
+	w := &RingWatcher{
+		id:           id,
+		ring:         ring,
+		lookupPeriod: lookupPeriod,
+		logger:       logger,
+	}
+	w.Service = services.NewBasicService(nil, w.updateLoop, nil)
+	return w
+}
+
+func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
+	syncTicker := time.NewTicker(time.Second)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-syncTicker.C:
+			w.lookupAddresses()
+			if w.leader != nil {
+				return nil
+			}
+		}
+	}
+}
+
+func (w *RingWatcher) updateLoop(ctx context.Context) error {
+	_ = w.waitForInitialLeader(ctx)
+
+	syncTicker := time.NewTicker(w.lookupPeriod)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-syncTicker.C:
+			w.lookupAddresses()
+		}
+	}
+}
+
+func (w *RingWatcher) lookupAddresses() {
+	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+	rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
+	if err != nil {
+		level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
+		w.leader = nil
+		return
+	}
+
+	for i := range rs.Instances {
+		inst := rs.Instances[i]
+		state, err := w.ring.GetInstanceState(inst.Id)
+		if err != nil || state != ring.ACTIVE {
+			return
+		}
+		tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
+		if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
+			if w.leader == nil || w.leader.Id != inst.Id {
+				level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
+			}
+			w.leader = &inst
+			return
+		}
+	}
+
+	w.leader = nil
+}
+
+func (w *RingWatcher) IsLeader() bool {
+	return w.IsInstanceLeader(w.id)
+}
+
+func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
+	res := w.leader != nil && w.leader.Id == instanceID
+	level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
+	return res
+}
+
+func (w *RingWatcher) GetLeaderAddress() (string, error) {
+	if w.leader == nil {
+		return "", ring.ErrEmptyRing
+	}
+	return w.leader.Addr, nil
+}
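
The `RingWatcher` above only exposes leader lookups (`IsLeader`, `IsInstanceLeader`, `GetLeaderAddress`); the election itself is simply "whoever owns token `0xffff` in the index gateway ring". As an illustrative aside (not part of this patch), a caller that wants to fall back to a statically configured address while no leader is known could wrap it roughly as follows. The `resolvePlannerAddr` and `fallbackAddr` names and the example address are made up for the sketch; the logic mirrors `Builder.plannerAddress()` earlier in this diff.

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
)

// resolvePlannerAddr is a sketch: it prefers the ring leader's address and
// falls back to a static address when there is no watcher (microservice mode)
// or no leader has been observed yet (e.g. the ring is still consolidating).
func resolvePlannerAddr(w *common.RingWatcher, fallbackAddr string) string {
	if w == nil {
		return fallbackAddr
	}
	addr, err := w.GetLeaderAddress()
	if err != nil {
		return fallbackAddr
	}
	return addr
}

func main() {
	// With no watcher configured, the fallback address is used.
	fmt.Println(resolvePlannerAddr(nil, "bloom-planner.namespace.svc:9095"))
}
```
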
diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go
index f65fdf59c9acb..f66748f1832b8 100644
--- a/pkg/bloombuild/planner/planner.go
+++ b/pkg/bloombuild/planner/planner.go
@@ -27,9 +27,13 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	"github.com/grafana/loki/v3/pkg/util"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
-var errPlannerIsNotRunning = errors.New("planner is not running")
+var (
+	errPlannerIsNotRunning = errors.New("planner is not running")
+	errPlannerIsNotLeader  = errors.New("planner is not leader")
+)
 
 type Planner struct {
 	services.Service
@@ -52,6 +56,10 @@ type Planner struct {
 
 	metrics *Metrics
 	logger  log.Logger
+
+	// used only in SSD mode, where a single planner among the backend replicas acts as the leader that plans tasks;
+	// it is nil when the planner runs in microservice mode (default)
+	ringWatcher *common.RingWatcher
 }
 
 func New(
@@ -63,6 +71,7 @@ func New(
 	bloomStore bloomshipper.StoreBase,
 	logger log.Logger,
 	r prometheus.Registerer,
+	rm *ring.RingManager,
 ) (*Planner, error) {
 	utillog.WarnExperimentalUse("Bloom Planner", logger)
 
@@ -101,6 +110,12 @@ func New(
 	)
 
 	svcs := []services.Service{p.tasksQueue, p.activeUsers}
+
+	if rm != nil {
+		p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
+		svcs = append(svcs, p.ringWatcher)
+	}
+
 	p.subservices, err = services.NewManager(svcs...)
 	if err != nil {
 		return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@ func New(
 	return p, nil
 }
 
+func (p *Planner) isLeader() bool {
+	if p.ringWatcher == nil {
+		// when the planner runs as a standalone service in microservice mode, there is no ringWatcher,
+		// therefore we can safely assume that the planner is a singleton
+		return true
+	}
+	return p.ringWatcher.IsLeader()
+}
+
 func (p *Planner) starting(ctx context.Context) (err error) {
 	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
 		return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
 func (p *Planner) running(ctx context.Context) error {
 	go p.trackInflightRequests(ctx)
 
-	// run once at beginning
-	if err := p.runOne(ctx); err != nil {
-		level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
-	}
+	// run once at the beginning, but delay by 1m to allow ring consolidation when running in SSD mode
+	initialPlanningTimer := time.NewTimer(time.Minute)
+	defer initialPlanningTimer.Stop()
 
 	planningTicker := time.NewTicker(p.cfg.PlanningInterval)
 	defer planningTicker.Stop()
@@ -154,6 +177,12 @@ func (p *Planner) running(ctx context.Context) error {
 			level.Debug(p.logger).Log("msg", "planner context done")
 			return nil
 
+		case <-initialPlanningTimer.C:
+			level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
+			if err := p.runOne(ctx); err != nil {
+				level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
+			}
+
 		case <-planningTicker.C:
 			level.Info(p.logger).Log("msg", "starting bloom build iteration")
 			if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
 }
 
 func (p *Planner) runOne(ctx context.Context) error {
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	var (
 		wg     sync.WaitGroup
 		start  = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 	builderID := resp.GetBuilderID()
 	logger := log.With(p.logger, "builder", builderID)
+
+	if !p.isLeader() {
+		return errPlannerIsNotLeader
+	}
+
 	level.Debug(logger).Log("msg", "builder connected")
 
 	p.tasksQueue.RegisterConsumerConnection(builderID)
diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go
index b44436860cd88..32f8d5798a7f2 100644
--- a/pkg/bloombuild/planner/planner_test.go
+++ b/pkg/bloombuild/planner/planner_test.go
@@ -532,7 +532,7 @@ func createPlanner(
 	bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	require.NoError(t, err)
 
-	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
+	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
 	require.NoError(t, err)
 
 	return planner
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 84282974e7337..fa1b6cbfe4105 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {
 		Read:  {QueryFrontend, Querier},
 		Write: {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
 
-		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
+		Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
 
 		All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
 	}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 4410525b377a7..bbdaf85e7b7d0 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -1708,6 +1708,10 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
 
 	logger := log.With(util_log.Logger, "component", "bloom-planner")
 
+	if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
+		level.Info(logger).Log("msg", "initializing bloom planner in ring mode as part of backend target")
+	}
+
 	p, err := planner.New(
 		t.Cfg.BloomBuild.Planner,
 		t.Overrides,
@@ -1717,6 +1721,11 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
 		t.BloomStore,
 		logger,
 		prometheus.DefaultRegisterer,
+		// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
+		// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
+		// part of the backend target. The planner creates a watcher service that regularly checks which replica is
+		// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
+		t.indexGatewayRingManager,
 	)
 	if err != nil {
 		return nil, err
@@ -1733,6 +1742,10 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
 
 	logger := log.With(util_log.Logger, "component", "bloom-builder")
 
+	if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
+		level.Info(logger).Log("msg", "initializing bloom builder in ring mode as part of backend target")
+	}
+
 	return builder.New(
 		t.Cfg.BloomBuild.Builder,
 		t.Overrides,
@@ -1743,6 +1756,11 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
 		t.BloomStore,
 		logger,
 		prometheus.DefaultRegisterer,
+		// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
+		// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
+		// part of the backend target. The planner creates a watcher service that regularly checks which replica is
+		// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
+		t.indexGatewayRingManager,
 	)
 }
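
Taken together, the wiring comments in `modules.go` describe a simple split: every `backend` replica runs both a planner and a builder, but only the ring leader actually plans, and builders pull tasks from that leader. The gating added to `Planner.runOne` and `Planner.BuilderLoop` is just an early return when the replica is not the leader. Below is a minimal, self-contained sketch of that pattern; `leaderChecker`, `planIfLeader`, and `alwaysLeader` are made-up names standing in for the `RingWatcher` and planner methods, not part of this patch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// leaderChecker stands in for *common.RingWatcher, which exposes IsLeader().
type leaderChecker interface {
	IsLeader() bool
}

var errNotLeader = errors.New("planner is not leader")

// planIfLeader mirrors the gating in this patch: a nil checker means
// microservice mode (single planner, always plan); otherwise only the
// ring leader plans and every other replica returns early.
func planIfLeader(ctx context.Context, lc leaderChecker, plan func(context.Context) error) error {
	if lc == nil {
		return plan(ctx)
	}
	if !lc.IsLeader() {
		return errNotLeader
	}
	return plan(ctx)
}

// alwaysLeader is a stub leaderChecker used only for the demo below.
type alwaysLeader struct{}

func (alwaysLeader) IsLeader() bool { return true }

func main() {
	plan := func(context.Context) error {
		fmt.Println("planning bloom build tasks")
		return nil
	}

	// Microservice mode: no ring watcher, the planner is a singleton.
	fmt.Println(planIfLeader(context.Background(), nil, plan))

	// SSD mode: gate planning on ring leadership.
	fmt.Println(planIfLeader(context.Background(), alwaysLeader{}, plan))
}
```
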