Skip to content

Commit ae77be1

Browse files
Add backoff to flush op
This commit adds a configurable backoff to flush ops in the ingester. This prevents situations where the store put operation fails fast (e.g. 401 Unauthorized), which can cause ingesters to be rate limited.
1 parent c996349 commit ae77be1

File tree

6 files changed

+217
-31
lines changed

6 files changed

+217
-31
lines changed

docs/sources/shared/configuration.md

+17-1
Original file line numberDiff line numberDiff line change
@@ -2752,7 +2752,23 @@ lifecycler:
27522752
# CLI flag: -ingester.flush-check-period
27532753
[flush_check_period: <duration> | default = 30s]
27542754
2755-
# The timeout before a flush is cancelled.
2755+
flush_op_backoff:
2756+
# Minimum backoff period when a flush fails. Each concurrent flush has its own
2757+
# backoff, see `ingester.concurrent-flushes`.
2758+
# CLI flag: -ingester.flush-op-backoff-min-period
2759+
[min_period: <duration> | default = 10s]
2760+
2761+
# Maximum backoff period when a flush fails. Each concurrent flush has its own
2762+
# backoff, see `ingester.concurrent-flushes`.
2763+
# CLI flag: -ingester.flush-op-backoff-max-period
2764+
[max_period: <duration> | default = 1m]
2765+
2766+
# Maximum retries for failed flushes.
2767+
# CLI flag: -ingester.flush-op-backoff-retries
2768+
[max_retries: <int> | default = 10]
2769+
2770+
# The timeout for an individual flush. Will be retried up to
2771+
# `flush-op-backoff-retries` times.
27562772
# CLI flag: -ingester.flush-op-timeout
27572773
[flush_op_timeout: <duration> | default = 10m]
27582774

pkg/ingester/flush.go

+27-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/go-kit/log"
1011
"github.com/go-kit/log/level"
12+
"github.com/grafana/dskit/backoff"
1113
"github.com/grafana/dskit/ring"
1214
"github.com/grafana/dskit/user"
1315
"github.com/prometheus/client_golang/prometheus"
@@ -135,8 +137,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
135137
}
136138

137139
func (i *Ingester) flushLoop(j int) {
140+
l := log.With(i.logger, "loop", j)
138141
defer func() {
139-
level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
142+
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
140143
i.flushQueuesDone.Done()
141144
}()
142145

@@ -147,9 +150,10 @@ func (i *Ingester) flushLoop(j int) {
147150
}
148151
op := o.(*flushOp)
149152

150-
err := i.flushUserSeries(op.userID, op.fp, op.immediate)
153+
m := util_log.WithUserID(op.userID, l)
154+
err := i.flushOp(m, op)
151155
if err != nil {
152-
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
156+
level.Error(m).Log("msg", "failed to flush", "err", err)
153157
}
154158

155159
// If we're exiting & we failed to flush, put the failed operation
@@ -161,7 +165,23 @@ func (i *Ingester) flushLoop(j int) {
161165
}
162166
}
163167

