Skip to content

Commit 7343290

Browse files
committed
indexshipper/storage: fix race conditions
Google's 'singleflight' package is much neater and makes all the race warnings go away.
1 parent b1cd5da commit 7343290

File tree

5 files changed

+265
-84
lines changed

5 files changed

+265
-84
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
7777
* [9949](https://github.com/grafana/loki/pull/9949) **masslessparticle**: Fix pipelines to clear caches when tailing to avoid resource exhaustion.
7878
* [9936](https://github.com/grafana/loki/pull/9936) **masslessparticle**: Fix the way query stages are reordered when `unpack` is present.
79+
* [10314](https://github.com/grafana/loki/pull/10314) **bboreham**: Fix race conditions in indexshipper.
7980

8081
##### Changes
8182

pkg/storage/stores/indexshipper/storage/cached_client.go

+57-83
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/go-kit/log/level"
12+
"golang.org/x/sync/singleflight"
1213

1314
"github.com/grafana/loki/pkg/storage/chunk/client"
1415
util_log "github.com/grafana/loki/pkg/util/log"
@@ -17,6 +18,7 @@ import (
1718

1819
const (
1920
cacheTimeout = 1 * time.Minute
21+
refreshKey = "refresh"
2022
)
2123

2224
type table struct {
@@ -26,19 +28,16 @@ type table struct {
2628
userIDs []client.StorageCommonPrefix
2729
userObjects map[string][]client.StorageObject
2830

29-
cacheBuiltAt time.Time
30-
buildCacheChan chan struct{}
31-
buildCacheWg sync.WaitGroup
32-
err error
31+
cacheBuiltAt time.Time
32+
buildCacheGroup singleflight.Group
3333
}
3434

3535
func newTable(tableName string) *table {
3636
return &table{
37-
name: tableName,
38-
buildCacheChan: make(chan struct{}, 1),
39-
userIDs: []client.StorageCommonPrefix{},
40-
userObjects: map[string][]client.StorageObject{},
41-
commonObjects: []client.StorageObject{},
37+
name: tableName,
38+
userIDs: []client.StorageCommonPrefix{},
39+
userObjects: map[string][]client.StorageObject{},
40+
commonObjects: []client.StorageObject{},
4241
}
4342
}
4443

@@ -50,43 +49,21 @@ type cachedObjectClient struct {
5049
tablesMtx sync.RWMutex
5150
tableNamesCacheBuiltAt time.Time
5251

53-
buildTableNamesCacheChan chan struct{}
54-
buildTableNamesCacheWg sync.WaitGroup
55-
err error
52+
buildCacheGroup singleflight.Group
5653
}
5754

5855
func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient {
5956
return &cachedObjectClient{
60-
ObjectClient: downstreamClient,
61-
tables: map[string]*table{},
62-
buildTableNamesCacheChan: make(chan struct{}, 1),
57+
ObjectClient: downstreamClient,
58+
tables: map[string]*table{},
6359
}
6460
}
6561

66-
// buildCacheOnce makes sure we build the cache just once when it is called concurrently.
67-
// We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
68-
// We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
69-
// we are doing read-through cache, and we do not want to serve stale results.
70-
func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) {
71-
buildCacheWg.Add(1)
72-
defer buildCacheWg.Done()
73-
74-
// when the cache is expired, only one concurrent call must be able to rebuild it
75-
// all other calls will wait until the cache is built successfully or failed with an error
76-
select {
77-
case buildCacheChan <- struct{}{}:
78-
buildCacheFunc()
79-
<-buildCacheChan
80-
default:
81-
}
82-
}
83-
84-
func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) {
85-
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
86-
c.err = nil
87-
c.err = c.buildTableNamesCache(ctx, true)
62+
func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) error {
63+
_, err, _ := c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
64+
return nil, c.buildTableNamesCache(ctx)
8865
})
89-
c.buildTableNamesCacheWg.Wait()
66+
return err
9067
}
9168

9269
func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
@@ -103,20 +80,23 @@ func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableNa
10380
}
10481
}
10582

106-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
107-
tbl.err = nil
108-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, true)
83+
_, _, _ = tbl.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
84+
err := tbl.buildCache(ctx, c.ObjectClient)
85+
return nil, err
10986
})
110-
tbl.buildCacheWg.Wait()
11187
}
11288

