Skip to content

Commit e688bb7

Browse files
bboreham authored and chaudum committed
indexshipper/storage: fix race conditions (#10314)
Running `go test -race` on this package reported many race conditions. Replacing the hand-rolled channel/WaitGroup coordination with the `singleflight` package (golang.org/x/sync) is much neater and makes all the race warnings go away. **Which issue(s) this PR fixes**: Relates to #8586
1 parent 8c94fdd commit e688bb7

File tree

5 files changed

+263
-83
lines changed

5 files changed

+263
-83
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
8888
* [9949](https://github.com/grafana/loki/pull/9949) **masslessparticle**: Fix pipelines to clear caches when tailing to avoid resource exhaustion.
8989
* [9936](https://github.com/grafana/loki/pull/9936) **masslessparticle**: Fix the way query stages are reordered when `unpack` is present.
90+
* [10314](https://github.com/grafana/loki/pull/10314) **bboreham**: Fix race conditions in indexshipper.
9091
* [10309](https://github.com/grafana/loki/pull/10309) **akhilanarayanan**: Fix race condition in series index store.
9192
* [10221](https://github.com/grafana/loki/pull/10221) **periklis**: Allow using the forget button when accessed via the internal server
9293

pkg/storage/stores/indexshipper/storage/cached_client.go

+55-82
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/go-kit/log/level"
12+
"golang.org/x/sync/singleflight"
1213

1314
"github.com/grafana/loki/pkg/storage/chunk/client"
1415
util_log "github.com/grafana/loki/pkg/util/log"
@@ -17,6 +18,7 @@ import (
1718

1819
const (
1920
cacheTimeout = 1 * time.Minute
21+
refreshKey = "refresh"
2022
)
2123

2224
type table struct {
@@ -26,19 +28,16 @@ type table struct {
2628
userIDs []client.StorageCommonPrefix
2729
userObjects map[string][]client.StorageObject
2830

29-
cacheBuiltAt time.Time
30-
buildCacheChan chan struct{}
31-
buildCacheWg sync.WaitGroup
32-
err error
31+
cacheBuiltAt time.Time
32+
buildCacheGroup singleflight.Group
3333
}
3434

3535
func newTable(tableName string) *table {
3636
return &table{
37-
name: tableName,
38-
buildCacheChan: make(chan struct{}, 1),
39-
userIDs: []client.StorageCommonPrefix{},
40-
userObjects: map[string][]client.StorageObject{},
41-
commonObjects: []client.StorageObject{},
37+
name: tableName,
38+
userIDs: []client.StorageCommonPrefix{},
39+
userObjects: map[string][]client.StorageObject{},
40+
commonObjects: []client.StorageObject{},
4241
}
4342
}
4443

@@ -50,43 +49,20 @@ type cachedObjectClient struct {
5049
tablesMtx sync.RWMutex
5150
tableNamesCacheBuiltAt time.Time
5251

53-
buildTableNamesCacheChan chan struct{}
54-
buildTableNamesCacheWg sync.WaitGroup
55-
err error
52+
buildCacheGroup singleflight.Group
5653
}
5754

5855
func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient {
5956
return &cachedObjectClient{
60-
ObjectClient: downstreamClient,
61-
tables: map[string]*table{},
62-
buildTableNamesCacheChan: make(chan struct{}, 1),
63-
}
64-
}
65-
66-
// buildCacheOnce makes sure we build the cache just once when it is called concurrently.
67-
// We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
68-
// We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
69-
// we are doing read-through cache, and we do not want to serve stale results.
70-
func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) {
71-
buildCacheWg.Add(1)
72-
defer buildCacheWg.Done()
73-
74-
// when the cache is expired, only one concurrent call must be able to rebuild it
75-
// all other calls will wait until the cache is built successfully or failed with an error
76-
select {
77-
case buildCacheChan <- struct{}{}:
78-
buildCacheFunc()
79-
<-buildCacheChan
80-
default:
57+
ObjectClient: downstreamClient,
58+
tables: map[string]*table{},
8159
}
8260
}
8361

8462
func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) {
85-
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
86-
c.err = nil
87-
c.err = c.buildTableNamesCache(ctx, true)
63+
_, _, _ = c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
64+
return nil, c.buildTableNamesCache(ctx)
8865
})
89-
c.buildTableNamesCacheWg.Wait()
9066
}
9167

9268
func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
@@ -97,20 +73,23 @@ func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableNa
9773
return
9874
}
9975

100-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
101-
tbl.err = nil
102-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, true)
76+
_, _, _ = tbl.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
77+
err := tbl.buildCache(ctx, c.ObjectClient)
78+
return nil, err
10379
})
104-
tbl.buildCacheWg.Wait()
10580
}
10681

