Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frontend cleanup and perf improvement #3996

Merged
merged 5 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [ENHANCEMENT] TraceQL: Attribute iterators collect matched array values [#3867](https://github.com/grafana/tempo/pull/3867) (@electron0zero, @stoewer)
* [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)
* [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott)

# v2.6.0-rc.0

Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (t *App) initQuerier() (services.Service, error) {
func (t *App) initQueryFrontend() (services.Service, error) {
// cortexTripper is a bridge between http and httpgrpc.
// It does the job of passing data to the cortex frontend code.
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ querier:
query_relevant_ingesters: false
query_frontend:
max_outstanding_per_tenant: 2000
querier_forget_delay: 0s
max_batch_size: 5
log_query_request_headers: ""
max_retries: 2
Expand Down
8 changes: 2 additions & 6 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,15 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {

type CortexNoQuerierLimits struct{}

var _ v1.Limits = (*CortexNoQuerierLimits)(nil)

func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }

// InitFrontend initializes V1 frontend
//
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, limits, log, reg)
fr, err := v1.New(cfg, log, reg)
if err != nil {
return nil, nil, err
}
Expand Down
74 changes: 25 additions & 49 deletions modules/frontend/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (

"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

const (
// How frequently to check for disconnected queriers that should be forgotten.
forgetCheckPeriod = 5 * time.Second
forgetCheckPeriod = 30 * time.Second // every 30 seconds b/c the stopping code requires there to be no queues. I would like to make this 5-10 minutes, but then shutdowns would be blocked.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we put this info somewhere in docs? feel like an internal detail so not sure if we should.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure where i would do that

)

var (
Expand Down Expand Up @@ -49,8 +48,6 @@ type Request interface{}
type RequestQueue struct {
services.Service

connectedQuerierWorkers *atomic.Int32

mtx sync.RWMutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *queues
Expand All @@ -60,16 +57,15 @@ type RequestQueue struct {
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
queues: newUserQueues(maxOutstandingPerTenant),
queueLength: queueLength,
discardedRequests: discardedRequests,
}

q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.cleanupQueues, q.stopping).WithName("request queue")

return q
}
Expand All @@ -79,7 +75,7 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
// between calls.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
func (q *RequestQueue) EnqueueRequest(userID string, req Request) error {
q.mtx.RLock()
// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock

Expand All @@ -89,7 +85,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}

// try to grab the user queue under read lock
queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
queue, cleanup, err := q.getQueueUnderRlock(userID)
defer cleanup()
if err != nil {
return err
Expand All @@ -109,7 +105,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
// getQueueUnderRlock attempts to get the queue for the given user under read lock. if it is not
// possible it upgrades the RLock to a Lock. This method also returns a cleanup function that
// will release whichever lock it had to acquire to get the queue.
func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
func (q *RequestQueue) getQueueUnderRlock(userID string) (chan Request, func(), error) {
cleanup := func() {
q.mtx.RUnlock()
}
Expand All @@ -128,7 +124,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan
q.mtx.Unlock()
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
queue := q.queues.getOrAddQueue(userID)
if queue == nil {
// This can only happen if userID is "".
return nil, cleanup, errors.New("no queue found")
Expand Down Expand Up @@ -180,13 +176,10 @@ FindQueue:
}

qLen := len(queue)
if qLen == 0 {
q.queues.deleteQueue(userID)
}
q.queueLength.WithLabelValues(userID).Set(float64(qLen))

// Tell close() we've processed a request.
q.cond.Broadcast()
// q.cond.Broadcast()

return batchBuffer, last, nil
}
Expand All @@ -197,13 +190,22 @@ FindQueue:
goto FindQueue
}

func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
func (q *RequestQueue) cleanupQueues(_ context.Context) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// We need to notify goroutines cause having removed some queriers
// may have caused a resharding.
removedQueue := false

// look for 0 len queues and remove them
for userID, uq := range q.queues.userQueues {
if uq.ch != nil && len(uq.ch) == 0 {
removedQueue = true
q.queues.deleteQueue(userID)
}
}

// if we removed a queue, notify stopping
if removedQueue {
q.cond.Broadcast()
}

Expand All @@ -214,7 +216,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()

for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
for q.queues.len() > 0 {
q.cond.Wait(context.Background())
}

Expand All @@ -227,32 +229,6 @@ func (q *RequestQueue) stopping(_ error) error {
return nil
}

// RegisterQuerierConnection records a new querier worker connection: it
// increments the connected-worker counter and registers the connection with
// the per-user queues under the queue mutex.
func (q *RequestQueue) RegisterQuerierConnection(querier string) {
	// The counter is atomic, so it is bumped before taking the mutex;
	// only the queues bookkeeping below needs the lock.
	q.connectedQuerierWorkers.Inc()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.addQuerierConnection(querier)
}

// UnregisterQuerierConnection records that a querier worker connection has
// gone away: it decrements the connected-worker counter and removes the
// connection from the per-user queues under the queue mutex, passing the
// current time (presumably the disconnect timestamp used by the
// forget-delay logic — confirm in the queues implementation).
func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
	q.connectedQuerierWorkers.Dec()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.removeQuerierConnection(querier, time.Now())
}

// NotifyQuerierShutdown forwards a querier shutdown notification to the
// per-user queues under the queue mutex. NOTE(review): based on the name this
// appears to mark a graceful shutdown (as opposed to a dropped connection) —
// confirm against the queues implementation.
func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.notifyQuerierShutdown(querierID)
}

// GetConnectedQuerierWorkersMetric returns the current number of connected
// querier workers as a float64, suitable for use as a gauge metric value.
// Reads the atomic counter directly, so no mutex is needed.
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
	return float64(q.connectedQuerierWorkers.Load())
}

// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.
type contextCond struct {
*sync.Cond
Expand Down
57 changes: 10 additions & 47 deletions modules/frontend/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestGetNextForQuerierOneUser(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("test", &mockRequest{}, 0)
err := q.EnqueueRequest("test", &mockRequest{})
require.NoError(t, err)
}

Expand Down Expand Up @@ -65,7 +65,7 @@ func TestGetNextForQuerierRandomUsers(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest(test.RandomString(), &mockRequest{}, 0)
err := q.EnqueueRequest(test.RandomString(), &mockRequest{})
require.NoError(t, err)
}

Expand Down Expand Up @@ -93,7 +93,7 @@ func TestGetNextBatches(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("user", &mockRequest{}, 0)
err := q.EnqueueRequest("user", &mockRequest{})
require.NoError(t, err)
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func benchmarkGetNextForQuerier(b *testing.B, listeners int, messages int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < messages; j++ {
err := q.EnqueueRequest(user, req, 0)
err := q.EnqueueRequest(user, req)
if err != nil {
panic(err)
}
Expand All @@ -160,7 +160,7 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
Name: "test_discarded",
}, []string{"user"})

q := NewRequestQueue(100_000, 0, g, c)
q := NewRequestQueue(100_000, g, c)
start := make(chan struct{})

for i := 0; i < listeners; i++ {
Expand All @@ -184,49 +184,12 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
}()
}

return q, start
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))

// Start the queue service.
ctx := context.Background()
require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

// Two queriers connect.
queue.RegisterQuerierConnection("querier-1")
queue.RegisterQuerierConnection("querier-2")

// Querier-2 waits for a new request.
querier2wg := sync.WaitGroup{}
querier2wg.Add(1)
go func() {
defer querier2wg.Done()
_, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2", make([]Request, 1))
require.NoError(t, err)
}()

// Querier-1 crashes (no graceful shutdown notification).
queue.UnregisterQuerierConnection("querier-1")

// Enqueue a request from an user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.EnqueueRequest("user-1", "request", 1))

startTime := time.Now()
querier2wg.Wait()
waitTime := time.Since(startTime)
err := services.StartAndAwaitRunning(context.Background(), q)
if err != nil {
panic(err)
}

// We expect that querier-2 got the request only after querier-1 forget delay is passed.
assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
return q, start
}

func TestContextCond(t *testing.T) {
Expand Down
Loading