Skip to content

Commit 691b174

Browse files
authored
fix: properly return http status codes from ingester to querier for RPC function calls (#13134)
Signed-off-by: Callum Styan <[email protected]>
1 parent bb864b3 commit 691b174

File tree

3 files changed

+72
-0
lines changed

3 files changed

+72
-0
lines changed

pkg/ingester/ingester.go

+32
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"github.com/prometheus/prometheus/model/labels"
3535
"google.golang.org/grpc/health/grpc_health_v1"
3636

37+
server_util "github.com/grafana/loki/v3/pkg/util/server"
38+
3739
"github.com/grafana/loki/v3/pkg/analytics"
3840
"github.com/grafana/loki/v3/pkg/chunkenc"
3941
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
@@ -1041,6 +1043,13 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
10411043

10421044
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
10431045
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
1046+
gcr, err := i.getChunkIDs(ctx, req)
1047+
err = server_util.ClientGrpcStatusAndError(err)
1048+
return gcr, err
1049+
}
1050+
1051+
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
1052+
func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
10441053
orgID, err := tenant.TenantID(ctx)
10451054
if err != nil {
10461055
return nil, err
@@ -1168,6 +1177,12 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
11681177

11691178
// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
11701179
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
1180+
sr, err := i.series(ctx, req)
1181+
err = server_util.ClientGrpcStatusAndError(err)
1182+
return sr, err
1183+
}
1184+
1185+
func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
11711186
instanceID, err := tenant.TenantID(ctx)
11721187
if err != nil {
11731188
return nil, err
@@ -1331,6 +1346,11 @@ func (i *Ingester) getInstances() []*instance {
13311346

13321347
// Tail logs matching given query
13331348
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
1349+
err := i.tail(req, queryServer)
1350+
err = server_util.ClientGrpcStatusAndError(err)
1351+
return err
1352+
}
1353+
func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
13341354
select {
13351355
case <-i.tailersQuit:
13361356
return errors.New("Ingester is stopping")
@@ -1376,6 +1396,12 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
13761396

13771397
// TailersCount returns count of active tail requests from a user
13781398
func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
1399+
tcr, err := i.tailersCount(ctx)
1400+
err = server_util.ClientGrpcStatusAndError(err)
1401+
return tcr, err
1402+
}
1403+
1404+
func (i *Ingester) tailersCount(ctx context.Context) (*logproto.TailersCountResponse, error) {
13791405
instanceID, err := tenant.TenantID(ctx)
13801406
if err != nil {
13811407
return nil, err
@@ -1431,6 +1457,12 @@ func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFiel
14311457

14321458
// GetDetectedLabels returns map of detected labels and unique values from this ingester
14331459
func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
1460+
lvr, err := i.getDetectedLabels(ctx, req)
1461+
err = server_util.ClientGrpcStatusAndError(err)
1462+
return lvr, err
1463+
}
1464+
1465+
func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
14341466
userID, err := tenant.TenantID(ctx)
14351467
if err != nil {
14361468
return nil, err

pkg/ingester/instance.go

+31
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/grafana/loki/v3/pkg/util/deletion"
5050
util_log "github.com/grafana/loki/v3/pkg/util/log"
5151
mathutil "github.com/grafana/loki/v3/pkg/util/math"
52+
server_util "github.com/grafana/loki/v3/pkg/util/server"
5253
"github.com/grafana/loki/v3/pkg/validation"
5354
)
5455

@@ -441,6 +442,12 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
441442
}
442443

443444
func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
445+
it, err := i.query(ctx, req)
446+
err = server_util.ClientGrpcStatusAndError(err)
447+
return it, err
448+
}
449+
450+
func (i *instance) query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
444451
expr, err := req.LogSelector()
445452
if err != nil {
446453
return nil, err
@@ -495,6 +502,12 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
495502
}
496503

497504
func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
505+
it, err := i.querySample(ctx, req)
506+
err = server_util.ClientGrpcStatusAndError(err)
507+
return it, err
508+
}
509+
510+
func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
498511
expr, err := req.Expr()
499512
if err != nil {
500513
return nil, err
@@ -556,6 +569,12 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
556569
// If label matchers are given only the matching streams are fetched from the index.
557570
// The label names or values are then retrieved from those matching streams.
558571
func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) {
572+
lr, err := i.label(ctx, req, matchers...)
573+
err = server_util.ClientGrpcStatusAndError(err)
574+
return lr, err
575+
}
576+
577+
func (i *instance) label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) {
559578
if len(matchers) == 0 {
560579
var labels []string
561580
if req.Values {
@@ -709,6 +728,12 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
709728
}
710729

711730
func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) {
731+
isr, err := i.getStats(ctx, req)
732+
err = server_util.ClientGrpcStatusAndError(err)
733+
return isr, err
734+
}
735+
736+
func (i *instance) getStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) {
712737
matchers, err := syntax.ParseMatchers(req.Matchers, true)
713738
if err != nil {
714739
return nil, err
@@ -765,6 +790,12 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
765790
}
766791

767792
func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) {
793+
vr, err := i.getVolume(ctx, req)
794+
err = server_util.ClientGrpcStatusAndError(err)
795+
return vr, err
796+
}
797+
798+
func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) {
768799
matchers, err := syntax.ParseMatchers(req.Matchers, true)
769800
if err != nil && req.Matchers != seriesvolume.MatchAny {
770801
return nil, err

pkg/util/server/error.go

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ const (
2727
ErrDeadlineExceeded = "Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed."
2828
)
2929

30+
func ClientGrpcStatusAndError(err error) error {
31+
if err == nil {
32+
return nil
33+
}
34+
35+
status, newErr := ClientHTTPStatusAndError(err)
36+
return httpgrpc.Errorf(status, "%s", newErr.Error())
37+
}
38+
3039
// WriteError write a go error with the correct status code.
3140
func WriteError(err error, w http.ResponseWriter) {
3241
status, cerr := ClientHTTPStatusAndError(err)

0 commit comments

Comments
 (0)