164-
func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
168+
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
169+
ctx, cancelFunc := context.WithCancel(context.Background())
170+
defer cancelFunc()
171+
172+
b := backoff.New(ctx, i.cfg.FlushOpBackoff)
173+
for b.Ongoing() {
174+
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
175+
if err == nil {
176+
break
177+
}
178+
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
179+
b.Wait()
180+
}
181+
return b.Err()
182+
}
183+
184+
func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
165185
instance, ok := i.getInstanceByID(userID)
166186
if !ok {
167187
return nil
@@ -175,9 +195,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
175195
lbs := labels.String()
176196
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
177197

178-
ctx := user.InjectOrgID(context.Background(), userID)
179-
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
180-
defer cancel()
198+
ctx = user.InjectOrgID(ctx, userID)
199+
ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
200+
defer cancelFunc()
181201
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
182202
if err != nil {
183203
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)

pkg/ingester/flush_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingester
22

33
import (
4+
"errors"
45
"fmt"
56
"os"
67
"sort"
@@ -102,6 +103,67 @@ func Benchmark_FlushLoop(b *testing.B) {
102103
}
103104
}
104105

106+
func Test_FlushOp(t *testing.T) {
107+
t.Run("no error", func(t *testing.T) {
108+
cfg := defaultIngesterTestConfig(t)
109+
cfg.FlushOpBackoff.MinBackoff = time.Second
110+
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
111+
cfg.FlushOpBackoff.MaxRetries = 1
112+
cfg.FlushCheckPeriod = 100 * time.Millisecond
113+
114+
_, ing := newTestStore(t, cfg, nil)
115+
116+
ctx := user.InjectOrgID(context.Background(), "foo")
117+
ins, err := ing.GetOrCreateInstance("foo")
118+
require.NoError(t, err)
119+
120+
lbs := makeRandomLabels()
121+
req := &logproto.PushRequest{Streams: []logproto.Stream{{
122+
Labels: lbs.String(),
123+
Entries: entries(5, time.Now()),
124+
}}}
125+
require.NoError(t, ins.Push(ctx, req))
126+
127+
time.Sleep(cfg.FlushCheckPeriod)
128+
require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
129+
immediate: true,
130+
userID: "foo",
131+
fp: ins.getHashForLabels(lbs),
132+
}))
133+
})
134+
135+
t.Run("max retries exceeded", func(t *testing.T) {
136+
cfg := defaultIngesterTestConfig(t)
137+
cfg.FlushOpBackoff.MinBackoff = time.Second
138+
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
139+
cfg.FlushOpBackoff.MaxRetries = 1
140+
cfg.FlushCheckPeriod = 100 * time.Millisecond
141+
142+
store, ing := newTestStore(t, cfg, nil)
143+
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
144+
return errors.New("failed to write chunks")
145+
}
146+
147+
ctx := user.InjectOrgID(context.Background(), "foo")
148+
ins, err := ing.GetOrCreateInstance("foo")
149+
require.NoError(t, err)
150+
151+
lbs := makeRandomLabels()
152+
req := &logproto.PushRequest{Streams: []logproto.Stream{{
153+
Labels: lbs.String(),
154+
Entries: entries(5, time.Now()),
155+
}}}
156+
require.NoError(t, ins.Push(ctx, req))
157+
158+
time.Sleep(cfg.FlushCheckPeriod)
159+
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
160+
immediate: true,
161+
userID: "foo",
162+
fp: ins.getHashForLabels(lbs),
163+
}), "terminated after 1 retries")
164+
})
165+
}
166+
105167
func Test_Flush(t *testing.T) {
106168
var (
107169
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
@@ -297,6 +359,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {
297359

298360
cfg := Config{}
299361
flagext.DefaultValues(&cfg)
362+
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
363+
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
364+
cfg.FlushOpBackoff.MaxRetries = 1
365+
cfg.FlushOpTimeout = 15 * time.Second
300366
cfg.FlushCheckPeriod = 99999 * time.Hour
301367
cfg.MaxChunkIdle = 99999 * time.Hour
302368
cfg.ConcurrentFlushes = 1

pkg/ingester/ingester.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/go-kit/log"
2323
"github.com/go-kit/log/level"
24+
"github.com/grafana/dskit/backoff"
2425
"github.com/grafana/dskit/concurrency"
2526
"github.com/grafana/dskit/modules"
2627
"github.com/grafana/dskit/multierror"
@@ -82,6 +83,7 @@ type Config struct {
8283

8384
ConcurrentFlushes int `yaml:"concurrent_flushes"`
8485
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
86+
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
8587
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
8688
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
8789
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
@@ -127,7 +129,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
127129

128130
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
129131
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
130-
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
132+
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
133+
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
134+
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
135+
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
131136
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
132137
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
133138
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -155,6 +160,15 @@ func (cfg *Config) Validate() error {
155160
return err
156161
}
157162

163+
if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
164+
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
165+
}
166+
if cfg.FlushOpBackoff.MaxRetries <= 0 {
167+
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
168+
}
169+
if cfg.FlushOpTimeout <= 0 {
170+
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
171+
}
158172
if cfg.IndexShards <= 0 {
159173
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
160174
}

pkg/ingester/ingester_test.go

+82-19
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/go-kit/log"
1414
"github.com/go-kit/log/level"
15+
"github.com/grafana/dskit/backoff"
1516
"github.com/grafana/dskit/flagext"
1617
"github.com/grafana/dskit/httpgrpc"
1718
"github.com/grafana/dskit/middleware"
@@ -676,57 +677,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) {
676677

677678
func TestValidate(t *testing.T) {
678679
for i, tc := range []struct {
679-
in Config
680-
err bool
681-
expected Config
680+
in Config
681+
expected Config
682+
expectedErr string
682683
}{
683684
{
684685
in: Config{
685-
MaxChunkAge: time.Minute,
686686
ChunkEncoding: chunkenc.EncGZIP.String(),
687-
IndexShards: index.DefaultIndexShards,
687+
FlushOpBackoff: backoff.Config{
688+
MinBackoff: 100 * time.Millisecond,
689+
MaxBackoff: 10 * time.Second,
690+
MaxRetries: 1,
691+
},
692+
FlushOpTimeout: 15 * time.Second,
693+
IndexShards: index.DefaultIndexShards,
694+
MaxChunkAge: time.Minute,
688695
},
689696
expected: Config{
697+
ChunkEncoding: chunkenc.EncGZIP.String(),
698+
FlushOpBackoff: backoff.Config{
699+
MinBackoff: 100 * time.Millisecond,
700+
MaxBackoff: 10 * time.Second,
701+
MaxRetries: 1,
702+
},
703+
FlushOpTimeout: 15 * time.Second,
704+
IndexShards: index.DefaultIndexShards,
690705
MaxChunkAge: time.Minute,
691-
ChunkEncoding: chunkenc.EncGZIP.String(),
692706
parsedEncoding: chunkenc.EncGZIP,
693-
IndexShards: index.DefaultIndexShards,
694707
},
695708
},
696709
{
697710
in: Config{
698711
ChunkEncoding: chunkenc.EncSnappy.String(),
699-
IndexShards: index.DefaultIndexShards,
712+
FlushOpBackoff: backoff.Config{
713+
MinBackoff: 100 * time.Millisecond,
714+
MaxBackoff: 10 * time.Second,
715+
MaxRetries: 1,
716+
},
717+
FlushOpTimeout: 15 * time.Second,
718+
IndexShards: index.DefaultIndexShards,
700719
},
701720
expected: Config{
702-
ChunkEncoding: chunkenc.EncSnappy.String(),
703-
parsedEncoding: chunkenc.EncSnappy,
721+
ChunkEncoding: chunkenc.EncSnappy.String(),
722+
FlushOpBackoff: backoff.Config{
723+
MinBackoff: 100 * time.Millisecond,
724+
MaxBackoff: 10 * time.Second,
725+
MaxRetries: 1,
726+
},
727+
FlushOpTimeout: 15 * time.Second,
704728
IndexShards: index.DefaultIndexShards,
729+
parsedEncoding: chunkenc.EncSnappy,
705730
},
706731
},
707732
{
708733
in: Config{
709-
IndexShards: index.DefaultIndexShards,
710734
ChunkEncoding: "bad-enc",
735+
FlushOpBackoff: backoff.Config{
736+
MinBackoff: 100 * time.Millisecond,
737+
MaxBackoff: 10 * time.Second,
738+
MaxRetries: 1,
739+
},
740+
FlushOpTimeout: 15 * time.Second,
741+
IndexShards: index.DefaultIndexShards,
742+
},
743+
expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd",
744+
},
745+
{
746+
in: Config{
747+
ChunkEncoding: chunkenc.EncGZIP.String(),
748+
FlushOpBackoff: backoff.Config{
749+
MinBackoff: 100 * time.Millisecond,
750+
MaxBackoff: 10 * time.Second,
751+
},
752+
FlushOpTimeout: 15 * time.Second,
753+
IndexShards: index.DefaultIndexShards,
754+
MaxChunkAge: time.Minute,
755+
},
756+
expectedErr: "invalid flush op max retries: 0",
757+
},
758+
{
759+
in: Config{
760+
ChunkEncoding: chunkenc.EncGZIP.String(),
761+
FlushOpBackoff: backoff.Config{
762+
MinBackoff: 100 * time.Millisecond,
763+
MaxBackoff: 10 * time.Second,
764+
MaxRetries: 1,
765+
},
766+
IndexShards: index.DefaultIndexShards,
767+
MaxChunkAge: time.Minute,
711768
},
712-
err: true,
769+
expectedErr: "invalid flush op timeout: 0s",
713770
},
714771
{
715772
in: Config{
716-
MaxChunkAge: time.Minute,
717773
ChunkEncoding: chunkenc.EncGZIP.String(),
774+
FlushOpBackoff: backoff.Config{
775+
MinBackoff: 100 * time.Millisecond,
776+
MaxBackoff: 10 * time.Second,
777+
MaxRetries: 1,
778+
},
779+
FlushOpTimeout: 15 * time.Second,
780+
MaxChunkAge: time.Minute,
718781
},
719-
err: true,
782+
expectedErr: "invalid ingester index shard factor: 0",
720783
},
721784
} {
722785
t.Run(fmt.Sprint(i), func(t *testing.T) {
723786
err := tc.in.Validate()
724-
if tc.err {
725-
require.NotNil(t, err)
726-
return
787+
if tc.expectedErr != "" {
788+
require.EqualError(t, err, tc.expectedErr)
789+
} else {
790+
require.NoError(t, err)
791+
require.Equal(t, tc.expected, tc.in)
727792
}
728-
require.Nil(t, err)
729-
require.Equal(t, tc.expected, tc.in)
730793
})
731794
}
732795
}

0 commit comments

Comments
 (0)