
Commit 075af5d

feat: add backoff mechanism to the retention process (#14182)
Signed-off-by: Vladyslav Diachenko <[email protected]>
(cherry picked from commit 3136880)
1 parent 9d66018 commit 075af5d

File tree

4 files changed (+120 -27 lines)

docs/sources/shared/configuration.md
pkg/compactor/compactor.go
pkg/compactor/retention/retention.go
pkg/compactor/retention/retention_test.go


docs/sources/shared/configuration.md (+13)

@@ -2368,6 +2368,19 @@ The `compactor` block configures the compactor component, which compacts index s...
 # CLI flag: -compactor.retention-table-timeout
 [retention_table_timeout: <duration> | default = 0s]
 
+retention_backoff_config:
+  # Minimum delay when backing off.
+  # CLI flag: -compactor.retention-backoff-config.backoff-min-period
+  [min_period: <duration> | default = 100ms]
+
+  # Maximum delay when backing off.
+  # CLI flag: -compactor.retention-backoff-config.backoff-max-period
+  [max_period: <duration> | default = 10s]
+
+  # Number of times to backoff and retry before failing.
+  # CLI flag: -compactor.retention-backoff-config.backoff-retries
+  [max_retries: <int> | default = 10]
+
 # Store used for managing delete requests.
 # CLI flag: -compactor.delete-request-store
 [delete_request_store: <string> | default = ""]
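For reference, a minimal sketch of how these options could be set in a Loki config file. The values simply repeat the documented defaults; the surrounding `compactor` block and the `retention_enabled` key are shown only for context and reflect the usual compactor retention setup, not anything introduced by this commit:

    compactor:
      retention_enabled: true
      retention_backoff_config:
        min_period: 100ms
        max_period: 10s
        max_retries: 10

The same three settings can also be overridden with the CLI flags listed above, for example `-compactor.retention-backoff-config.backoff-retries=3`.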

pkg/compactor/compactor.go (+8 -4)

@@ -12,13 +12,15 @@ import (
     "time"
 
     "github.com/go-kit/log/level"
-    "github.com/grafana/dskit/kv"
-    "github.com/grafana/dskit/ring"
-    "github.com/grafana/dskit/services"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
 
+    "github.com/grafana/dskit/backoff"
+    "github.com/grafana/dskit/kv"
+    "github.com/grafana/dskit/ring"
+    "github.com/grafana/dskit/services"
+
     "github.com/grafana/loki/v3/pkg/analytics"
     "github.com/grafana/loki/v3/pkg/compactor/deletion"
     "github.com/grafana/loki/v3/pkg/compactor/retention"

@@ -77,6 +79,7 @@ type Config struct {
     RetentionDeleteDelay        time.Duration  `yaml:"retention_delete_delay"`
     RetentionDeleteWorkCount    int            `yaml:"retention_delete_worker_count"`
     RetentionTableTimeout       time.Duration  `yaml:"retention_table_timeout"`
+    RetentionBackoffConfig      backoff.Config `yaml:"retention_backoff_config"`
     DeleteRequestStore          string         `yaml:"delete_request_store"`
     DeleteRequestStoreKeyPrefix string         `yaml:"delete_request_store_key_prefix"`
     DeleteBatchSize             int            `yaml:"delete_batch_size"`

@@ -110,6 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&cfg.TablesToCompact, "compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.")
     f.IntVar(&cfg.SkipLatestNTables, "compactor.skip-latest-n-tables", 0, "Do not compact N latest tables. Together with -compactor.run-once and -compactor.tables-to-compact, this is useful when clearing compactor backlogs.")
 
+    cfg.RetentionBackoffConfig.RegisterFlagsWithPrefix("compactor.retention-backoff-config", f)
     // Ring
     skipFlags := []string{
         "compactor.ring.num-tokens",

@@ -323,7 +327,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie...
         }
         chunkClient := client.NewClient(objectClient, encoder, schemaConfig)
 
-        sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
+        sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, c.cfg.RetentionBackoffConfig, r)
         if err != nil {
             return fmt.Errorf("failed to init sweeper: %w", err)
         }
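The single `RegisterFlagsWithPrefix` call above is what produces the three `-compactor.retention-backoff-config.*` flags shown in the docs diff. Here is a small standalone sketch of that wiring using dskit's `backoff.Config` as in this commit; the `MinBackoff`/`MaxBackoff` field names are an assumption about the dskit version in use (only `MaxRetries` is confirmed by the test file below), and the program is illustrative rather than Loki code:

package main

import (
	"flag"
	"fmt"

	"github.com/grafana/dskit/backoff"
)

func main() {
	var cfg backoff.Config
	fs := flag.NewFlagSet("retention-backoff-example", flag.ExitOnError)

	// Same prefix as Config.RegisterFlags above; this registers
	// -compactor.retention-backoff-config.backoff-min-period,
	// ...backoff-max-period and ...backoff-retries with their defaults.
	cfg.RegisterFlagsWithPrefix("compactor.retention-backoff-config", fs)

	// Override one value on the command line; the rest keep their defaults.
	_ = fs.Parse([]string{"-compactor.retention-backoff-config.backoff-retries=3"})

	fmt.Printf("min=%v max=%v retries=%d\n", cfg.MinBackoff, cfg.MaxBackoff, cfg.MaxRetries)
}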

pkg/compactor/retention/retention.go (+36 -18)

@@ -11,6 +11,7 @@ import (
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/grafana/dskit/backoff"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"

@@ -272,9 +273,17 @@ type Sweeper struct {
     markerProcessor MarkerProcessor
     chunkClient     ChunkClient
     sweeperMetrics  *sweeperMetrics
+    backoffConfig   backoff.Config
 }
 
-func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) {
+func NewSweeper(
+    workingDir string,
+    deleteClient ChunkClient,
+    deleteWorkerCount int,
+    minAgeDelete time.Duration,
+    backoffConfig backoff.Config,
+    r prometheus.Registerer,
+) (*Sweeper, error) {
     m := newSweeperMetrics(r)
 
     p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m)

@@ -285,34 +294,43 @@ func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount i...
         markerProcessor: p,
         chunkClient:     deleteClient,
         sweeperMetrics:  m,
+        backoffConfig:   backoffConfig,
     }, nil
 }
 
 func (s *Sweeper) Start() {
-    s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error {
-        status := statusSuccess
-        start := time.Now()
-        defer func() {
-            s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds())
-        }()
-        chunkIDString := unsafeGetString(chunkId)
-        userID, err := getUserIDFromChunkID(chunkId)
-        if err != nil {
-            return err
-        }
+    s.markerProcessor.Start(s.deleteChunk)
+}
 
+func (s *Sweeper) deleteChunk(ctx context.Context, chunkID []byte) error {
+    status := statusSuccess
+    start := time.Now()
+    defer func() {
+        s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds())
+    }()
+    chunkIDString := unsafeGetString(chunkID)
+    userID, err := getUserIDFromChunkID(chunkID)
+    if err != nil {
+        return err
+    }
+
+    retry := backoff.New(ctx, s.backoffConfig)
+    for retry.Ongoing() {
         err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString)
+        if err == nil {
+            return nil
+        }
         if s.chunkClient.IsChunkNotFoundErr(err) {
             status = statusNotFound
             level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString)
             return nil
         }
-        if err != nil {
-            level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err)
-            status = statusFailure
-        }
-        return err
-    })
+        retry.Wait()
+    }
+
+    level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err)
+    status = statusFailure
+    return err
 }
 
 func getUserIDFromChunkID(chunkID []byte) ([]byte, error) {
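The new retry loop follows the standard dskit backoff pattern: `retry.Ongoing()` keeps returning true until the configured number of attempts is used up or the context is cancelled, and `retry.Wait()` sleeps with exponentially growing delays between attempts. Note that `deleteChunk` returns the last `DeleteChunk` error rather than `retry.Err()`, so callers see the underlying storage error once retries run out. Below is a minimal self-contained sketch of the same pattern; the failing `deleteOnce` helper is hypothetical (it only mimics the unstable mock in the tests), and the `MinBackoff`/`MaxBackoff` field names are the same dskit assumption as in the earlier sketch:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// deleteOnce is a hypothetical stand-in for chunkClient.DeleteChunk:
// it fails on the first call and succeeds on the second.
func deleteOnce(attempts *int) error {
	*attempts++
	if *attempts == 1 {
		return errors.New("transient storage error")
	}
	return nil
}

func main() {
	cfg := backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 10 * time.Second,
		// MaxRetries counts attempts made inside the Ongoing() loop,
		// so MaxRetries: 1 means a single attempt, i.e. no retry at all.
		MaxRetries: 10,
	}

	attempts := 0
	var err error
	retry := backoff.New(context.Background(), cfg)
	for retry.Ongoing() {
		if err = deleteOnce(&attempts); err == nil {
			fmt.Printf("deleted after %d attempt(s)\n", attempts)
			return
		}
		retry.Wait() // back off before the next attempt
	}
	// Retries exhausted or context cancelled: surface the last error, as deleteChunk does.
	fmt.Println("giving up:", err)
}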

pkg/compactor/retention/retention_test.go (+63 -5)

@@ -4,6 +4,7 @@ import (
     "context"
     "crypto/sha256"
     "encoding/base64"
+    "fmt"
     "os"
     "path"
     "path/filepath"

@@ -14,6 +15,7 @@ import (
     "testing"
     "time"
 
+    "github.com/grafana/dskit/backoff"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"

@@ -31,14 +33,37 @@ import (
 )
 
 type mockChunkClient struct {
-    mtx           sync.Mutex
-    deletedChunks map[string]struct{}
+    mtx              sync.Mutex
+    deletedChunks    map[string]struct{}
+    unstableDeletion bool
+    perObjectCounter map[string]uint32
+}
+
+// newMockChunkClient creates a client that fails every first call to DeleteChunk if `unstableDeletion` is true.
+func newMockChunkClient(unstableDeletion bool) *mockChunkClient {
+    return &mockChunkClient{
+        deletedChunks:    map[string]struct{}{},
+        unstableDeletion: unstableDeletion,
+        perObjectCounter: map[string]uint32{},
+    }
+}
+
+// shouldFail returns true for every first call
+func (m *mockChunkClient) shouldFail(objectKey string) bool {
+    if !m.unstableDeletion {
+        return false
+    }
+    shouldFail := m.perObjectCounter[objectKey]%2 == 0
+    m.perObjectCounter[objectKey]++
+    return shouldFail
 }
 
 func (m *mockChunkClient) DeleteChunk(_ context.Context, _, chunkID string) error {
     m.mtx.Lock()
     defer m.mtx.Unlock()
-
+    if m.shouldFail(chunkID) {
+        return fmt.Errorf("chunk deletion for chunkID:%s is failed by mockChunkClient", chunkID)
+    }
     m.deletedChunks[string([]byte(chunkID))] = struct{}{} // forces a copy, because this string is only valid within the delete fn.
     return nil
 }

@@ -143,8 +168,9 @@ func Test_Retention(t *testing.T) {
             // marks and sweep
             expiration := NewExpirationChecker(tt.limits)
             workDir := filepath.Join(t.TempDir(), "retention")
-            chunkClient := &mockChunkClient{deletedChunks: map[string]struct{}{}}
-            sweep, err := NewSweeper(workDir, chunkClient, 10, 0, nil)
+            // must not fail the process because deletion must be retried
+            chunkClient := newMockChunkClient(true)
+            sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: 2}, nil)
             require.NoError(t, err)
             sweep.Start()
             defer sweep.Stop()

@@ -175,6 +201,38 @@
     }
 }
 
+func Test_Sweeper_deleteChunk(t *testing.T) {
+    chunkID := "1/3fff2c2d7595e046:1916fa8c4bd:1916fdfb33d:bd55fc5"
+    tests := map[string]struct {
+        maxRetries    int
+        expectedError error
+    }{
+        "expected error if chunk is not deleted and retry is disabled": {
+            maxRetries:    1,
+            expectedError: fmt.Errorf("chunk deletion for chunkID:%s is failed by mockChunkClient", chunkID),
+        },
+        "expected no error if chunk is not deleted at the first attempt but retried": {
+            maxRetries: 2,
+        },
+    }
+    for name, data := range tests {
+        t.Run(name, func(t *testing.T) {
+            workDir := filepath.Join(t.TempDir(), "retention")
+            chunkClient := newMockChunkClient(true)
+            sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: data.maxRetries}, nil)
+            require.NoError(t, err)
+
+            err = sweep.deleteChunk(context.Background(), []byte(chunkID))
+            if data.expectedError != nil {
+                require.Equal(t, data.expectedError, err)
+            } else {
+                require.NoError(t, err)
+            }
+        })
+    }
+
+}
+
 type noopWriter struct {
     count int64
 }