11389
func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
11490
if bypassCache {
11591
return c.ObjectClient.List(ctx, prefix, objectDelimiter)
11692
}
11793

94+
c.tablesMtx.RLock()
95+
neverBuiltCache := c.tableNamesCacheBuiltAt.IsZero()
96+
c.tablesMtx.RUnlock()
97+
11898
// if we have never built table names cache, let us build it first.
119-
if c.tableNamesCacheBuiltAt.IsZero() {
99+
if neverBuiltCache {
120100
c.RefreshIndexTableNamesCache(ctx)
121101
}
122102

@@ -147,18 +127,9 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
147127
}
148128

149129
func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) {
150-
if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout {
151-
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
152-
c.err = nil
153-
c.err = c.buildTableNamesCache(ctx, false)
154-
})
155-
}
156-
157-
// wait for cache build operation to finish, if running
158-
c.buildTableNamesCacheWg.Wait()
159-
160-
if c.err != nil {
161-
return nil, c.err
130+
err := c.updateTableNamesCache(ctx)
131+
if err != nil {
132+
return nil, err
162133
}
163134

164135
c.tablesMtx.RLock()
@@ -173,18 +144,9 @@ func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([
173144
return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
174145
}
175146

176-
if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
177-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
178-
tbl.err = nil
179-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
180-
})
181-
}
182-
183-
// wait for cache build operation to finish, if running
184-
tbl.buildCacheWg.Wait()
185-
186-
if tbl.err != nil {
187-
return nil, nil, tbl.err
147+
err := tbl.updateCache(ctx, c.ObjectClient)
148+
if err != nil {
149+
return nil, nil, err
188150
}
189151

190152
tbl.mtx.RLock()
@@ -199,18 +161,9 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
199161
return []client.StorageObject{}, nil
200162
}
201163

202-
if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
203-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
204-
tbl.err = nil
205-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
206-
})
207-
}
208-
209-
// wait for cache build operation to finish, if running
210-
tbl.buildCacheWg.Wait()
211-
212-
if tbl.err != nil {
213-
return nil, tbl.err
164+
err := tbl.updateCache(ctx, c.ObjectClient)
165+
if err != nil {
166+
return nil, err
214167
}
215168

216169
tbl.mtx.RLock()
@@ -223,11 +176,21 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
223176
return []client.StorageObject{}, nil
224177
}
225178

226-
func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) (err error) {
227-
if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout {
179+
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
180+
func (c *cachedObjectClient) updateTableNamesCache(ctx context.Context) error {
181+
c.tablesMtx.RLock()
182+
outOfDate := time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout
183+
c.tablesMtx.RUnlock()
184+
if !outOfDate {
228185
return nil
229186
}
187+
_, err, _ := c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
188+
return nil, c.buildTableNamesCache(ctx)
189+
})
190+
return err
191+
}
230192

193+
func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err error) {
231194
defer func() {
232195
if err != nil {
233196
level.Error(util_log.Logger).Log("msg", "failed to build table names cache", "err", err)
@@ -282,11 +245,22 @@ func (c *cachedObjectClient) getTable(tableName string) *table {
282245
return c.tables[tableName]
283246
}
284247

285-
func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) (err error) {
286-
if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout {
248+
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
249+
func (t *table) updateCache(ctx context.Context, objectClient client.ObjectClient) error {
250+
t.mtx.RLock()
251+
outOfDate := time.Since(t.cacheBuiltAt) >= cacheTimeout
252+
t.mtx.RUnlock()
253+
if !outOfDate {
287254
return nil
288255
}
256+
_, err, _ := t.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
257+
err := t.buildCache(ctx, objectClient)
258+
return nil, err
259+
})
260+
return err
261+
}
289262

263+
func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient) (err error) {
290264
defer func() {
291265
if err != nil {
292266
level.Error(util_log.Logger).Log("msg", "failed to build table cache", "table_name", t.name, "err", err)

pkg/storage/stores/indexshipper/storage/cached_client_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
219219
wg.Add(1)
220220
go func() {
221221
defer wg.Done()
222-
objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), tc.prefix, "", false)
222+
objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), tc.prefix, "", false)
223223
require.NoError(t, err)
224224
require.Equal(t, expectedListCallsCount, objectClient.listCallsCount)
225225
require.Equal(t, tc.expectedObjects, objects)

0 commit comments

Comments
 (0)