
Commit 421b4ee

Merge branch 'main' into helm-fix-targetport-name-query-scheduler
2 parents: 7716d84 + f6529c2

9 files changed, +2992 −68 lines

pkg/bloombuild/planner/metrics.go (+7 lines)
@@ -24,6 +24,7 @@ type Metrics struct {
     connectedBuilders prometheus.GaugeFunc
     queueDuration     prometheus.Histogram
     inflightRequests  prometheus.Summary
+    taskLost          prometheus.Counter

     buildStarted   prometheus.Counter
     buildCompleted *prometheus.CounterVec
@@ -65,6 +66,12 @@ func NewMetrics(
         MaxAge:     time.Minute,
         AgeBuckets: 6,
     }),
+    taskLost: promauto.With(r).NewCounter(prometheus.CounterOpts{
+        Namespace: metricsNamespace,
+        Subsystem: metricsSubsystem,
+        Name:      "tasks_lost_total",
+        Help:      "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
+    }),

     buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
         Namespace: metricsNamespace,
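
Note: the following is a minimal, self-contained sketch (not part of the commit) of how a promauto counter registered with a namespace/subsystem, like tasks_lost_total above, gets incremented and exposed for scraping. The registry choice, the "loki"/"bloomplanner" values standing in for metricsNamespace/metricsSubsystem, and the listen address are assumptions for illustration only.

// Illustrative sketch only: register, increment, and expose a counter like the
// one added in the diff above. Names and port are assumptions, not Loki's.
package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var tasksLost = promauto.With(prometheus.DefaultRegisterer).NewCounter(prometheus.CounterOpts{
    Namespace: "loki",         // assumed value of metricsNamespace
    Subsystem: "bloomplanner", // assumed value of metricsSubsystem
    Name:      "tasks_lost_total",
    Help:      "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
})

func main() {
    // Increment wherever a task cannot be re-enqueued; under the assumed names
    // above the exposed series is loki_bloomplanner_tasks_lost_total.
    tasksLost.Inc()

    // Expose /metrics so the counter is scrapeable.
    http.Handle("/metrics", promhttp.Handler())
    _ = http.ListenAndServe(":8080", nil)
}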

pkg/bloombuild/planner/planner.go (+164, −30 lines)
@@ -4,14 +4,17 @@ import (
     "context"
     "fmt"
     "sort"
+    "sync"
     "time"

     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/grafana/dskit/services"
+    "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"

+    "github.com/grafana/loki/v3/pkg/bloombuild/protos"
     "github.com/grafana/loki/v3/pkg/queue"
     "github.com/grafana/loki/v3/pkg/storage"
     v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -22,6 +25,8 @@ import (
     utillog "github.com/grafana/loki/v3/pkg/util/log"
 )

+var errPlannerIsNotRunning = errors.New("planner is not running")
+
 type Planner struct {
     services.Service
     // Subservices manager.
@@ -38,6 +43,8 @@ type Planner struct {
     tasksQueue  *queue.RequestQueue
     activeUsers *util.ActiveUsersCleanupService

+    pendingTasks sync.Map
+
     metrics *Metrics
     logger  log.Logger
 }
@@ -92,13 +99,23 @@ func New(
     return p, nil
 }

-func (p *Planner) starting(_ context.Context) (err error) {
+func (p *Planner) starting(ctx context.Context) (err error) {
+    if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
+        return fmt.Errorf("error starting planner subservices: %w", err)
+    }
+
     p.metrics.running.Set(1)
-    return err
+    return nil
 }

 func (p *Planner) stopping(_ error) error {
-    p.metrics.running.Set(0)
+    defer p.metrics.running.Set(0)
+
+    // This will also stop the requests queue, which stops accepting new requests and errors out any pending requests.
+    if err := services.StopManagerAndAwaitStopped(context.Background(), p.subservices); err != nil {
+        return fmt.Errorf("error stopping planner subservices: %w", err)
+    }
+
     return nil
 }

@@ -108,20 +125,32 @@ func (p *Planner) running(ctx context.Context) error {
         level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
     }

-    ticker := time.NewTicker(p.cfg.PlanningInterval)
-    defer ticker.Stop()
+    planningTicker := time.NewTicker(p.cfg.PlanningInterval)
+    defer planningTicker.Stop()
+
+    inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
+    defer inflightTasksTicker.Stop()
+
     for {
         select {
         case <-ctx.Done():
-            err := ctx.Err()
-            level.Debug(p.logger).Log("msg", "planner context done", "err", err)
-            return err
+            if err := ctx.Err(); !errors.Is(err, context.Canceled) {
+                level.Error(p.logger).Log("msg", "planner context done with error", "err", err)
+                return err
+            }

-        case <-ticker.C:
+            level.Debug(p.logger).Log("msg", "planner context done")
+            return nil
+
+        case <-planningTicker.C:
             level.Info(p.logger).Log("msg", "starting bloom build iteration")
             if err := p.runOne(ctx); err != nil {
                 level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
             }
+
+        case <-inflightTasksTicker.C:
+            inflight := p.totalPendingTasks()
+            p.metrics.inflightRequests.Observe(float64(inflight))
         }
     }
 }
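
Aside: a standalone sketch of the two-ticker select loop pattern the new running() uses above, pairing a slow planning interval with a fast ticker that samples in-flight work. The run signature, the interval values, and the plan/sampleInflight callbacks are illustrative assumptions, not the planner's actual API or configuration.

// Sketch of a loop driven by two tickers: periodic planning plus frequent
// sampling of in-flight work. All names and intervals here are assumptions.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

func run(ctx context.Context, plan func() error, sampleInflight func() int) error {
    planningTicker := time.NewTicker(time.Minute) // slow: one planning pass per tick (assumed interval)
    defer planningTicker.Stop()

    inflightTicker := time.NewTicker(250 * time.Millisecond) // fast: sample in-flight work
    defer inflightTicker.Stop()

    for {
        select {
        case <-ctx.Done():
            // Plain cancellation is a clean shutdown; anything else is an error.
            if err := ctx.Err(); !errors.Is(err, context.Canceled) {
                return err
            }
            return nil

        case <-planningTicker.C:
            if err := plan(); err != nil {
                fmt.Println("planning iteration failed:", err)
            }

        case <-inflightTicker.C:
            // In the real planner this value feeds the inflightRequests summary.
            fmt.Println("inflight tasks:", sampleInflight())
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    _ = run(ctx,
        func() error { return nil }, // stand-in planning pass
        func() int { return 0 },     // stand-in in-flight count
    )
}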
@@ -159,19 +188,13 @@ func (p *Planner) runOne(ctx context.Context) error {
         now := time.Now()
         for _, gap := range gaps {
             totalTasks++
-            task := Task{
-                table:           w.table.Addr(),
-                tenant:          w.tenant,
-                OwnershipBounds: w.ownershipRange,
-                tsdb:            gap.tsdb,
-                gaps:            gap.gaps,
-
-                queueTime: now,
-                ctx:       ctx,
-            }

-            p.activeUsers.UpdateUserTimestamp(task.tenant, now)
-            if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
+            task := NewTask(
+                ctx, now,
+                protos.NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps),
+            )
+
+            if err := p.enqueueTask(task); err != nil {
                 level.Error(logger).Log("msg", "error enqueuing task", "err", err)
                 continue
             }
@@ -326,7 +349,7 @@ func (p *Planner) findGapsForBounds(
 // This is a performance optimization to avoid expensive re-reindexing
 type blockPlan struct {
     tsdb tsdb.SingleTenantTSDBIdentifier
-    gaps []GapWithBlocks
+    gaps []protos.GapWithBlocks
 }

 func (p *Planner) findOutdatedGaps(
@@ -420,12 +443,12 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
     for _, idx := range tsdbs {
         plan := blockPlan{
             tsdb: idx.tsdb,
-            gaps: make([]GapWithBlocks, 0, len(idx.gaps)),
+            gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
         }

         for _, gap := range idx.gaps {
-            planGap := GapWithBlocks{
-                bounds: gap,
+            planGap := protos.GapWithBlocks{
+                Bounds: gap,
             }

             for _, meta := range metas {
@@ -442,18 +465,18 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
                 }
                 // this block overlaps the gap, add it to the plan
                 // for this gap
-                planGap.blocks = append(planGap.blocks, block)
+                planGap.Blocks = append(planGap.Blocks, block)
                 }
             }

             // ensure we sort blocks so deduping iterator works as expected
-            sort.Slice(planGap.blocks, func(i, j int) bool {
-                return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
+            sort.Slice(planGap.Blocks, func(i, j int) bool {
+                return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
             })

             peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
                 v1.NewSliceIter[bloomshipper.BlockRef](
-                    planGap.blocks,
+                    planGap.Blocks,
                 ),
             )
             // dedupe blocks which could be in multiple metas
@@ -472,7 +495,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
             if err != nil {
                 return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
             }
-            planGap.blocks = deduped
+            planGap.Blocks = deduped

             plan.gaps = append(plan.gaps, planGap)
         }
@@ -482,3 +505,114 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan

     return plans, nil
 }
+
+func (p *Planner) addPendingTask(task *Task) {
+    p.pendingTasks.Store(task.ID, task)
+}
+
+func (p *Planner) removePendingTask(task *Task) {
+    p.pendingTasks.Delete(task.ID)
+}
+
+func (p *Planner) totalPendingTasks() (total int) {
+    p.pendingTasks.Range(func(_, _ interface{}) bool {
+        total++
+        return true
+    })
+    return total
+}
+
+func (p *Planner) enqueueTask(task *Task) error {
+    p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
+    return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
+        p.addPendingTask(task)
+    })
+}
+
+func (p *Planner) NotifyBuilderShutdown(
+    _ context.Context,
+    req *protos.NotifyBuilderShutdownRequest,
+) (*protos.NotifyBuilderShutdownResponse, error) {
+    level.Debug(p.logger).Log("msg", "builder shutdown", "builder", req.BuilderID)
+    p.tasksQueue.UnregisterConsumerConnection(req.GetBuilderID())
+
+    return &protos.NotifyBuilderShutdownResponse{}, nil
+}
+
+func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer) error {
+    resp, err := builder.Recv()
+    if err != nil {
+        return fmt.Errorf("error receiving message from builder: %w", err)
+    }
+
+    builderID := resp.GetBuilderID()
+    logger := log.With(p.logger, "builder", builderID)
+    level.Debug(logger).Log("msg", "builder connected")
+
+    p.tasksQueue.RegisterConsumerConnection(builderID)
+    defer p.tasksQueue.UnregisterConsumerConnection(builderID)
+
+    lastIndex := queue.StartIndex
+    for p.isRunningOrStopping() {
+        item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
+        if err != nil {
+            return fmt.Errorf("error dequeuing task: %w", err)
+        }
+        lastIndex = idx
+
+        if item == nil {
+            return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
+        }
+        task := item.(*Task)
+
+        queueTime := time.Since(task.queueTime)
+        p.metrics.queueDuration.Observe(queueTime.Seconds())
+
+        if task.ctx.Err() != nil {
+            level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
+            lastIndex = lastIndex.ReuseLastIndex()
+            p.removePendingTask(task)
+            continue
+        }
+
+        if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
+            // Re-queue the task if the builder is failing to process the tasks
+            if err := p.enqueueTask(task); err != nil {
+                p.metrics.taskLost.Inc()
+                level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
+            }
+
+            return fmt.Errorf("error forwarding task to builder (%s). Task requeued: %w", builderID, err)
+        }
+    }
+
+    return errPlannerIsNotRunning
+}
+
+func (p *Planner) forwardTaskToBuilder(
+    builder protos.PlannerForBuilder_BuilderLoopServer,
+    builderID string,
+    task *Task,
+) error {
+    defer p.removePendingTask(task)
+
+    msg := &protos.PlannerToBuilder{
+        Task: task.ToProtoTask(),
+    }
+
+    if err := builder.Send(msg); err != nil {
+        return fmt.Errorf("error sending task to builder (%s): %w", builderID, err)
+    }
+
+    // TODO(salvacorts): Implement timeout and retry for builder response.
+    _, err := builder.Recv()
+
+    return err
+}
+
+func (p *Planner) isRunningOrStopping() bool {
+    st := p.State()
+    return st == services.Running || st == services.Stopping
+}
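
Aside: a standalone sketch of the pending-task bookkeeping and requeue-or-lose behaviour introduced above, using a sync.Map and a plain integer in place of the planner's request queue and the tasks_lost_total counter. The Task type, the tracker, the hypothetical task ID, and the send/enqueue callbacks are simplified stand-ins, not the planner's real types.

// Sketch: track tasks while pending, count them for an in-flight metric, and
// treat a failed re-enqueue after a failed forward as a lost task.
package main

import (
    "errors"
    "fmt"
    "sync"
)

type Task struct{ ID string }

type tracker struct {
    pending sync.Map // task ID -> *Task, mirrors Planner.pendingTasks
    lost    int      // stand-in for the tasks_lost_total counter
}

func (t *tracker) add(task *Task)    { t.pending.Store(task.ID, task) }
func (t *tracker) remove(task *Task) { t.pending.Delete(task.ID) }

func (t *tracker) totalPending() (total int) {
    t.pending.Range(func(_, _ any) bool {
        total++
        return true
    })
    return total
}

// forward hands a task to a builder. On failure it tries to re-enqueue the
// task and, if that also fails, records it as lost (mirroring BuilderLoop).
func (t *tracker) forward(task *Task, send, enqueue func(*Task) error) error {
    t.remove(task) // no longer pending once handed to a builder

    if err := send(task); err != nil {
        if reqErr := enqueue(task); reqErr != nil {
            t.lost++ // stand-in for p.metrics.taskLost.Inc()
            return fmt.Errorf("task lost: %w", errors.Join(err, reqErr))
        }
        t.add(task) // successfully requeued, so pending again
        return fmt.Errorf("forward failed, task requeued: %w", err)
    }
    return nil
}

func main() {
    tr := &tracker{}
    task := &Task{ID: "table-19400/tenant-a/gap-0"} // hypothetical task ID

    tr.add(task)
    fmt.Println("pending before forward:", tr.totalPending())

    err := tr.forward(task,
        func(*Task) error { return errors.New("builder unavailable") }, // simulated send failure
        func(*Task) error { return nil },                               // requeue succeeds
    )
    fmt.Println("forward:", err)
    fmt.Println("pending after requeue:", tr.totalPending(), "lost:", tr.lost)
}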
