Skip to content

Commit

Permalink
query-frontend: Do not break scheduler connection on malformed queries (
Browse files Browse the repository at this point in the history
#9786)

* query-frontend: Do not break scheduler connection on malformed queries

Signed-off-by: Ganesh Vernekar <[email protected]>

* Updated tests

Signed-off-by: Ganesh Vernekar <[email protected]>

* lint

Signed-off-by: Ganesh Vernekar <[email protected]>

---------

Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome authored Oct 31, 2024
1 parent 93b9bc1 commit 587d234
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
8 changes: 8 additions & 0 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const (

type enqueueResult struct {
status enqueueStatus
// If the status is failed and if it was because of a client error on the frontend,
// the clientErr should be updated with the appropriate error.
clientErr error

cancelCh chan<- uint64 // Channel that can be used for request cancellation. If nil, cancellation is not possible.
}
Expand Down Expand Up @@ -285,6 +288,11 @@ enqueueAgain:
cancelCh = enqRes.cancelCh
break // go wait for response.
} else if enqRes.status == failed {
if enqRes.clientErr != nil {
// It failed because of a client error. No need to retry.
return nil, nil, httpgrpc.Errorf(http.StatusBadRequest, "failed to enqueue request: %s", enqRes.clientErr.Error())
}

retries--
if retries > 0 {
spanLogger.DebugLog("msg", "enqueuing request failed, will retry")
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr
frontendToSchedulerRequest, err := w.toSchedulerAdapter.frontendToSchedulerEnqueueRequest(req, w.frontendAddr)
if err != nil {
level.Warn(spanLogger).Log("msg", "error converting frontend request to scheduler request", "err", err)
req.enqueue <- enqueueResult{status: failed}
return err
req.enqueue <- enqueueResult{status: failed, clientErr: err}
return nil
}

err = loop.Send(frontendToSchedulerRequest)
Expand Down
60 changes: 51 additions & 9 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,59 @@ func TestFrontendTooManyRequests(t *testing.T) {
require.Equal(t, int32(http.StatusTooManyRequests), resp.Code)
}

func TestFrontendEnqueueFailure(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
func TestFrontendEnqueueFailures(t *testing.T) {
t.Run("scheduler is shutting down with valid query", func(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
})

req := &httpgrpc.HTTPRequest{
Url: "/api/v1/query_range?start=946684800&end=946771200&step=60&query=up",
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
})
t.Run("scheduler is running fine", func(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
})

req := &httpgrpc.HTTPRequest{
Url: "/api/v1/query_range?start=946684800&end=946771200&step=60",
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
cases := []struct {
name, url, error string
}{
{
name: "start time is wrong",
url: "/api/v1/query_range?start=9466camnsd84800&end=946771200&step=60&query=up{}",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "start": cannot parse "9466camnsd84800" to a valid timestamp`,
},
{
name: "end time is wrong",
url: "/api/v1/query_range?start=946684800&end=946771200dgiu&step=60&query=up{}",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "end": cannot parse "946771200dgiu" to a valid timestamp`,
},
{
name: "query time is wrong",
url: "/api/v1/query_range?start=946684800&end=946771200&step=60&query=up{",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "query": 1:4: parse error: unexpected end of input inside braces`,
},
{
name: "no query provided",
url: "/api/v1/query_range?start=946684800&end=946771200&step=60",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "query": unknown position: parse error: no expression found in input`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
req := &httpgrpc.HTTPRequest{
Url: c.url,
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.Equal(t, c.error, err.Error())
})
}
})
}

func TestFrontendCancellation(t *testing.T) {
Expand Down

0 comments on commit 587d234

Please sign in to comment.