Skip to content

Commit d26e496

Browse files
Add backoff to flush op
This commit adds a configurable backoff to flush ops in the ingester. This is to prevent situations where the store put operation fails fast (i.e. 401 Unauthorized) and can cause ingesters to be rate limited.
1 parent 86b119a commit d26e496

File tree

6 files changed

+264
-31
lines changed

6 files changed

+264
-31
lines changed

docs/sources/shared/configuration.md

+17-1
Original file line numberDiff line numberDiff line change
@@ -2752,7 +2752,23 @@ lifecycler:
27522752
# CLI flag: -ingester.flush-check-period
27532753
[flush_check_period: <duration> | default = 30s]
27542754
2755-
# The timeout before a flush is cancelled.
2755+
flush_op_backoff:
2756+
# Minimum backoff period when a flush fails. Each concurrent flush has its own
2757+
# backoff, see `ingester.concurrent-flushes`.
2758+
# CLI flag: -ingester.flush-op-backoff-min-period
2759+
[min_period: <duration> | default = 10s]
2760+
2761+
# Maximum backoff period when a flush fails. Each concurrent flush has its own
2762+
# backoff, see `ingester.concurrent-flushes`.
2763+
# CLI flag: -ingester.flush-op-backoff-max-period
2764+
[max_period: <duration> | default = 1m]
2765+
2766+
# Maximum retries for failed flushes. Retrying is canceled when
2767+
# `ingester.flush-op-timeout` is exceeded.
2768+
# CLI flag: -ingester.flush-op-backoff-retries
2769+
[max_retries: <int> | default = 10]
2770+
2771+
# The timeout before a flush is canceled.
27562772
# CLI flag: -ingester.flush-op-timeout
27572773
[flush_op_timeout: <duration> | default = 10m]
27582774

pkg/ingester/flush.go

+41-7
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package ingester
22

33
import (
44
"bytes"
5+
"errors"
56
"fmt"
67
"net/http"
78
"sync"
89
"time"
910

11+
"github.com/go-kit/log"
1012
"github.com/go-kit/log/level"
13+
"github.com/grafana/dskit/backoff"
1114
"github.com/grafana/dskit/ring"
1215
"github.com/grafana/dskit/user"
1316
"github.com/prometheus/client_golang/prometheus"
@@ -135,8 +138,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
135138
}
136139

137140
func (i *Ingester) flushLoop(j int) {
141+
l := log.With(i.logger, "loop", j)
138142
defer func() {
139-
level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
143+
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
140144
i.flushQueuesDone.Done()
141145
}()
142146

@@ -147,9 +151,10 @@ func (i *Ingester) flushLoop(j int) {
147151
}
148152
op := o.(*flushOp)
149153

150-
err := i.flushUserSeries(op.userID, op.fp, op.immediate)
154+
m := util_log.WithUserID(op.userID, l)
155+
err := i.flushOp(m, op)
151156
if err != nil {
152-
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
157+
level.Error(m).Log("msg", "failed to flush", "err", err)
153158
}
154159

155160
// If we're exiting & we failed to flush, put the failed operation
@@ -161,7 +166,39 @@ func (i *Ingester) flushLoop(j int) {
161166
}
162167
}
163168

