9
9
"time"
10
10
11
11
"github.com/go-kit/log/level"
12
+ "golang.org/x/sync/singleflight"
12
13
13
14
"github.com/grafana/loki/pkg/storage/chunk/client"
14
15
util_log "github.com/grafana/loki/pkg/util/log"
@@ -17,6 +18,7 @@ import (
17
18
18
19
const (
19
20
cacheTimeout = 1 * time .Minute
21
+ refreshKey = "refresh"
20
22
)
21
23
22
24
type table struct {
@@ -26,19 +28,16 @@ type table struct {
26
28
userIDs []client.StorageCommonPrefix
27
29
userObjects map [string ][]client.StorageObject
28
30
29
- cacheBuiltAt time.Time
30
- buildCacheChan chan struct {}
31
- buildCacheWg sync.WaitGroup
32
- err error
31
+ cacheBuiltAt time.Time
32
+ buildCacheGroup singleflight.Group
33
33
}
34
34
35
35
func newTable (tableName string ) * table {
36
36
return & table {
37
- name : tableName ,
38
- buildCacheChan : make (chan struct {}, 1 ),
39
- userIDs : []client.StorageCommonPrefix {},
40
- userObjects : map [string ][]client.StorageObject {},
41
- commonObjects : []client.StorageObject {},
37
+ name : tableName ,
38
+ userIDs : []client.StorageCommonPrefix {},
39
+ userObjects : map [string ][]client.StorageObject {},
40
+ commonObjects : []client.StorageObject {},
42
41
}
43
42
}
44
43
@@ -50,43 +49,20 @@ type cachedObjectClient struct {
50
49
tablesMtx sync.RWMutex
51
50
tableNamesCacheBuiltAt time.Time
52
51
53
- buildTableNamesCacheChan chan struct {}
54
- buildTableNamesCacheWg sync.WaitGroup
55
- err error
52
+ buildCacheGroup singleflight.Group
56
53
}
57
54
58
55
func newCachedObjectClient (downstreamClient client.ObjectClient ) * cachedObjectClient {
59
56
return & cachedObjectClient {
60
- ObjectClient : downstreamClient ,
61
- tables : map [string ]* table {},
62
- buildTableNamesCacheChan : make (chan struct {}, 1 ),
63
- }
64
- }
65
-
66
- // buildCacheOnce makes sure we build the cache just once when it is called concurrently.
67
- // We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
68
- // We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
69
- // we are doing read-through cache, and we do not want to serve stale results.
70
- func buildCacheOnce (buildCacheWg * sync.WaitGroup , buildCacheChan chan struct {}, buildCacheFunc func ()) {
71
- buildCacheWg .Add (1 )
72
- defer buildCacheWg .Done ()
73
-
74
- // when the cache is expired, only one concurrent call must be able to rebuild it
75
- // all other calls will wait until the cache is built successfully or failed with an error
76
- select {
77
- case buildCacheChan <- struct {}{}:
78
- buildCacheFunc ()
79
- <- buildCacheChan
80
- default :
57
+ ObjectClient : downstreamClient ,
58
+ tables : map [string ]* table {},
81
59
}
82
60
}
83
61
84
62
func (c * cachedObjectClient ) RefreshIndexTableNamesCache (ctx context.Context ) {
85
- buildCacheOnce (& c .buildTableNamesCacheWg , c .buildTableNamesCacheChan , func () {
86
- c .err = nil
87
- c .err = c .buildTableNamesCache (ctx , true )
63
+ _ , _ , _ = c .buildCacheGroup .Do (refreshKey , func () (interface {}, error ) {
64
+ return nil , c .buildTableNamesCache (ctx )
88
65
})
89
- c .buildTableNamesCacheWg .Wait ()
90
66
}
91
67
92
68
func (c * cachedObjectClient ) RefreshIndexTableCache (ctx context.Context , tableName string ) {
@@ -97,20 +73,23 @@ func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableNa
97
73
return
98
74
}
99
75
100
- buildCacheOnce ( & tbl . buildCacheWg , tbl .buildCacheChan , func () {
101
- tbl . err = nil
102
- tbl . err = tbl . buildCache ( ctx , c . ObjectClient , true )
76
+ _ , _ , _ = tbl .buildCacheGroup . Do ( refreshKey , func () ( interface {}, error ) {
77
+ err := tbl . buildCache ( ctx , c . ObjectClient )
78
+ return nil , err
103
79
})
104
- tbl .buildCacheWg .Wait ()
105
80
}
106
81
107
82
func (c * cachedObjectClient ) List (ctx context.Context , prefix , objectDelimiter string , bypassCache bool ) ([]client.StorageObject , []client.StorageCommonPrefix , error ) {
108
83
if bypassCache {
109
84
return c .ObjectClient .List (ctx , prefix , objectDelimiter )
110
85
}
111
86
87
+ c .tablesMtx .RLock ()
88
+ neverBuiltCache := c .tableNamesCacheBuiltAt .IsZero ()
89
+ c .tablesMtx .RUnlock ()
90
+
112
91
// if we have never built table names cache, let us build it first.
113
- if c . tableNamesCacheBuiltAt . IsZero () {
92
+ if neverBuiltCache {
114
93
c .RefreshIndexTableNamesCache (ctx )
115
94
}
116
95
@@ -141,18 +120,9 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
141
120
}
142
121
143
122
func (c * cachedObjectClient ) listTableNames (ctx context.Context ) ([]client.StorageCommonPrefix , error ) {
144
- if time .Since (c .tableNamesCacheBuiltAt ) >= cacheTimeout {
145
- buildCacheOnce (& c .buildTableNamesCacheWg , c .buildTableNamesCacheChan , func () {
146
- c .err = nil
147
- c .err = c .buildTableNamesCache (ctx , false )
148
- })
149
- }
150
-
151
- // wait for cache build operation to finish, if running
152
- c .buildTableNamesCacheWg .Wait ()
153
-
154
- if c .err != nil {
155
- return nil , c .err
123
+ err := c .updateTableNamesCache (ctx )
124
+ if err != nil {
125
+ return nil , err
156
126
}
157
127
158
128
c .tablesMtx .RLock ()
@@ -167,18 +137,9 @@ func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([
167
137
return []client.StorageObject {}, []client.StorageCommonPrefix {}, nil
168
138
}
169
139
170
- if time .Since (tbl .cacheBuiltAt ) >= cacheTimeout {
171
- buildCacheOnce (& tbl .buildCacheWg , tbl .buildCacheChan , func () {
172
- tbl .err = nil
173
- tbl .err = tbl .buildCache (ctx , c .ObjectClient , false )
174
- })
175
- }
176
-
177
- // wait for cache build operation to finish, if running
178
- tbl .buildCacheWg .Wait ()
179
-
180
- if tbl .err != nil {
181
- return nil , nil , tbl .err
140
+ err := tbl .updateCache (ctx , c .ObjectClient )
141
+ if err != nil {
142
+ return nil , nil , err
182
143
}
183
144
184
145
tbl .mtx .RLock ()
@@ -193,18 +154,9 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
193
154
return []client.StorageObject {}, nil
194
155
}
195
156
196
- if time .Since (tbl .cacheBuiltAt ) >= cacheTimeout {
197
- buildCacheOnce (& tbl .buildCacheWg , tbl .buildCacheChan , func () {
198
- tbl .err = nil
199
- tbl .err = tbl .buildCache (ctx , c .ObjectClient , false )
200
- })
201
- }
202
-
203
- // wait for cache build operation to finish, if running
204
- tbl .buildCacheWg .Wait ()
205
-
206
- if tbl .err != nil {
207
- return nil , tbl .err
157
+ err := tbl .updateCache (ctx , c .ObjectClient )
158
+ if err != nil {
159
+ return nil , err
208
160
}
209
161
210
162
tbl .mtx .RLock ()
@@ -217,11 +169,21 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
217
169
return []client.StorageObject {}, nil
218
170
}
219
171
220
- func (c * cachedObjectClient ) buildTableNamesCache (ctx context.Context , forceRefresh bool ) (err error ) {
221
- if ! forceRefresh && time .Since (c .tableNamesCacheBuiltAt ) < cacheTimeout {
172
+ // Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
173
+ func (c * cachedObjectClient ) updateTableNamesCache (ctx context.Context ) error {
174
+ c .tablesMtx .RLock ()
175
+ outOfDate := time .Since (c .tableNamesCacheBuiltAt ) >= cacheTimeout
176
+ c .tablesMtx .RUnlock ()
177
+ if ! outOfDate {
222
178
return nil
223
179
}
180
+ _ , err , _ := c .buildCacheGroup .Do (refreshKey , func () (interface {}, error ) {
181
+ return nil , c .buildTableNamesCache (ctx )
182
+ })
183
+ return err
184
+ }
224
185
186
+ func (c * cachedObjectClient ) buildTableNamesCache (ctx context.Context ) (err error ) {
225
187
defer func () {
226
188
if err != nil {
227
189
level .Error (util_log .Logger ).Log ("msg" , "failed to build table names cache" , "err" , err )
@@ -292,11 +254,22 @@ func (c *cachedObjectClient) getTable(ctx context.Context, tableName string) *ta
292
254
return c .getCachedTable (tableName )
293
255
}
294
256
295
- func (t * table ) buildCache (ctx context.Context , objectClient client.ObjectClient , forceRefresh bool ) (err error ) {
296
- if ! forceRefresh && time .Since (t .cacheBuiltAt ) < cacheTimeout {
257
+ // Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
258
+ func (t * table ) updateCache (ctx context.Context , objectClient client.ObjectClient ) error {
259
+ t .mtx .RLock ()
260
+ outOfDate := time .Since (t .cacheBuiltAt ) >= cacheTimeout
261
+ t .mtx .RUnlock ()
262
+ if ! outOfDate {
297
263
return nil
298
264
}
265
+ _ , err , _ := t .buildCacheGroup .Do (refreshKey , func () (interface {}, error ) {
266
+ err := t .buildCache (ctx , objectClient )
267
+ return nil , err
268
+ })
269
+ return err
270
+ }
299
271
272
+ func (t * table ) buildCache (ctx context.Context , objectClient client.ObjectClient ) (err error ) {
300
273
defer func () {
301
274
if err != nil {
302
275
level .Error (util_log .Logger ).Log ("msg" , "failed to build table cache" , "table_name" , t .name , "err" , err )
0 commit comments