From 9fa7338ab8f056bad0fbfa1770c4698d4edccc8d Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Thu, 2 Jun 2022 15:40:41 +0200 Subject: [PATCH 1/6] Procect forwarder watchOverrides from race condition Reworks update logic in watchOverrides loop. The race condition comes from the unpredictable behaviour of ranging over a map. This could cause that a newly created queueManagers was added to the ranged map, and be evaluated for update. Since the queueManager was _just_ created, it wouldn't have enough running workers, which would trigger another update. --- modules/distributor/forwarder.go | 41 ++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index f4eba0dec6f..5087fe0fb66 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -138,22 +138,43 @@ func (f *forwarder) watchOverrides() { select { case <-time.After(f.overridesInterval): f.mutex.Lock() + + var ( + queueManagersToDelete []*queueManager + queueManagersToAdd []struct { + tenantID string + queueSize, workerCount int + } + ) + for tenantID, tm := range f.queueManagers { queueSize, workerCount := f.getQueueManagerConfig(tenantID) - // if the queue size or worker count has changed, shutdown the queue manager and create a new one if tm.shouldUpdate(queueSize, workerCount) { - go func() { - // shutdown the queue manager - // this will block until all workers have finished and the queue is drained - if err := tm.shutdown(); err != nil { - level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", tenantID, "err", err) - } - }() - delete(f.queueManagers, tenantID) - f.queueManagers[tenantID] = newQueueManager(tenantID, queueSize, workerCount, f.forwardFunc) + queueManagersToDelete = append(queueManagersToDelete, tm) + queueManagersToAdd = append(queueManagersToAdd, struct { + tenantID string + queueSize, workerCount int + }{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount}) } } + + // Spawn a goroutine to asynchronously shut down queue managers + go func() { + for _, qm := range queueManagersToDelete { + // shutdown the queue manager + // this will block until all workers have finished and the queue is drained + if err := qm.shutdown(); err != nil { + level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", qm.tenantID, "err", err) + } + } + }() + + // Synchronously update queue managers + for _, qm := range queueManagersToAdd { + f.queueManagers[qm.tenantID] = newQueueManager(qm.tenantID, qm.queueSize, qm.workerCount, f.forwardFunc) + } + f.mutex.Unlock() case <-f.shutdown: return From c2102aee94741c39d4f3e7981fad242d974ec9ca Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Thu, 2 Jun 2022 15:46:04 +0200 Subject: [PATCH 2/6] Changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f06ef99be1..da182f6278c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,11 @@ * [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn) * [FEATURE] Include rollout dashboard [#1456](https://github.com/grafana/tempo/pull/1456) (@zalegrala) * [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott) -* [ENHANCEMENT] Azure Backend: Add support for authentication with Managed Identities. [#1457](https://github.com/grafana/tempo/pull/1457) (@joe-elliott) * [ENHANCEMENT] metrics-generator: expose max_active_series as a metric [#1471](https://github.com/grafana/tempo/pull/1471) (@kvrhdn) +* [ENHANCEMENT] Azure Backend: Add support for authentication with Managed Identities. * [BUGFIX] Fix nil pointer panic when the trace by id path errors. [#1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott) * [BUGFIX] Update tempo microservices Helm values example which missed the 'enabled' key for thriftHttp. [#1472](https://github.com/grafana/tempo/pull/1472) (@hajowieland) -* [ENHANCEMENT] Azure Backend: Add support for authentication with Managed Identities. +* [BUGFIX] Fix race condition in forwarder overrides loop. [1468](https://github.com/grafana/tempo/pull/1468) (@mapno) ## v1.4.1 / 2022-05-05 From c8eb3049d6d78a4dd1ec95b40633def55651b779 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Thu, 2 Jun 2022 18:11:31 +0200 Subject: [PATCH 3/6] Add logs to queue managers updates --- modules/distributor/forwarder.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index 5087fe0fb66..10c093ad267 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -151,6 +151,8 @@ func (f *forwarder) watchOverrides() { queueSize, workerCount := f.getQueueManagerConfig(tenantID) // if the queue size or worker count has changed, shutdown the queue manager and create a new one if tm.shouldUpdate(queueSize, workerCount) { + level.Info(log.Logger).Log("msg", "Marking queue manager for update", "tenant", tenantID, + "old_queue_size", tm.queueSize, "new_queue_size", queueSize, "old_worker_count", tm.workerCount, "new_worker_count", workerCount) queueManagersToDelete = append(queueManagersToDelete, tm) queueManagersToAdd = append(queueManagersToAdd, struct { tenantID string @@ -164,6 +166,7 @@ func (f *forwarder) watchOverrides() { for _, qm := range queueManagersToDelete { // shutdown the queue manager // this will block until all workers have finished and the queue is drained + level.Info(log.Logger).Log("msg", "Shutting down queue manager", "tenant", qm.tenantID) if err := qm.shutdown(); err != nil { level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", qm.tenantID, "err", err) } @@ -172,6 +175,7 @@ func (f *forwarder) watchOverrides() { // Synchronously update queue managers for _, qm := range queueManagersToAdd { + level.Info(log.Logger).Log("msg", "Updating queue manager", "tenant", qm.tenantID) f.queueManagers[qm.tenantID] = newQueueManager(qm.tenantID, qm.queueSize, qm.workerCount, f.forwardFunc) } From 112862d3cabab4156b38fc3b651d8982799d8068 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Thu, 2 Jun 2022 18:42:54 +0200 Subject: [PATCH 4/6] Use time.Ticker instead of time.After --- modules/distributor/forwarder.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index 10c093ad267..f1f54f05891 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -134,9 +134,11 @@ func (f *forwarder) getQueueManager(tenantID string) (*queueManager, bool) { // watchOverrides watches the overrides for changes // and updates the queueManagers accordingly func (f *forwarder) watchOverrides() { + ticker := time.NewTicker(f.overridesInterval) + for { select { - case <-time.After(f.overridesInterval): + case <-ticker.C: f.mutex.Lock() var ( @@ -181,6 +183,7 @@ func (f *forwarder) watchOverrides() { f.mutex.Unlock() case <-f.shutdown: + ticker.Stop() return } } From 2ed7d22ca41a35625fd95e1852916bd10d1c0ab5 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Thu, 2 Jun 2022 18:51:51 +0200 Subject: [PATCH 5/6] Fix shouldUpdate signature in queueManager --- modules/distributor/forwarder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index f1f54f05891..c4822345d2c 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -336,7 +336,7 @@ func (m *queueManager) stopWorkers(ctx context.Context) error { } // shouldUpdate returns true if the queue size or worker count (alive or total) has changed -func (m *queueManager) shouldUpdate(numWorkers int, queueSize int) bool { +func (m *queueManager) shouldUpdate(queueSize, numWorkers int) bool { // TODO: worker alive count could be 0 and shutting down the queue manager would be impossible // it'd be better if we were able to spawn new workers instead of just closing the queueManager // and creating a new one From 7c28ef99ab6ef9f712601892400f4a5aad3510c6 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Date: Fri, 3 Jun 2022 09:20:57 +0200 Subject: [PATCH 6/6] Fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da182f6278c..ee534944c46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ * [FEATURE] Include rollout dashboard [#1456](https://github.com/grafana/tempo/pull/1456) (@zalegrala) * [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott) * [ENHANCEMENT] metrics-generator: expose max_active_series as a metric [#1471](https://github.com/grafana/tempo/pull/1471) (@kvrhdn) -* [ENHANCEMENT] Azure Backend: Add support for authentication with Managed Identities. +* [ENHANCEMENT] Azure Backend: Add support for authentication with Managed Identities. [#1457](https://github.com/grafana/tempo/pull/1457) (@joe-elliott) * [BUGFIX] Fix nil pointer panic when the trace by id path errors. [#1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott) * [BUGFIX] Update tempo microservices Helm values example which missed the 'enabled' key for thriftHttp. [#1472](https://github.com/grafana/tempo/pull/1472) (@hajowieland) * [BUGFIX] Fix race condition in forwarder overrides loop. [1468](https://github.com/grafana/tempo/pull/1468) (@mapno)