
Commit efd8f5d

refactor(blooms): Add queue to bloom planner and enqueue tasks (#13005)
1 parent d6f29fc commit efd8f5d

File tree

6 files changed: +145 -22 lines


docs/sources/shared/configuration.md (+9)

@@ -351,6 +351,10 @@ bloom_build:
     # CLI flag: -bloom-build.planner.max-table-offset
     [max_table_offset: <int> | default = 2]
 
+    # Maximum number of tasks to queue per tenant.
+    # CLI flag: -bloom-build.planner.max-tasks-per-tenant
+    [max_queued_tasks_per_tenant: <int> | default = 30000]
+
   builder:
 
 # Experimental: The bloom_gateway block configures the Loki bloom gateway
@@ -3409,6 +3413,11 @@ shard_streams:
 # CLI flag: -bloom-build.split-keyspace-by
 [bloom_split_series_keyspace_by: <int> | default = 256]
 
+# Experimental. Maximum number of builders to use when building blooms. 0 allows
+# unlimited builders.
+# CLI flag: -bloom-build.max-builders
+[bloom_build_max_builders: <int> | default = 0]
+
 # Experimental. Length of the n-grams created when computing blooms from log
 # lines.
 # CLI flag: -bloom-compactor.ngram-length

pkg/bloombuild/planner/config.go (+29 -3)

@@ -8,9 +8,10 @@ import (
 
 // Config configures the bloom-planner component.
 type Config struct {
-    PlanningInterval time.Duration `yaml:"planning_interval"`
-    MinTableOffset   int           `yaml:"min_table_offset"`
-    MaxTableOffset   int           `yaml:"max_table_offset"`
+    PlanningInterval        time.Duration `yaml:"planning_interval"`
+    MinTableOffset          int           `yaml:"min_table_offset"`
+    MaxTableOffset          int           `yaml:"max_table_offset"`
+    MaxQueuedTasksPerTenant int           `yaml:"max_queued_tasks_per_tenant"`
 }
 
 // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -24,6 +25,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
     // dynamically reloaded.
     // I'm doing it the simple way for now.
     f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
+    f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
 }
 
 func (cfg *Config) Validate() error {
@@ -37,4 +39,28 @@
 type Limits interface {
     BloomCreationEnabled(tenantID string) bool
     BloomSplitSeriesKeyspaceBy(tenantID string) int
+    BloomBuildMaxBuilders(tenantID string) int
+}
+
+type QueueLimits struct {
+    limits Limits
+}
+
+func NewQueueLimits(limits Limits) *QueueLimits {
+    return &QueueLimits{limits: limits}
+}
+
+// MaxConsumers is used to compute how many of the available builders are allowed to handle tasks for a given tenant.
+// 0 is returned when neither limits are applied. 0 means all builders can be used.
+func (c *QueueLimits) MaxConsumers(tenantID string, allConsumers int) int {
+    if c == nil || c.limits == nil {
+        return 0
+    }
+
+    maxBuilders := c.limits.BloomBuildMaxBuilders(tenantID)
+    if maxBuilders == 0 {
+        return 0
+    }
+
+    return min(allConsumers, maxBuilders)
 }
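
The new QueueLimits.MaxConsumers is what ties the per-tenant -bloom-build.max-builders limit to the queue: it caps how many of the connected builders may pick up a given tenant's tasks, with 0 meaning no cap. Below is a minimal standalone sketch of that logic; the fakeLimits type is illustrative and not part of Loki (the real planner passes its Limits implementation), and min is the Go 1.21 built-in used in the diff above.

```go
package main

import "fmt"

// fakeLimits is a stand-in for the planner's Limits interface; only the
// method relevant to queue consumer limiting is included here.
type fakeLimits struct {
	maxBuilders int
}

func (f fakeLimits) BloomBuildMaxBuilders(_ string) int { return f.maxBuilders }

// maxConsumers mirrors QueueLimits.MaxConsumers from the diff above:
// 0 means "no cap, all connected builders may handle this tenant's tasks".
func maxConsumers(limits fakeLimits, tenantID string, allConsumers int) int {
	maxBuilders := limits.BloomBuildMaxBuilders(tenantID)
	if maxBuilders == 0 {
		return 0
	}
	return min(allConsumers, maxBuilders)
}

func main() {
	// With 10 connected builders and a per-tenant cap of 3, at most 3 builders
	// are allowed to dequeue tasks for this tenant.
	fmt.Println(maxConsumers(fakeLimits{maxBuilders: 3}, "tenant-a", 10)) // 3
	// A cap of 0 disables the limit entirely.
	fmt.Println(maxConsumers(fakeLimits{maxBuilders: 0}, "tenant-a", 10)) // 0 (unlimited)
}
```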

pkg/bloombuild/planner/metrics.go (+39 -1)

11
package planner
22

33
import (
4+
"time"
5+
46
"github.com/prometheus/client_golang/prometheus"
57
"github.com/prometheus/client_golang/prometheus/promauto"
8+
9+
"github.com/grafana/loki/v3/pkg/queue"
610
)
711

812
const (
@@ -16,21 +20,51 @@ const (
1620
type Metrics struct {
1721
running prometheus.Gauge
1822

23+
// Extra Queue metrics
24+
connectedBuilders prometheus.GaugeFunc
25+
queueDuration prometheus.Histogram
26+
inflightRequests prometheus.Summary
27+
1928
buildStarted prometheus.Counter
2029
buildCompleted *prometheus.CounterVec
2130
buildTime *prometheus.HistogramVec
2231

2332
tenantsDiscovered prometheus.Counter
2433
}
2534

26-
func NewMetrics(r prometheus.Registerer) *Metrics {
35+
func NewMetrics(
36+
r prometheus.Registerer,
37+
getConnectedBuilders func() float64,
38+
) *Metrics {
2739
return &Metrics{
2840
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
2941
Namespace: metricsNamespace,
3042
Subsystem: metricsSubsystem,
3143
Name: "running",
3244
Help: "Value will be 1 if bloom planner is currently running on this instance",
3345
}),
46+
connectedBuilders: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
47+
Namespace: metricsNamespace,
48+
Subsystem: metricsSubsystem,
49+
Name: "connected_builders",
50+
Help: "Number of builders currently connected to the planner.",
51+
}, getConnectedBuilders),
52+
queueDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
53+
Namespace: metricsNamespace,
54+
Subsystem: metricsSubsystem,
55+
Name: "queue_duration_seconds",
56+
Help: "Time spend by tasks in queue before getting picked up by a builder.",
57+
Buckets: prometheus.DefBuckets,
58+
}),
59+
inflightRequests: promauto.With(r).NewSummary(prometheus.SummaryOpts{
60+
Namespace: metricsNamespace,
61+
Subsystem: metricsSubsystem,
62+
Name: "inflight_tasks",
63+
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
64+
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
65+
MaxAge: time.Minute,
66+
AgeBuckets: 6,
67+
}),
3468

3569
buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
3670
Namespace: metricsNamespace,
@@ -60,3 +94,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
6094
}),
6195
}
6296
}
97+
98+
func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics {
99+
return queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
100+
}
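
The interesting addition here is connectedBuilders: it is a GaugeFunc fed by a closure (getConnectedBuilders), so the connected-builder count is sampled lazily at scrape time rather than updated imperatively. The following is a small self-contained sketch of the same pattern using client_golang; the "loki"/"bloomplanner" namespace and subsystem strings are placeholders, not necessarily the values of metricsNamespace/metricsSubsystem in Loki.

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// connected stands in for the planner's task queue, which knows how many
	// builders are currently connected and waiting for work.
	var connected atomic.Int64
	reg := prometheus.NewRegistry()

	// Like the planner's connectedBuilders metric, a GaugeFunc evaluates the
	// closure on every scrape, so the gauge never goes stale and needs no
	// explicit Set() calls anywhere in the queue code.
	promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "loki",          // placeholder namespace
		Subsystem: "bloomplanner",  // placeholder subsystem
		Name:      "connected_builders",
		Help:      "Number of builders currently connected to the planner.",
	}, func() float64 { return float64(connected.Load()) })

	connected.Add(2) // two builders connect

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
	}
}
```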

pkg/bloombuild/planner/planner.go (+55 -18)

@@ -12,16 +12,21 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
 
+    "github.com/grafana/loki/v3/pkg/queue"
     "github.com/grafana/loki/v3/pkg/storage"
     v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
     "github.com/grafana/loki/v3/pkg/storage/config"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
+    "github.com/grafana/loki/v3/pkg/util"
     utillog "github.com/grafana/loki/v3/pkg/util/log"
 )
 
 type Planner struct {
     services.Service
+    // Subservices manager.
+    subservices        *services.Manager
+    subservicesWatcher *services.FailureWatcher
 
     cfg    Config
     limits Limits
@@ -30,6 +35,9 @@ type Planner struct {
     tsdbStore  TSDBStore
     bloomStore bloomshipper.Store
 
+    tasksQueue  *queue.RequestQueue
+    activeUsers *util.ActiveUsersCleanupService
+
     metrics *Metrics
     logger  log.Logger
 }
@@ -51,15 +59,34 @@ func New(
         return nil, fmt.Errorf("error creating TSDB store: %w", err)
     }
 
+    // Queue to manage tasks
+    queueMetrics := NewQueueMetrics(r)
+    tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)
+
+    // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
+    activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
+        queueMetrics.Cleanup(user)
+    })
+
     p := &Planner{
-        cfg:        cfg,
-        limits:     limits,
-        schemaCfg:  schemaCfg,
-        tsdbStore:  tsdbStore,
-        bloomStore: bloomStore,
-        metrics:    NewMetrics(r),
-        logger:     logger,
+        cfg:         cfg,
+        limits:      limits,
+        schemaCfg:   schemaCfg,
+        tsdbStore:   tsdbStore,
+        bloomStore:  bloomStore,
+        tasksQueue:  tasksQueue,
+        activeUsers: activeUsers,
+        metrics:     NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
+        logger:      logger,
+    }
+
+    svcs := []services.Service{p.tasksQueue, p.activeUsers}
+    p.subservices, err = services.NewManager(svcs...)
+    if err != nil {
+        return nil, fmt.Errorf("error creating subservices manager: %w", err)
     }
+    p.subservicesWatcher = services.NewFailureWatcher()
+    p.subservicesWatcher.WatchManager(p.subservices)
 
     p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
     return p, nil
@@ -91,6 +118,7 @@ func (p *Planner) running(ctx context.Context) error {
             return err
 
         case <-ticker.C:
+            level.Info(p.logger).Log("msg", "starting bloom build iteration")
             if err := p.runOne(ctx); err != nil {
                 level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
             }
@@ -109,44 +137,53 @@ func (p *Planner) runOne(ctx context.Context) error {
     }()
 
     p.metrics.buildStarted.Inc()
-    level.Info(p.logger).Log("msg", "running bloom build planning")
 
     tables := p.tables(time.Now())
     level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())
 
     work, err := p.loadWork(ctx, tables)
     if err != nil {
-        level.Error(p.logger).Log("msg", "error loading work", "err", err)
         return fmt.Errorf("error loading work: %w", err)
     }
 
-    // TODO: Enqueue instead of buffering here
-    // This is just a placeholder for now
-    var tasks []Task
-
+    var totalTasks int
     for _, w := range work {
+        logger := log.With(p.logger, "tenant", w.tenant, "table", w.table.Addr(), "ownership", w.ownershipRange.String())
+
        gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange)
        if err != nil {
-            level.Error(p.logger).Log("msg", "error finding gaps", "err", err, "tenant", w.tenant, "table", w.table, "ownership", w.ownershipRange.String())
-            return fmt.Errorf("error finding gaps for tenant (%s) in table (%s) for bounds (%s): %w", w.tenant, w.table, w.ownershipRange, err)
+            level.Error(logger).Log("msg", "error finding gaps", "err", err)
+            continue
        }
 
+        now := time.Now()
        for _, gap := range gaps {
-            tasks = append(tasks, Task{
+            totalTasks++
+            task := Task{
                table:           w.table.Addr(),
                tenant:          w.tenant,
                OwnershipBounds: w.ownershipRange,
                tsdb:            gap.tsdb,
                gaps:            gap.gaps,
-            })
+
+                queueTime: now,
+                ctx:       ctx,
+            }
+
+            p.activeUsers.UpdateUserTimestamp(task.tenant, now)
+            if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
+                level.Error(logger).Log("msg", "error enqueuing task", "err", err)
+                continue
+            }
        }
     }
 
+    level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks)
+
     status = statusSuccess
     level.Info(p.logger).Log(
        "msg", "bloom build iteration completed",
        "duration", time.Since(start).Seconds(),
-        "tasks", len(tasks),
     )
     return nil
 }
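
The key behavioural change in runOne is that tasks are no longer buffered in a slice: each gap becomes a Task pushed into a per-tenant queue, and both gap-finding and enqueue failures now log and continue instead of aborting the whole iteration. The sketch below illustrates that bounded, per-tenant enqueue-and-skip pattern with a deliberately simplified in-memory queue; tenantQueue is hypothetical and is not Loki's queue.RequestQueue, where the cap comes from -bloom-build.planner.max-tasks-per-tenant.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a trimmed-down stand-in for the planner's Task; only the fields
// needed to illustrate per-tenant queueing are included.
type task struct {
	tenant string
	table  string
}

var errTooManyTasks = errors.New("max tasks per tenant exceeded")

// tenantQueue is a hypothetical, simplified version of the per-tenant bounded
// queueing the planner gets from its request queue: each tenant has its own
// FIFO, capped at maxPerTenant outstanding tasks.
type tenantQueue struct {
	maxPerTenant int
	queues       map[string][]task
}

func newTenantQueue(maxPerTenant int) *tenantQueue {
	return &tenantQueue{maxPerTenant: maxPerTenant, queues: map[string][]task{}}
}

func (q *tenantQueue) enqueue(t task) error {
	if len(q.queues[t.tenant]) >= q.maxPerTenant {
		return errTooManyTasks
	}
	q.queues[t.tenant] = append(q.queues[t.tenant], t)
	return nil
}

func main() {
	q := newTenantQueue(2)
	for i := 0; i < 3; i++ {
		t := task{tenant: "tenant-a", table: fmt.Sprintf("table_%d", i)}
		// Mirroring runOne: an enqueue failure is logged and skipped rather
		// than aborting the whole planning iteration.
		if err := q.enqueue(t); err != nil {
			fmt.Println("error enqueuing task:", err)
			continue
		}
		fmt.Println("enqueued", t.table, "for", t.tenant)
	}
}
```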

pkg/bloombuild/planner/task.go (+7)

@@ -1,6 +1,9 @@
 package planner
 
 import (
+    "context"
+    "time"
+
     v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
@@ -19,4 +22,8 @@ type Task struct {
     OwnershipBounds v1.FingerprintBounds
     tsdb            tsdb.SingleTenantTSDBIdentifier
     gaps            []GapWithBlocks
+
+    // Tracking
+    queueTime time.Time
+    ctx       context.Context
 }
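
The two new fields give downstream consumers what they need for tracking: queueTime lets the dequeue side measure how long a task waited (for example to feed the planner's queue_duration_seconds histogram), and ctx carries cancellation from the planning iteration. The dequeue path is not part of this commit, so the following is only a plausible sketch of how those fields could be used.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// queuedTask carries the same tracking fields the commit adds to Task:
// when it was enqueued and the context of the planning iteration.
type queuedTask struct {
	tenant    string
	queueTime time.Time
	ctx       context.Context
}

func main() {
	queueDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "queue_duration_seconds",
		Help:    "Time spent by tasks in queue before getting picked up by a builder.",
		Buckets: prometheus.DefBuckets,
	})

	t := queuedTask{tenant: "tenant-a", queueTime: time.Now(), ctx: context.Background()}

	time.Sleep(50 * time.Millisecond) // the task waits in the queue

	// On dequeue, a consumer could check for cancellation and record how long
	// the task waited (this dequeue side is not part of the commit).
	if err := t.ctx.Err(); err != nil {
		fmt.Println("planning iteration cancelled, dropping task:", err)
		return
	}
	queueDuration.Observe(time.Since(t.queueTime).Seconds())
	fmt.Printf("task for %s waited %v in queue\n", t.tenant, time.Since(t.queueTime))
}
```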

pkg/validation/limits.go (+6)

@@ -207,6 +207,7 @@ type Limits struct {
 
     BloomCreationEnabled       bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"`
     BloomSplitSeriesKeyspaceBy int  `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
+    BloomBuildMaxBuilders      int  `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"`
 
     BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"`
     BloomNGramSkip   int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"`
@@ -385,6 +386,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 
     f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.")
     f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
+    f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
 
     _ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize)
     f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size",
@@ -987,6 +989,10 @@ func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int {
     return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy
 }
 
+func (o *Overrides) BloomBuildMaxBuilders(userID string) int {
+    return o.getOverridesForUser(userID).BloomBuildMaxBuilders
+}
+
 func (o *Overrides) BloomNGramLength(userID string) int {
     return o.getOverridesForUser(userID).BloomNGramLength
 }
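
BloomBuildMaxBuilders follows the usual per-tenant override pattern: the value is resolved through Overrides.getOverridesForUser, falling back to the global default when a tenant has no override. Below is a simplified, self-contained illustration of that lookup; the overrides and limits types here are stand-ins, not Loki's actual implementation.

```go
package main

import "fmt"

// limits mirrors only the field added in this commit.
type limits struct {
	BloomBuildMaxBuilders int
}

// overrides is a simplified stand-in for Loki's Overrides: per-tenant limits
// fall back to the defaults when no override exists for the tenant.
type overrides struct {
	defaults     limits
	tenantLimits map[string]limits
}

func (o *overrides) getOverridesForUser(userID string) limits {
	if l, ok := o.tenantLimits[userID]; ok {
		return l
	}
	return o.defaults
}

func (o *overrides) BloomBuildMaxBuilders(userID string) int {
	return o.getOverridesForUser(userID).BloomBuildMaxBuilders
}

func main() {
	o := &overrides{
		defaults:     limits{BloomBuildMaxBuilders: 0}, // 0 = unlimited builders
		tenantLimits: map[string]limits{"tenant-b": {BloomBuildMaxBuilders: 5}},
	}
	fmt.Println(o.BloomBuildMaxBuilders("tenant-a")) // 0 (unlimited)
	fmt.Println(o.BloomBuildMaxBuilders("tenant-b")) // 5
}
```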
