@@ -4,14 +4,17 @@ import (
	"context"
	"fmt"
	"sort"
+	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/services"
+	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"

+	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	"github.com/grafana/loki/v3/pkg/queue"
	"github.com/grafana/loki/v3/pkg/storage"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -22,6 +25,8 @@ import (
	utillog "github.com/grafana/loki/v3/pkg/util/log"
)

+var errPlannerIsNotRunning = errors.New("planner is not running")
+
type Planner struct {
	services.Service
	// Subservices manager.
@@ -38,6 +43,8 @@ type Planner struct {
	tasksQueue  *queue.RequestQueue
	activeUsers *util.ActiveUsersCleanupService

+	pendingTasks sync.Map
+
	metrics *Metrics
	logger  log.Logger
}
@@ -92,13 +99,23 @@ func New(
	return p, nil
}

-func (p *Planner) starting(_ context.Context) (err error) {
+func (p *Planner) starting(ctx context.Context) (err error) {
+	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
+		return fmt.Errorf("error starting planner subservices: %w", err)
+	}
+
	p.metrics.running.Set(1)
-	return err
+	return nil
}

func (p *Planner) stopping(_ error) error {
-	p.metrics.running.Set(0)
+	defer p.metrics.running.Set(0)
+
+	// This will also stop the requests queue, which stops accepting new requests and errors out any pending requests.
+	if err := services.StopManagerAndAwaitStopped(context.Background(), p.subservices); err != nil {
+		return fmt.Errorf("error stopping planner subservices: %w", err)
+	}
+
	return nil
}

@@ -108,20 +125,32 @@ func (p *Planner) running(ctx context.Context) error {
		level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
	}

-	ticker := time.NewTicker(p.cfg.PlanningInterval)
-	defer ticker.Stop()
+	planningTicker := time.NewTicker(p.cfg.PlanningInterval)
+	defer planningTicker.Stop()
+
+	inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
+	defer inflightTasksTicker.Stop()
+
	for {
		select {
		case <-ctx.Done():
-			err := ctx.Err()
-			level.Debug(p.logger).Log("msg", "planner context done", "err", err)
-			return err
+			if err := ctx.Err(); !errors.Is(err, context.Canceled) {
+				level.Error(p.logger).Log("msg", "planner context done with error", "err", err)
+				return err
+			}

-		case <-ticker.C:
+			level.Debug(p.logger).Log("msg", "planner context done")
+			return nil
+
+		case <-planningTicker.C:
			level.Info(p.logger).Log("msg", "starting bloom build iteration")
			if err := p.runOne(ctx); err != nil {
				level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
			}
+
+		case <-inflightTasksTicker.C:
+			inflight := p.totalPendingTasks()
+			p.metrics.inflightRequests.Observe(float64(inflight))
		}
	}
}
@@ -159,19 +188,13 @@ func (p *Planner) runOne(ctx context.Context) error {
		now := time.Now()
		for _, gap := range gaps {
			totalTasks++
-			task := Task{
-				table:           w.table.Addr(),
-				tenant:          w.tenant,
-				OwnershipBounds: w.ownershipRange,
-				tsdb:            gap.tsdb,
-				gaps:            gap.gaps,
-
-				queueTime: now,
-				ctx:       ctx,
-			}

-			p.activeUsers.UpdateUserTimestamp(task.tenant, now)
-			if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
+			task := NewTask(
+				ctx, now,
+				protos.NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps),
+			)
+
+			if err := p.enqueueTask(task); err != nil {
				level.Error(logger).Log("msg", "error enqueuing task", "err", err)
				continue
			}
@@ -326,7 +349,7 @@ func (p *Planner) findGapsForBounds(
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
	tsdb tsdb.SingleTenantTSDBIdentifier
-	gaps []GapWithBlocks
+	gaps []protos.GapWithBlocks
}

func (p *Planner) findOutdatedGaps(
@@ -420,12 +443,12 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
	for _, idx := range tsdbs {
		plan := blockPlan{
			tsdb: idx.tsdb,
-			gaps: make([]GapWithBlocks, 0, len(idx.gaps)),
+			gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
		}

		for _, gap := range idx.gaps {
-			planGap := GapWithBlocks{
-				bounds: gap,
+			planGap := protos.GapWithBlocks{
+				Bounds: gap,
			}

			for _, meta := range metas {
@@ -442,18 +465,18 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
					}
					// this block overlaps the gap, add it to the plan
					// for this gap
-					planGap.blocks = append(planGap.blocks, block)
+					planGap.Blocks = append(planGap.Blocks, block)
				}
			}

			// ensure we sort blocks so deduping iterator works as expected
-			sort.Slice(planGap.blocks, func(i, j int) bool {
-				return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
+			sort.Slice(planGap.Blocks, func(i, j int) bool {
+				return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
			})

			peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
				v1.NewSliceIter[bloomshipper.BlockRef](
-					planGap.blocks,
+					planGap.Blocks,
				),
			)
			// dedupe blocks which could be in multiple metas
@@ -472,7 +495,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
			if err != nil {
				return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
			}
-			planGap.blocks = deduped
+			planGap.Blocks = deduped

			plan.gaps = append(plan.gaps, planGap)
		}
@@ -482,3 +505,114 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan

	return plans, nil
}
+
+func (p *Planner) addPendingTask(task *Task) {
+	p.pendingTasks.Store(task.ID, task)
+}
+
+func (p *Planner) removePendingTask(task *Task) {
+	p.pendingTasks.Delete(task.ID)
+}
+
+func (p *Planner) totalPendingTasks() (total int) {
+	p.pendingTasks.Range(func(_, _ interface{}) bool {
+		total++
+		return true
+	})
+	return total
+}
+
+func (p *Planner) enqueueTask(task *Task) error {
+	p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
+	return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
+		p.addPendingTask(task)
+	})
+}
+
+func (p *Planner) NotifyBuilderShutdown(
+	_ context.Context,
+	req *protos.NotifyBuilderShutdownRequest,
+) (*protos.NotifyBuilderShutdownResponse, error) {
+	level.Debug(p.logger).Log("msg", "builder shutdown", "builder", req.BuilderID)
+	p.tasksQueue.UnregisterConsumerConnection(req.GetBuilderID())
+
+	return &protos.NotifyBuilderShutdownResponse{}, nil
+}
+
+func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer) error {
+	resp, err := builder.Recv()
+	if err != nil {
+		return fmt.Errorf("error receiving message from builder: %w", err)
+	}
+
+	builderID := resp.GetBuilderID()
+	logger := log.With(p.logger, "builder", builderID)
+	level.Debug(logger).Log("msg", "builder connected")
+
+	p.tasksQueue.RegisterConsumerConnection(builderID)
+	defer p.tasksQueue.UnregisterConsumerConnection(builderID)
+
+	lastIndex := queue.StartIndex
+	for p.isRunningOrStopping() {
+		item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
+		if err != nil {
+			return fmt.Errorf("error dequeuing task: %w", err)
+		}
+		lastIndex = idx
+
+		if item == nil {
+
+			return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
+		}
+		task := item.(*Task)
+
+		queueTime := time.Since(task.queueTime)
+		p.metrics.queueDuration.Observe(queueTime.Seconds())
+
+		if task.ctx.Err() != nil {
+			level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
+			lastIndex = lastIndex.ReuseLastIndex()
+			p.removePendingTask(task)
+			continue
+		}
+
+		if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
+			// Re-queue the task if the builder is failing to process the tasks.
+			if err := p.enqueueTask(task); err != nil {
+				p.metrics.taskLost.Inc()
+				level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
+			}
+
+			return fmt.Errorf("error forwarding task to builder (%s). Task requeued: %w", builderID, err)
+		}
+
+	}
+
+	return errPlannerIsNotRunning
+}
+
+func (p *Planner) forwardTaskToBuilder(
+	builder protos.PlannerForBuilder_BuilderLoopServer,
+	builderID string,
+	task *Task,
+) error {
+	defer p.removePendingTask(task)
+
+	msg := &protos.PlannerToBuilder{
+		Task: task.ToProtoTask(),
+	}
+
+	if err := builder.Send(msg); err != nil {
+		return fmt.Errorf("error sending task to builder (%s): %w", builderID, err)
+	}
+
+	// TODO(salvacorts): Implement timeout and retry for builder response.
+	_, err := builder.Recv()
+
+	return err
+}
+
+func (p *Planner) isRunningOrStopping() bool {
+	st := p.State()
+	return st == services.Running || st == services.Stopping
+}