164-
func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
169+
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
170+
// A flush is retried until either it is successful, the maximum number
171+
// of retries is exceeded, or the timeout has expired. The context is
172+
// used to cancel the backoff should the latter happen.
173+
ctx, cancelFunc := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout)
174+
defer cancelFunc()
175+
176+
b := backoff.New(ctx, i.cfg.FlushOpBackoff)
177+
for b.Ongoing() {
178+
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
179+
if err == nil {
180+
break
181+
}
182+
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
183+
b.Wait()
184+
}
185+
186+
if err := b.Err(); err != nil {
187+
// If we got here then either the maximum number of retries have been
188+
// exceeded or the timeout expired. We do not need to check ctx.Err()
189+
// as it is checked in b.Err().
190+
if errors.Is(err, context.DeadlineExceeded) {
191+
return fmt.Errorf("timed out after %s: %w", i.cfg.FlushOpTimeout, err)
192+
}
193+
return err
194+
}
195+
196+
return nil
197+
}
198+
199+
func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
200+
ctx = user.InjectOrgID(ctx, userID)
201+
165202
instance, ok := i.getInstanceByID(userID)
166203
if !ok {
167204
return nil
@@ -175,9 +212,6 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
175212
lbs := labels.String()
176213
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
177214

178-
ctx := user.InjectOrgID(context.Background(), userID)
179-
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
180-
defer cancel()
181215
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
182216
if err != nil {
183217
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)

pkg/ingester/flush_test.go

+99
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingester
22

33
import (
4+
"errors"
45
"fmt"
56
"os"
67
"sort"
@@ -102,6 +103,100 @@ func Benchmark_FlushLoop(b *testing.B) {
102103
}
103104
}
104105

106+
// Test_FlushOp exercises Ingester.flushOp end to end against the test store:
// a successful flush, a flush that exhausts its retry budget, and a flush
// whose retries are cut short by the flush-op timeout.
func Test_FlushOp(t *testing.T) {
	t.Run("no error", func(t *testing.T) {
		cfg := defaultIngesterTestConfig(t)
		cfg.FlushOpBackoff.MinBackoff = time.Second
		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
		cfg.FlushOpBackoff.MaxRetries = 1
		cfg.FlushCheckPeriod = 100 * time.Millisecond

		_, ing := newTestStore(t, cfg, nil)

		ctx := user.InjectOrgID(context.Background(), "foo")
		ins, err := ing.GetOrCreateInstance("foo")
		require.NoError(t, err)

		// Push one stream so there is a chunk to flush.
		lbs := makeRandomLabels()
		req := &logproto.PushRequest{Streams: []logproto.Stream{{
			Labels:  lbs.String(),
			Entries: entries(5, time.Now()),
		}}}
		require.NoError(t, ins.Push(ctx, req))

		time.Sleep(cfg.FlushCheckPeriod)
		// The store accepts the put, so flushOp succeeds on the first attempt.
		require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
			immediate: true,
			userID:    "foo",
			fp:        ins.getHashForLabels(lbs),
		}))
	})

	t.Run("max retries exceeded", func(t *testing.T) {
		cfg := defaultIngesterTestConfig(t)
		cfg.FlushOpBackoff.MinBackoff = time.Second
		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
		cfg.FlushOpBackoff.MaxRetries = 1
		cfg.FlushCheckPeriod = 100 * time.Millisecond

		// The store fails every put fast, so flushOp keeps retrying until the
		// backoff's MaxRetries (1) is spent.
		store, ing := newTestStore(t, cfg, nil)
		store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
			return errors.New("failed to write chunks")
		}

		ctx := user.InjectOrgID(context.Background(), "foo")
		ins, err := ing.GetOrCreateInstance("foo")
		require.NoError(t, err)

		lbs := makeRandomLabels()
		req := &logproto.PushRequest{Streams: []logproto.Stream{{
			Labels:  lbs.String(),
			Entries: entries(5, time.Now()),
		}}}
		require.NoError(t, ins.Push(ctx, req))

		time.Sleep(cfg.FlushCheckPeriod)
		// dskit backoff reports retry exhaustion with this exact message.
		require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
			immediate: true,
			userID:    "foo",
			fp:        ins.getHashForLabels(lbs),
		}), "terminated after 1 retries")
	})

	t.Run("timeout expired", func(t *testing.T) {
		cfg := defaultIngesterTestConfig(t)
		cfg.FlushOpBackoff.MinBackoff = time.Second
		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
		cfg.FlushOpBackoff.MaxRetries = 1
		cfg.FlushOpTimeout = 100 * time.Millisecond
		cfg.FlushCheckPeriod = 100 * time.Millisecond

		// The store put outlives FlushOpTimeout (150ms > 100ms), so the
		// flush context hits its deadline before any retry can complete.
		store, ing := newTestStore(t, cfg, nil)
		store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
			time.Sleep(150 * time.Millisecond)
			return errors.New("store is unavailable")
		}

		ctx := user.InjectOrgID(context.Background(), "foo")
		ins, err := ing.GetOrCreateInstance("foo")
		require.NoError(t, err)

		lbs := makeRandomLabels()
		req := &logproto.PushRequest{Streams: []logproto.Stream{{
			Labels:  lbs.String(),
			Entries: entries(5, time.Now()),
		}}}
		require.NoError(t, ins.Push(ctx, req))

		time.Sleep(cfg.FlushCheckPeriod)
		// flushOp wraps context.DeadlineExceeded with the configured timeout.
		require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
			immediate: true,
			userID:    "foo",
			fp:        ins.getHashForLabels(lbs),
		}), "timed out after 100ms: context deadline exceeded")
	})
}
199+
105200
func Test_Flush(t *testing.T) {
106201
var (
107202
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
@@ -297,6 +392,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {
297392

298393
cfg := Config{}
299394
flagext.DefaultValues(&cfg)
395+
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
396+
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
397+
cfg.FlushOpBackoff.MaxRetries = 1
398+
cfg.FlushOpTimeout = 15 * time.Second
300399
cfg.FlushCheckPeriod = 99999 * time.Hour
301400
cfg.MaxChunkIdle = 99999 * time.Hour
302401
cfg.ConcurrentFlushes = 1

pkg/ingester/ingester.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/go-kit/log"
2323
"github.com/go-kit/log/level"
24+
"github.com/grafana/dskit/backoff"
2425
"github.com/grafana/dskit/concurrency"
2526
"github.com/grafana/dskit/modules"
2627
"github.com/grafana/dskit/multierror"
@@ -82,6 +83,7 @@ type Config struct {
8283

8384
ConcurrentFlushes int `yaml:"concurrent_flushes"`
8485
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
86+
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
8587
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
8688
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
8789
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
@@ -127,7 +129,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
127129

128130
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
129131
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
130-
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
132+
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
133+
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
134+
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes. Is canceled when `ingester.flush-op-timeout` is exceeded.")
135+
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is canceled.")
131136
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
132137
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
133138
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -155,6 +160,15 @@ func (cfg *Config) Validate() error {
155160
return err
156161
}
157162

163+
if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
164+
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
165+
}
166+
if cfg.FlushOpBackoff.MaxRetries <= 0 {
167+
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
168+
}
169+
if cfg.FlushOpTimeout <= 0 {
170+
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
171+
}
158172
if cfg.IndexShards <= 0 {
159173
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
160174
}

0 commit comments

Comments
 (0)