
Commit 9767807

feat: Add backoff to flush op (grafana#13140)
1 parent b31e04e commit 9767807

File tree

6 files changed: +217 -31 lines changed

docs/sources/shared/configuration.md (+17 -1)

@@ -2752,7 +2752,23 @@ lifecycler:
 # CLI flag: -ingester.flush-check-period
 [flush_check_period: <duration> | default = 30s]

-# The timeout before a flush is cancelled.
+flush_op_backoff:
+  # Minimum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-min-period
+  [min_period: <duration> | default = 10s]
+
+  # Maximum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-max-period
+  [max_period: <duration> | default = 1m]
+
+  # Maximum retries for failed flushes.
+  # CLI flag: -ingester.flush-op-backoff-retries
+  [max_retries: <int> | default = 10]
+
+# The timeout for an individual flush. Will be retried up to
+# `flush-op-backoff-retries` times.
 # CLI flag: -ingester.flush-op-timeout
 [flush_op_timeout: <duration> | default = 10m]
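For orientation, here is a minimal sketch (not part of this commit; the program and the variable name are illustrative) of how the three documented settings and their defaults map onto dskit's backoff.Config, which the ingester exposes as the flush_op_backoff block:

package main

import (
    "fmt"
    "time"

    "github.com/grafana/dskit/backoff"
)

func main() {
    // Documented defaults:
    //   min_period  / -ingester.flush-op-backoff-min-period -> MinBackoff
    //   max_period  / -ingester.flush-op-backoff-max-period -> MaxBackoff
    //   max_retries / -ingester.flush-op-backoff-retries    -> MaxRetries
    flushOpBackoff := backoff.Config{
        MinBackoff: 10 * time.Second,
        MaxBackoff: time.Minute,
        MaxRetries: 10,
    }
    fmt.Printf("flush_op_backoff defaults: %+v\n", flushOpBackoff)
}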

pkg/ingester/flush.go (+27 -7)

@@ -7,7 +7,9 @@ import (
     "sync"
     "time"

+    "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/grafana/dskit/backoff"
     "github.com/grafana/dskit/ring"
     "github.com/grafana/dskit/user"
     "github.com/prometheus/client_golang/prometheus"
@@ -135,8 +137,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
 }

 func (i *Ingester) flushLoop(j int) {
+    l := log.With(i.logger, "loop", j)
     defer func() {
-        level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
+        level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
         i.flushQueuesDone.Done()
     }()

@@ -147,9 +150,10 @@ func (i *Ingester) flushLoop(j int) {
         }
         op := o.(*flushOp)

-        err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+        m := util_log.WithUserID(op.userID, l)
+        err := i.flushOp(m, op)
         if err != nil {
-            level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
+            level.Error(m).Log("msg", "failed to flush", "err", err)
         }

         // If we're exiting & we failed to flush, put the failed operation
@@ -161,7 +165,23 @@ func (i *Ingester) flushLoop(j int) {
     }
 }

-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+    ctx, cancelFunc := context.WithCancel(context.Background())
+    defer cancelFunc()
+
+    b := backoff.New(ctx, i.cfg.FlushOpBackoff)
+    for b.Ongoing() {
+        err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
+        if err == nil {
+            break
+        }
+        level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
+        b.Wait()
+    }
+    return b.Err()
+}
+
+func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
     instance, ok := i.getInstanceByID(userID)
     if !ok {
         return nil
@@ -175,9 +195,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
     lbs := labels.String()
     level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

-    ctx := user.InjectOrgID(context.Background(), userID)
-    ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
-    defer cancel()
+    ctx = user.InjectOrgID(ctx, userID)
+    ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
+    defer cancelFunc()
     err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
     if err != nil {
         return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
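The heart of the change is the new flushOp wrapper above: instead of a single attempt per dequeued operation, each flush loop now retries flushUserSeries under a dskit backoff. Below is a minimal, self-contained sketch of the same pattern; the flaky flush function, the helper name flushWithRetries, and the config values are illustrative, not taken from the commit.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/grafana/dskit/backoff"
)

// flushWithRetries mirrors the shape of Ingester.flushOp: retry the flush until
// it succeeds, the context is cancelled, or the retry budget runs out.
func flushWithRetries(ctx context.Context, cfg backoff.Config, flush func(context.Context) error) error {
    b := backoff.New(ctx, cfg)
    for b.Ongoing() {
        err := flush(ctx)
        if err == nil {
            return nil
        }
        fmt.Printf("flush failed, will retry: retries=%d err=%v\n", b.NumRetries(), err)
        b.Wait() // sleeps for an exponentially growing period between MinBackoff and MaxBackoff
    }
    // Err() explains why the loop stopped: context cancellation or retries exhausted,
    // e.g. "terminated after 5 retries".
    return b.Err()
}

func main() {
    attempts := 0
    flaky := func(context.Context) error { // stand-in for flushUserSeries
        attempts++
        if attempts < 3 {
            return errors.New("store unavailable")
        }
        return nil
    }

    err := flushWithRetries(context.Background(), backoff.Config{
        MinBackoff: 100 * time.Millisecond,
        MaxBackoff: time.Second,
        MaxRetries: 5,
    }, flaky)
    fmt.Println("final result:", err)
}

As the new flag help text notes, each concurrent flush has its own backoff (see `ingester.concurrent-flushes`), so a failing flush only slows the loop it is running on.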

pkg/ingester/flush_test.go (+66)

@@ -1,6 +1,7 @@
 package ingester

 import (
+    "errors"
     "fmt"
     "os"
     "sort"
@@ -102,6 +103,67 @@ func Benchmark_FlushLoop(b *testing.B) {
     }
 }

+func Test_FlushOp(t *testing.T) {
+    t.Run("no error", func(t *testing.T) {
+        cfg := defaultIngesterTestConfig(t)
+        cfg.FlushOpBackoff.MinBackoff = time.Second
+        cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+        cfg.FlushOpBackoff.MaxRetries = 1
+        cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+        _, ing := newTestStore(t, cfg, nil)
+
+        ctx := user.InjectOrgID(context.Background(), "foo")
+        ins, err := ing.GetOrCreateInstance("foo")
+        require.NoError(t, err)
+
+        lbs := makeRandomLabels()
+        req := &logproto.PushRequest{Streams: []logproto.Stream{{
+            Labels:  lbs.String(),
+            Entries: entries(5, time.Now()),
+        }}}
+        require.NoError(t, ins.Push(ctx, req))
+
+        time.Sleep(cfg.FlushCheckPeriod)
+        require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
+            immediate: true,
+            userID:    "foo",
+            fp:        ins.getHashForLabels(lbs),
+        }))
+    })
+
+    t.Run("max retries exceeded", func(t *testing.T) {
+        cfg := defaultIngesterTestConfig(t)
+        cfg.FlushOpBackoff.MinBackoff = time.Second
+        cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+        cfg.FlushOpBackoff.MaxRetries = 1
+        cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+        store, ing := newTestStore(t, cfg, nil)
+        store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
+            return errors.New("failed to write chunks")
+        }
+
+        ctx := user.InjectOrgID(context.Background(), "foo")
+        ins, err := ing.GetOrCreateInstance("foo")
+        require.NoError(t, err)
+
+        lbs := makeRandomLabels()
+        req := &logproto.PushRequest{Streams: []logproto.Stream{{
+            Labels:  lbs.String(),
+            Entries: entries(5, time.Now()),
+        }}}
+        require.NoError(t, ins.Push(ctx, req))
+
+        time.Sleep(cfg.FlushCheckPeriod)
+        require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
+            immediate: true,
+            userID:    "foo",
+            fp:        ins.getHashForLabels(lbs),
+        }), "terminated after 1 retries")
+    })
+}
+
 func Test_Flush(t *testing.T) {
     var (
         store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
@@ -297,6 +359,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

     cfg := Config{}
     flagext.DefaultValues(&cfg)
+    cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
+    cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+    cfg.FlushOpBackoff.MaxRetries = 1
+    cfg.FlushOpTimeout = 15 * time.Second
     cfg.FlushCheckPeriod = 99999 * time.Hour
     cfg.MaxChunkIdle = 99999 * time.Hour
     cfg.ConcurrentFlushes = 1

pkg/ingester/ingester.go (+15 -1)

@@ -21,6 +21,7 @@ import (

     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/grafana/dskit/backoff"
     "github.com/grafana/dskit/concurrency"
     "github.com/grafana/dskit/modules"
     "github.com/grafana/dskit/multierror"
@@ -84,6 +85,7 @@ type Config struct {

     ConcurrentFlushes int            `yaml:"concurrent_flushes"`
     FlushCheckPeriod  time.Duration  `yaml:"flush_check_period"`
+    FlushOpBackoff    backoff.Config `yaml:"flush_op_backoff"`
     FlushOpTimeout    time.Duration  `yaml:"flush_op_timeout"`
     RetainPeriod      time.Duration  `yaml:"chunk_retain_period"`
     MaxChunkIdle      time.Duration  `yaml:"chunk_idle_period"`
@@ -129,7 +131,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

     f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
     f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
-    f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
+    f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+    f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+    f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
+    f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
     f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
     f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
     f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -157,6 +162,15 @@ func (cfg *Config) Validate() error {
         return err
     }

+    if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
+        return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
+    }
+    if cfg.FlushOpBackoff.MaxRetries <= 0 {
+        return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
+    }
+    if cfg.FlushOpTimeout <= 0 {
+        return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
+    }
     if cfg.IndexShards <= 0 {
         return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
     }
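Config.Validate() gains three guards for the new settings. The following standalone restatement shows which combinations are rejected and with what error text; validateFlushSettings is a hypothetical helper written for illustration, not the Loki function itself.

package main

import (
    "errors"
    "fmt"
    "time"

    "github.com/grafana/dskit/backoff"
)

// validateFlushSettings repeats the three checks the commit adds to Config.Validate().
func validateFlushSettings(b backoff.Config, flushOpTimeout time.Duration) error {
    if b.MinBackoff > b.MaxBackoff {
        return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
    }
    if b.MaxRetries <= 0 {
        return fmt.Errorf("invalid flush op max retries: %d", b.MaxRetries)
    }
    if flushOpTimeout <= 0 {
        return fmt.Errorf("invalid flush op timeout: %s", flushOpTimeout)
    }
    return nil
}

func main() {
    // MaxRetries left at zero is rejected, matching the
    // "invalid flush op max retries: 0" case exercised in TestValidate.
    bad := backoff.Config{MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second}
    fmt.Println(validateFlushSettings(bad, 15*time.Second))

    // The registered defaults pass.
    ok := backoff.Config{MinBackoff: 10 * time.Second, MaxBackoff: time.Minute, MaxRetries: 10}
    fmt.Println(validateFlushSettings(ok, 10*time.Minute))
}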

pkg/ingester/ingester_test.go (+82 -19)

@@ -12,6 +12,7 @@ import (

     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/grafana/dskit/backoff"
     "github.com/grafana/dskit/flagext"
     "github.com/grafana/dskit/httpgrpc"
     "github.com/grafana/dskit/middleware"
@@ -676,57 +677,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) {

 func TestValidate(t *testing.T) {
     for i, tc := range []struct {
-        in       Config
-        err      bool
-        expected Config
+        in          Config
+        expected    Config
+        expectedErr string
     }{
         {
             in: Config{
-                MaxChunkAge:   time.Minute,
                 ChunkEncoding: chunkenc.EncGZIP.String(),
-                IndexShards:   index.DefaultIndexShards,
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                IndexShards:    index.DefaultIndexShards,
+                MaxChunkAge:    time.Minute,
             },
             expected: Config{
+                ChunkEncoding: chunkenc.EncGZIP.String(),
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                IndexShards:    index.DefaultIndexShards,
                 MaxChunkAge:    time.Minute,
-                ChunkEncoding:  chunkenc.EncGZIP.String(),
                 parsedEncoding: chunkenc.EncGZIP,
-                IndexShards:    index.DefaultIndexShards,
             },
         },
         {
             in: Config{
                 ChunkEncoding: chunkenc.EncSnappy.String(),
-                IndexShards:   index.DefaultIndexShards,
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                IndexShards:    index.DefaultIndexShards,
             },
             expected: Config{
-                ChunkEncoding:  chunkenc.EncSnappy.String(),
-                parsedEncoding: chunkenc.EncSnappy,
+                ChunkEncoding: chunkenc.EncSnappy.String(),
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
                 IndexShards:    index.DefaultIndexShards,
+                parsedEncoding: chunkenc.EncSnappy,
             },
         },
         {
             in: Config{
-                IndexShards:   index.DefaultIndexShards,
                 ChunkEncoding: "bad-enc",
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                IndexShards:    index.DefaultIndexShards,
+            },
+            expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd",
+        },
+        {
+            in: Config{
+                ChunkEncoding: chunkenc.EncGZIP.String(),
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                IndexShards:    index.DefaultIndexShards,
+                MaxChunkAge:    time.Minute,
+            },
+            expectedErr: "invalid flush op max retries: 0",
+        },
+        {
+            in: Config{
+                ChunkEncoding: chunkenc.EncGZIP.String(),
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                IndexShards: index.DefaultIndexShards,
+                MaxChunkAge: time.Minute,
             },
-            err: true,
+            expectedErr: "invalid flush op timeout: 0s",
         },
         {
             in: Config{
-                MaxChunkAge:   time.Minute,
                 ChunkEncoding: chunkenc.EncGZIP.String(),
+                FlushOpBackoff: backoff.Config{
+                    MinBackoff: 100 * time.Millisecond,
+                    MaxBackoff: 10 * time.Second,
+                    MaxRetries: 1,
+                },
+                FlushOpTimeout: 15 * time.Second,
+                MaxChunkAge:    time.Minute,
             },
-            err: true,
+            expectedErr: "invalid ingester index shard factor: 0",
         },
     } {
         t.Run(fmt.Sprint(i), func(t *testing.T) {
             err := tc.in.Validate()
-            if tc.err {
-                require.NotNil(t, err)
-                return
+            if tc.expectedErr != "" {
+                require.EqualError(t, err, tc.expectedErr)
+            } else {
+                require.NoError(t, err)
+                require.Equal(t, tc.expected, tc.in)
             }
-            require.Nil(t, err)
-            require.Equal(t, tc.expected, tc.in)
         })
     }
 }
