Skip to content

Commit

Permalink
Protect forwarder watchOverrides from race condition
Browse files Browse the repository at this point in the history
Reworks update logic in watchOverrides loop.
The race condition comes from the unpredictable behaviour of ranging over a map.

This could cause a newly created queueManager to be added to the ranged map
and be evaluated for update.
Since the queueManager was _just_ created, it wouldn't have enough running workers,
which would trigger another update.
  • Loading branch information
mapno committed Jun 2, 2022
1 parent 93a1359 commit 4226c83
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions modules/distributor/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,43 @@ func (f *forwarder) watchOverrides() {
select {
case <-time.After(f.overridesInterval):
f.mutex.Lock()

var (
queueManagersToDelete []*queueManager
queueManagersToAdd []struct {
tenantID string
queueSize, workerCount int
}
)

for tenantID, tm := range f.queueManagers {
queueSize, workerCount := f.getQueueManagerConfig(tenantID)

// if the queue size or worker count has changed, shutdown the queue manager and create a new one
if tm.shouldUpdate(queueSize, workerCount) {
go func() {
// shutdown the queue manager
// this will block until all workers have finished and the queue is drained
if err := tm.shutdown(); err != nil {
level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", tenantID, "err", err)
}
}()
delete(f.queueManagers, tenantID)
f.queueManagers[tenantID] = newQueueManager(tenantID, queueSize, workerCount, f.forwardFunc)
queueManagersToDelete = append(queueManagersToDelete, tm)
queueManagersToAdd = append(queueManagersToAdd, struct {
tenantID string
queueSize, workerCount int
}{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount})
}
}

// Spawn a goroutine to asynchronously shut down queue managers
go func() {
for _, qm := range queueManagersToDelete {
// shutdown the queue manager
// this will block until all workers have finished and the queue is drained
if err := qm.shutdown(); err != nil {
level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", qm.tenantID, "err", err)
}
}
}()

// Synchronously update queue managers
for _, qm := range queueManagersToAdd {
f.queueManagers[qm.tenantID] = newQueueManager(qm.tenantID, qm.queueSize, qm.workerCount, f.forwardFunc)
}

f.mutex.Unlock()
case <-f.shutdown:
return
Expand Down

0 comments on commit 4226c83

Please sign in to comment.