package queue
import (
"context"
"errors"
"sync"
"time"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)
const (
// How frequently to check for disconnected queriers that should be forgotten.
forgetCheckPeriod = 5 * time.Second
)
var (
ErrTooManyRequests = errors.New("too many outstanding requests")
ErrStopped = errors.New("queue is stopped")
)
// UserIndex is an opaque type that allows resuming iteration over users between successive calls
// to the RequestQueue.GetNextRequestForQuerier method.
type UserIndex struct {
last int
}
// ReuseLastUser modifies the index so that iteration restarts at the user whose queue was returned last.
func (ui UserIndex) ReuseLastUser() UserIndex {
if ui.last >= 0 {
return UserIndex{last: ui.last - 1}
}
return ui
}
// FirstUser returns UserIndex that starts iteration over user queues from the very first user.
func FirstUser() UserIndex {
return UserIndex{last: -1}
}
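// Illustrative sketch (not part of the original file): iteration starts from FirstUser(), and
// ReuseLastUser() restarts at the same user if the previously dequeued requests could not be
// delivered to the querier:
//
//	idx := FirstUser()
//	// ... idx is replaced by the index returned from each GetNextRequestForQuerier call ...
//	idx = idx.ReuseLastUser() // the next call will start at the same user again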
// Request is a single request stored in the queue.
type Request interface{}
// RequestQueue holds incoming requests in per-user queues. It also assigns each user a specified number of queriers,
// and when a querier asks for the next request to handle (using GetNextRequestForQuerier), it returns requests
// in a fair fashion.
type RequestQueue struct {
services.Service
connectedQuerierWorkers *atomic.Int32
mtx sync.RWMutex
cond contextCond // Notified when a request is enqueued or dequeued, or when a querier is disconnected.
queues *queues
stopped bool
queueLength *prometheus.GaugeVec // Per user and reason.
discardedRequests *prometheus.CounterVec // Per user.
}
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
}
q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
return q
}
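// Construction sketch (not part of the original file; the metric names and the use of
// dskit's services.StartAndAwaitRunning helper are illustrative assumptions):
//
//	queueLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "queue_length"}, []string{"user"})
//	discarded := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "discarded_requests"}, []string{"user"})
//	q := NewRequestQueue(100, time.Minute, queueLength, discarded)
//	if err := services.StartAndAwaitRunning(context.Background(), q); err != nil {
//		// handle startup error
//	}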
// EnqueueRequest puts the request into the queue. maxQueriers is a user-specific value that specifies how many
// queriers this user can use (zero or negative = all queriers). It is passed with each EnqueueRequest call because
// it can change between calls.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
q.mtx.RLock()
// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock
if q.stopped {
q.mtx.RUnlock()
return ErrStopped
}
// try to grab the user queue under read lock
queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
defer cleanup()
if err != nil {
return err
}
select {
case queue <- req:
q.queueLength.WithLabelValues(userID).Inc()
q.cond.Broadcast()
return nil
default:
q.discardedRequests.WithLabelValues(userID).Inc()
return ErrTooManyRequests
}
}
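// Enqueue sketch (not part of the original file; the tenant ID and request value are illustrative):
//
//	err := q.EnqueueRequest("tenant-1", req, 0) // 0 = the tenant may use all queriers
//	if errors.Is(err, ErrTooManyRequests) {
//		// the per-tenant queue is full; the request was counted as discarded
//	}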
// getQueueUnderRlock attempts to get the queue for the given user under the read lock. If that is not
// possible, it upgrades the RLock to a full Lock. This method also returns a cleanup function that
// will release whichever lock it had to acquire to get the queue.
func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
cleanup := func() {
q.mtx.RUnlock()
}
uq := q.queues.userQueues[userID]
if uq != nil {
return uq.ch, cleanup, nil
}
// trade the read lock for a full write lock; the cleanup function below releases it.
// This method must be entered with the read lock held; whichever lock is held on return is released by cleanup().
q.mtx.RUnlock()
q.mtx.Lock()
cleanup = func() {
q.mtx.Unlock()
}
queue := q.queues.getOrAddQueue(userID, maxQueriers)
if queue == nil {
// This can only happen if userID is "".
return nil, cleanup, errors.New("no queue found")
}
return queue, cleanup, nil
}
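// Caller-side pattern (a sketch of what EnqueueRequest does): cleanup must run even when err is
// non-nil, because either the read lock or the write lock is still held when this method returns:
//
//	queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
//	defer cleanup()
//	if err != nil {
//		return err
//	}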
// GetNextRequestForQuerier finds the next user queue and attempts to dequeue N requests, where N is the length of
// batchBuffer. This slice is a reusable buffer that is filled with the dequeued requests.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string, batchBuffer []Request) ([]Request, UserIndex, error) {
requestedCount := len(batchBuffer)
if requestedCount == 0 {
return nil, last, errors.New("batch buffer must have len > 0")
}
q.mtx.Lock()
defer q.mtx.Unlock()
querierWait := false
FindQueue:
// We need to wait if there are no users, or no pending requests for given querier.
for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
q.cond.Wait(ctx)
}
if q.stopped {
return nil, last, ErrStopped
}
if err := ctx.Err(); err != nil {
return nil, last, err
}
queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
last.last = idx
if queue != nil {
// this is all thread-safe because all user queues are protected by q.mtx
if len(queue) < requestedCount {
requestedCount = len(queue)
}
// Pick next requests from the queue.
batchBuffer = batchBuffer[:requestedCount]
for i := 0; i < requestedCount; i++ {
batchBuffer[i] = <-queue
}
qLen := len(queue)
if qLen == 0 {
q.queues.deleteQueue(userID)
}
q.queueLength.WithLabelValues(userID).Set(float64(qLen))
// Tell stopping() we've processed a request.
q.cond.Broadcast()
return batchBuffer, last, nil
}
// There are no unexpired requests, so we can go back
// and wait for more requests.
querierWait = true
goto FindQueue
}
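// Dispatch-loop sketch (not part of the original file; ctx, q and the send function are
// illustrative assumptions):
//
//	buf := make([]Request, 8)
//	idx := FirstUser()
//	for ctx.Err() == nil {
//		reqs, newIdx, err := q.GetNextRequestForQuerier(ctx, idx, "querier-1", buf)
//		if err != nil {
//			return err
//		}
//		idx = newIdx
//		for _, r := range reqs {
//			send(r) // hypothetical delivery to the querier worker
//		}
//	}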
func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// We need to notify goroutines because removing some queriers
// may have caused a resharding.
q.cond.Broadcast()
}
return nil
}
func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()
for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
q.cond.Wait(context.Background())
}
// Only stop after dispatching enqueued requests.
q.stopped = true
// If there are still goroutines waiting in the GetNextRequestForQuerier method, they get notified.
q.cond.Broadcast()
return nil
}
func (q *RequestQueue) RegisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Inc()
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.addQuerierConnection(querier)
}
func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Dec()
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.removeQuerierConnection(querier, time.Now())
}
func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.notifyQuerierShutdown(querierID)
}
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
}
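// Connection-tracking sketch (not part of the original file): each querier worker connection is
// registered for the lifetime of its dispatch loop, so that per-user querier assignment and the
// connected-workers metric stay accurate:
//
//	q.RegisterQuerierConnection("querier-1")
//	defer q.UnregisterQuerierConnection("querier-1")
//	// ... run the dispatch loop sketched above ...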
// contextCond is a *sync.Cond with its Wait() method overridden to support context-based waiting.
type contextCond struct {
*sync.Cond
// testHookBeforeWaiting is called before calling Cond.Wait() if it's not nil.
// Yes, it's ugly, but the http package settled jurisprudence:
// https://github.com/golang/go/blob/6178d25fc0b28724b1b5aec2b1b74fc06d9294c7/src/net/http/client.go#L596-L601
testHookBeforeWaiting func()
}
// Wait does c.Cond.Wait() but also returns if the provided context is done.
// All the documentation of sync.Cond.Wait() applies, but it's especially important to remember that the mutex of
// the cond must be held while Wait() is called (and the mutex will still be held once it returns).
func (c contextCond) Wait(ctx context.Context) {
// "condWait" goroutine does q.cond.Wait() and signals through condWait channel.
condWait := make(chan struct{})
go func() {
if c.testHookBeforeWaiting != nil {
c.testHookBeforeWaiting()
}
c.Cond.Wait()
close(condWait)
}()
// "waiting" goroutine: signals that the condWait goroutine has started waiting.
// Notice that a closed waiting channel implies that the goroutine above has started waiting
// (because it has unlocked the mutex), but the other way is not true:
// - condWait it may have unlocked and is waiting, but someone else locked the mutex faster than us:
// in this case that caller will eventually unlock, and we'll be able to enter here.
// - condWait called Wait(), unlocked, received a broadcast and locked again faster than we were able to lock here:
// in this case condWait channel will be closed, and this goroutine will be waiting until we unlock.
waiting := make(chan struct{})
go func() {
c.L.Lock()
close(waiting)
c.L.Unlock()
}()
select {
case <-condWait:
// We don't know whether the waiting goroutine is done or not, but we don't care:
// it will be done once nobody is fighting for the mutex anymore.
case <-ctx.Done():
// In order to avoid leaking the condWait goroutine, we can send a broadcast.
// Before sending the broadcast we need to make sure that the condWait goroutine is already waiting (or has already waited).
select {
case <-condWait:
// No need to broadcast as q.cond.Wait() has returned already.
return
case <-waiting:
// q.cond.Wait() might be still waiting (or maybe not!), so we'll poke it just in case.
c.Broadcast()
}
// Make sure we are not waiting anymore; we need to do that before returning, as the caller will need to unlock the mutex.
<-condWait
}
}
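// Usage sketch for contextCond (not part of the original file; ready and ctx are illustrative):
// as with sync.Cond, the mutex must be held around Wait and the condition re-checked in a loop:
//
//	var mtx sync.Mutex
//	cond := contextCond{Cond: sync.NewCond(&mtx)}
//	mtx.Lock()
//	for !ready && ctx.Err() == nil {
//		cond.Wait(ctx)
//	}
//	mtx.Unlock()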