10782
func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
10883
if bypassCache {
10984
return c.ObjectClient.List(ctx, prefix, objectDelimiter)
11085
}
11186

87+
c.tablesMtx.RLock()
88+
neverBuiltCache := c.tableNamesCacheBuiltAt.IsZero()
89+
c.tablesMtx.RUnlock()
90+
11291
// if we have never built table names cache, let us build it first.
113-
if c.tableNamesCacheBuiltAt.IsZero() {
92+
if neverBuiltCache {
11493
c.RefreshIndexTableNamesCache(ctx)
11594
}
11695

@@ -141,18 +120,9 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
141120
}
142121

143122
func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) {
144-
if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout {
145-
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
146-
c.err = nil
147-
c.err = c.buildTableNamesCache(ctx, false)
148-
})
149-
}
150-
151-
// wait for cache build operation to finish, if running
152-
c.buildTableNamesCacheWg.Wait()
153-
154-
if c.err != nil {
155-
return nil, c.err
123+
err := c.updateTableNamesCache(ctx)
124+
if err != nil {
125+
return nil, err
156126
}
157127

158128
c.tablesMtx.RLock()
@@ -167,18 +137,9 @@ func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([
167137
return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
168138
}
169139

170-
if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
171-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
172-
tbl.err = nil
173-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
174-
})
175-
}
176-
177-
// wait for cache build operation to finish, if running
178-
tbl.buildCacheWg.Wait()
179-
180-
if tbl.err != nil {
181-
return nil, nil, tbl.err
140+
err := tbl.updateCache(ctx, c.ObjectClient)
141+
if err != nil {
142+
return nil, nil, err
182143
}
183144

184145
tbl.mtx.RLock()
@@ -193,18 +154,9 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
193154
return []client.StorageObject{}, nil
194155
}
195156

196-
if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
197-
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
198-
tbl.err = nil
199-
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
200-
})
201-
}
202-
203-
// wait for cache build operation to finish, if running
204-
tbl.buildCacheWg.Wait()
205-
206-
if tbl.err != nil {
207-
return nil, tbl.err
157+
err := tbl.updateCache(ctx, c.ObjectClient)
158+
if err != nil {
159+
return nil, err
208160
}
209161

210162
tbl.mtx.RLock()
@@ -217,11 +169,21 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
217169
return []client.StorageObject{}, nil
218170
}
219171

220-
func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) (err error) {
221-
if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout {
172+
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
173+
func (c *cachedObjectClient) updateTableNamesCache(ctx context.Context) error {
174+
c.tablesMtx.RLock()
175+
outOfDate := time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout
176+
c.tablesMtx.RUnlock()
177+
if !outOfDate {
222178
return nil
223179
}
180+
_, err, _ := c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
181+
return nil, c.buildTableNamesCache(ctx)
182+
})
183+
return err
184+
}
224185

186+
func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err error) {
225187
defer func() {
226188
if err != nil {
227189
level.Error(util_log.Logger).Log("msg", "failed to build table names cache", "err", err)
@@ -292,11 +254,22 @@ func (c *cachedObjectClient) getTable(ctx context.Context, tableName string) *ta
292254
return c.getCachedTable(tableName)
293255
}
294256

295-
func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) (err error) {
296-
if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout {
257+
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
258+
func (t *table) updateCache(ctx context.Context, objectClient client.ObjectClient) error {
259+
t.mtx.RLock()
260+
outOfDate := time.Since(t.cacheBuiltAt) >= cacheTimeout
261+
t.mtx.RUnlock()
262+
if !outOfDate {
297263
return nil
298264
}
265+
_, err, _ := t.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
266+
err := t.buildCache(ctx, objectClient)
267+
return nil, err
268+
})
269+
return err
270+
}
299271

272+
func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient) (err error) {
300273
defer func() {
301274
if err != nil {
302275
level.Error(util_log.Logger).Log("msg", "failed to build table cache", "table_name", t.name, "err", err)

pkg/storage/stores/indexshipper/storage/cached_client_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
275275
wg.Add(1)
276276
go func() {
277277
defer wg.Done()
278-
objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), tc.prefix, "", false)
278+
objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), tc.prefix, "", false)
279279
require.NoError(t, err)
280280
require.Equal(t, expectedListCallsCount, objectClient.listCallsCount)
281281
require.Equal(t, tc.expectedObjects, objects)

0 commit comments

Comments
 (0)