
Commit a338260

feat: Remove flush loop and queue from Ingester RF-1
This commit removes the flush loop and flush queue from Ingester RF-1. Both were carried over from the original ingester and are no longer needed now that we have the WAL Manager.
1 parent 3ac130b commit a338260

4 files changed: +159 -140 lines
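For orientation before the diffs: the old design ran a ticker-driven loop that drained the WAL Manager's pending list into per-worker priority queues, while the new design has each flush worker call NextPending on the WAL Manager directly and stop once it reports ErrClosed. The sketch below is a simplified, self-contained illustration of that worker shape; walManager, pendingItem, and the other names are hypothetical stand-ins, not the real wal package types.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errClosed = errors.New("WAL is closed")

type pendingItem struct{ payload string }

// walManager is a toy stand-in for wal.Manager, reduced to the parts the
// flush worker cares about: a pending list and a closed flag.
type walManager struct {
	mu      sync.Mutex
	pending []pendingItem
	closed  bool
}

func (m *walManager) nextPending() (*pendingItem, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.pending) == 0 {
		if m.closed {
			return nil, errClosed
		}
		return nil, nil // nothing to flush yet, but the WAL is still open
	}
	it := m.pending[0]
	m.pending = m.pending[1:]
	return &it, nil
}

// flushWorker mirrors the new worker loop: poll for pending segments, flush
// them, and return once the WAL reports it is closed and drained.
func flushWorker(m *walManager, done *sync.WaitGroup) {
	defer done.Done()
	for {
		it, err := m.nextPending()
		if errors.Is(err, errClosed) {
			return
		}
		if it == nil {
			time.Sleep(10 * time.Millisecond) // the real worker sleeps 100ms
			continue
		}
		fmt.Println("flushed", it.payload) // stands in for flushItem + Put
	}
}

func main() {
	m := &walManager{pending: []pendingItem{{"segment-1"}, {"segment-2"}}}
	var done sync.WaitGroup
	done.Add(1)
	go flushWorker(m, &done)

	time.Sleep(50 * time.Millisecond) // let the worker drain the pending list
	m.mu.Lock()
	m.closed = true // analogous to wal.Manager.Close()
	m.mu.Unlock()
	done.Wait()
}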


pkg/ingester-rf1/flush.go (+24 -44)

@@ -2,9 +2,9 @@ package ingesterrf1
 
 import (
 	"crypto/rand"
+	"errors"
 	"fmt"
 	"net/http"
-	"strconv"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -17,7 +17,6 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/grafana/loki/v3/pkg/storage/wal"
-	"github.com/grafana/loki/v3/pkg/util"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
 
@@ -38,19 +37,19 @@ const (
 
 // Note: this is called both during the WAL replay (zero or more times)
 // and then after replay as well.
-func (i *Ingester) InitFlushQueues() {
-	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
+func (i *Ingester) InitFlushWorkers() {
+	i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
 	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
-		i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueues)
-		go i.flushLoop(j)
+		go i.flushWorker(j)
 	}
 }
 
 // Flush implements ring.FlushTransferer
 // Flush triggers a flush of all the chunks and closes the flush queues.
 // Called from the Lifecycler as part of the ingester shutdown.
 func (i *Ingester) Flush() {
-	i.flush()
+	i.wal.Close()
+	i.flushWorkersDone.Wait()
 }
 
 // TransferOut implements ring.FlushTransferer
@@ -60,76 +59,57 @@ func (i *Ingester) TransferOut(_ context.Context) error {
 	return ring.ErrTransferDisabled
 }
 
-func (i *Ingester) flush() {
-	// TODO: Flush the last chunks
-	// Close the flush queues, to unblock waiting workers.
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-
-	i.flushQueuesDone.Wait()
-	level.Debug(i.logger).Log("msg", "flush queues have drained")
-}
-
 // FlushHandler triggers a flush of all in memory chunks. Mainly used for
 // local testing.
 func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
-type flushOp struct {
-	it  *wal.PendingItem
-	num int64
-}
-
-func (o *flushOp) Key() string {
-	return strconv.Itoa(int(o.num))
-}
-
-func (o *flushOp) Priority() int64 {
-	return -o.num
-}
-
-func (i *Ingester) flushLoop(j int) {
-	l := log.With(i.logger, "loop", j)
+func (i *Ingester) flushWorker(j int) {
+	l := log.With(i.logger, "worker", j)
 	defer func() {
-		level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
-		i.flushQueuesDone.Done()
+		level.Debug(l).Log("msg", "Ingester.flushWorker() exited")
+		i.flushWorkersDone.Done()
 	}()
 
 	for {
-		o := i.flushQueues[j].Dequeue()
-		if o == nil {
+		it, err := i.wal.NextPending()
+		if errors.Is(err, wal.ErrClosed) {
 			return
 		}
-		op := o.(*flushOp)
+
+		if it == nil {
+			// TODO: Do something more clever here instead.
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
 
 		start := time.Now()
 
 		// We'll use this to log the size of the segment that was flushed.
-		n := op.it.Writer.InputSize()
+		n := it.Writer.InputSize()
 		humanized := humanize.Bytes(uint64(n))
 
-		err := i.flushOp(l, op)
+		err = i.flushItem(l, it)
 		d := time.Since(start)
 		if err != nil {
 			level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
 		} else {
 			level.Debug(l).Log("msg", "flushed", "size", humanized, "duration", d)
 		}
 
-		op.it.Result.SetDone(err)
-		i.wal.Put(op.it)
+		it.Result.SetDone(err)
+		i.wal.Put(it)
	}
 }
 
-func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 
 	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
 	for b.Ongoing() {
-		err := i.flushSegment(ctx, op.it.Writer)
+		err := i.flushSegment(ctx, it.Writer)
 		if err == nil {
 			break
 		}
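The retry in flushItem is driven by the backoff helper configured through i.cfg.FlushOpBackoff; the hunk ends inside that loop, and the remainder of the function is unchanged code that the diff does not show. As a rough sketch only, assuming the dskit backoff package that the b := backoff.New(ctx, ...) / b.Ongoing() calls above appear to use, a retry loop of that shape typically looks like this (flush is a placeholder for something like i.flushSegment):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flushWithRetries retries the flush until it succeeds, the retries are
// exhausted, or the context ends.
func flushWithRetries(ctx context.Context, cfg backoff.Config, flush func(context.Context) error) error {
	b := backoff.New(ctx, cfg)
	var err error
	for b.Ongoing() {
		if err = flush(ctx); err == nil {
			return nil
		}
		b.Wait() // sleep with exponential backoff before the next attempt
	}
	if err != nil {
		return err
	}
	return b.Err()
}

func main() {
	cfg := backoff.Config{MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, MaxRetries: 3}
	attempts := 0
	err := flushWithRetries(context.Background(), cfg, func(context.Context) error {
		attempts++
		if attempts < 2 {
			return errors.New("transient flush failure")
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}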

pkg/ingester-rf1/ingester.go (+13 -81)

@@ -5,7 +5,6 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"math/rand"
 	"net/http"
 	"os"
 	"path"
@@ -202,15 +201,11 @@ type Ingester struct {
 	store           Storage
 	periodicConfigs []config.PeriodConfig
 
-	loopDone    sync.WaitGroup
-	loopQuit    chan struct{}
 	tailersQuit chan struct{}
 
 	// One queue per flush thread. Fingerprint is used to
 	// pick a queue.
-	numOps          int64
-	flushQueues     []*util.PriorityQueue
-	flushQueuesDone sync.WaitGroup
+	flushWorkersDone sync.WaitGroup
 
 	wal *wal.Manager
 
@@ -270,17 +265,16 @@ func New(cfg Config, clientConfig client.Config,
 	}
 
 	i := &Ingester{
-		cfg:             cfg,
-		logger:          logger,
-		clientConfig:    clientConfig,
-		tenantConfigs:   configs,
-		instances:       map[string]*instance{},
-		store:           storage,
-		periodicConfigs: periodConfigs,
-		loopQuit:        make(chan struct{}),
-		flushQueues:     make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
-		tailersQuit:     make(chan struct{}),
-		metrics:         metrics,
+		cfg:              cfg,
+		logger:           logger,
+		clientConfig:     clientConfig,
+		tenantConfigs:    configs,
+		instances:        map[string]*instance{},
+		store:            storage,
+		periodicConfigs:  periodConfigs,
+		flushWorkersDone: sync.WaitGroup{},
+		tailersQuit:      make(chan struct{}),
+		metrics:          metrics,
 		// flushOnShutdownSwitch: &OnceSwitch{},
 		terminateOnShutdown:  false,
 		streamRateCalculator: NewStreamRateCalculator(),
@@ -401,7 +395,7 @@ func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func (i *Ingester) starting(ctx context.Context) error {
-	i.InitFlushQueues()
+	i.InitFlushWorkers()
 
 	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
 	err := i.lifecycler.StartAsync(context.Background())
@@ -435,9 +429,6 @@ func (i *Ingester) starting(ctx context.Context) error {
 		return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
 	}
 
-	// start our loop
-	i.loopDone.Add(1)
-	go i.loop()
 	return nil
 }
 
@@ -457,8 +448,6 @@ func (i *Ingester) running(ctx context.Context) error {
 	//	instance.closeTailers()
 	//}
 
-	close(i.loopQuit)
-	i.loopDone.Wait()
 	return serviceError
 }
 
@@ -474,10 +463,7 @@ func (i *Ingester) stopping(_ error) error {
 	//}
 	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
 
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-	i.flushQueuesDone.Wait()
+	i.flushWorkersDone.Wait()
 
 	// i.streamRateCalculator.Stop()
 
@@ -518,60 +504,6 @@ func (i *Ingester) removeShutdownMarkerFile() {
 	}
 }
 
-func (i *Ingester) loop() {
-	defer i.loopDone.Done()
-
-	// Delay the first flush operation by up to 0.8x the flush time period.
-	// This will ensure that multiple ingesters started at the same time do not
-	// flush at the same time. Flushing at the same time can cause concurrently
-	// writing the same chunk to object storage, which in AWS S3 leads to being
-	// rate limited.
-	jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
-	initialDelay := time.NewTimer(jitter)
-	defer initialDelay.Stop()
-
-	level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)
-
-	select {
-	case <-initialDelay.C:
-		// do nothing and continue with flush loop
-	case <-i.loopQuit:
-		// ingester stopped while waiting for initial delay
-		return
-	}
-
-	// Add +/- 20% of flush interval as jitter.
-	// The default flush check period is 30s so max jitter will be 6s.
-	j := i.cfg.FlushCheckPeriod / 5
-	flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
-	defer flushTicker.Stop()
-
-	for {
-		select {
-		case <-flushTicker.C:
-			i.doFlushTick()
-		case <-i.loopQuit:
-			return
-		}
-	}
-}
-
-func (i *Ingester) doFlushTick() {
-	for {
-		// Keep adding ops to the queue until there are no more.
-		it := i.wal.NextPending()
-		if it == nil {
-			break
-		}
-		i.numOps++
-		flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
-			num: i.numOps,
-			it:  it,
-		})
-	}
-}
-
 // PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
 //
 // Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
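The shutdown contract is now split across the two files: Flush() (in flush.go above) closes the WAL so that NextPending eventually reports wal.ErrClosed, and stopping() simply waits on flushWorkersDone for every worker to exit. Below is a minimal sketch of that close-then-wait ordering, using a closed channel as a stand-in for the WAL reporting ErrClosed; it is an illustration of the pattern, not the ingester's actual code.

package main

import (
	"fmt"
	"sync"
)

func main() {
	work := make(chan int)
	var flushWorkersDone sync.WaitGroup

	// Start a fixed pool of workers, as InitFlushWorkers does.
	for j := 0; j < 4; j++ {
		flushWorkersDone.Add(1)
		go func(j int) {
			defer flushWorkersDone.Done()
			// A closed, drained channel plays the role of NextPending
			// returning wal.ErrClosed.
			for it := range work {
				fmt.Println("worker", j, "flushed segment", it)
			}
		}(j)
	}

	for i := 0; i < 8; i++ {
		work <- i
	}

	close(work)             // analogous to i.wal.Close() in Flush()
	flushWorkersDone.Wait() // analogous to i.flushWorkersDone.Wait() in stopping()
}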

pkg/storage/wal/manager.go (+33 -8)

@@ -25,6 +25,9 @@ const (
 )
 
 var (
+	// ErrClosed is returned when the WAL is closed.
+	ErrClosed = errors.New("WAL is closed")
+
 	// ErrFull is returned when an append fails because the WAL is full. This
 	// happens when all segments are either in the pending list waiting to be
 	// flushed, or in the process of being flushed.
@@ -111,9 +114,10 @@ type Manager struct {
 	// pending is a list of segments that are waiting to be flushed. Once
 	// flushed, the segment is reset and moved to the back of the available
 	// list to accept writes again.
-	pending  *list.List
-	shutdown chan struct{}
-	mu       sync.Mutex
+	pending *list.List
+
+	closed bool
+	mu     sync.Mutex
 }
 
 // item is similar to PendingItem, but it is an internal struct used in the
@@ -141,7 +145,6 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 		metrics:   metrics.ManagerMetrics,
 		available: list.New(),
 		pending:   list.New(),
-		shutdown:  make(chan struct{}),
 	}
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
@@ -162,6 +165,9 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	if m.closed {
+		return nil, ErrClosed
+	}
 	if m.available.Len() == 0 {
 		return nil, ErrFull
 	}
@@ -185,9 +191,25 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	return it.r, nil
 }
 
+func (m *Manager) Close() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.closed = true
+	if m.available.Len() > 0 {
+		el := m.available.Front()
+		it := el.Value.(*item)
+		if it.w.InputSize() >= 0 {
+			m.pending.PushBack(it)
+			m.metrics.NumPending.Inc()
+			m.available.Remove(el)
+			m.metrics.NumAvailable.Dec()
+		}
+	}
+}
+
 // NextPending returns the next segment to be flushed. It returns nil if the
-// pending list is empty.
-func (m *Manager) NextPending() *PendingItem {
+// pending list is empty, and ErrClosed if the WAL is closed.
+func (m *Manager) NextPending() (*PendingItem, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	if m.pending.Len() == 0 {
@@ -205,15 +227,18 @@
 		}
 		// If the pending list is still empty return nil.
 		if m.pending.Len() == 0 {
-			return nil
+			if m.closed {
+				return nil, ErrClosed
+			}
+			return nil, nil
 		}
 	}
 	el := m.pending.Front()
 	it := el.Value.(*item)
 	m.pending.Remove(el)
 	m.metrics.NumPending.Dec()
 	m.metrics.NumFlushing.Inc()
-	return &PendingItem{Result: it.r, Writer: it.w}
+	return &PendingItem{Result: it.r, Writer: it.w}, nil
 }
 
 // Put resets the segment and puts it back in the available list to accept
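Taken together, the Manager now behaves as follows: Append fails with ErrClosed once the WAL is closed, Close moves any partially written available segment onto the pending list so it still gets flushed, and NextPending returns (nil, ErrClosed) only after the pending list has drained, which is what lets the flush workers exit cleanly. The sketch below illustrates the Result half of that contract, i.e. how a flush worker calling Result.SetDone unblocks whoever appended the data; appendResult is a hypothetical stand-in, not the real wal.AppendResult API.

package main

import "fmt"

// appendResult is a hypothetical stand-in for the result handed back by
// Manager.Append: the appender waits on it, and a flush worker resolves it
// with SetDone once the segment containing the write has been flushed.
type appendResult struct {
	done chan struct{}
	err  error
}

func (r *appendResult) SetDone(err error) {
	r.err = err
	close(r.done)
}

func (r *appendResult) Err() error {
	<-r.done
	return r.err
}

func main() {
	res := &appendResult{done: make(chan struct{})}

	// Flush worker side: after flushing the segment, resolve the result so
	// the appender can acknowledge the write (it.Result.SetDone(err) above).
	go res.SetDone(nil)

	// Appender side: block until the segment holding the write is flushed.
	if err := res.Err(); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("write durably flushed")
}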
