Skip to content

Commit

Permalink
Update grpc_push_check to support reads in ingesters
Browse files Browse the repository at this point in the history
This change renames grpc_push_check to grpc_call_check and broadens it to support pushes and reads. This allows read request checks, such as for circuit breaking, to be performed by a tap handle, similar to how push request checks are performed, rather than explicitly performing them inside the gRPC call implementation methods.
  • Loading branch information
jhalterman committed Feb 4, 2025
1 parent e3862e4 commit 7f69a26
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 241 deletions.
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,10 @@ func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize
return ctx, err
}

func (d *Distributor) PreparePushRequest(_ context.Context) (func(error), error) {
return nil, nil
}

// startPushRequest does limits checks at the beginning of Push request in distributor.
// This can be called from different places, even multiple times for the same request:
//
Expand Down
5 changes: 0 additions & 5 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ const activeSeriesMaxSizeBytes = 1 * 1024 * 1024
// series that match the given matchers.
func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream client.Ingester_ActiveSeriesServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.ActiveSeries")
defer spanlog.Finish()
Expand Down
125 changes: 50 additions & 75 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ type pushStats struct {
type ctxKey int

var pushReqCtxKey ctxKey = 1
var readReqCtxKey ctxKey = 2

type pushRequestState struct {
requestSize int64
Expand All @@ -992,20 +993,35 @@ type pushRequestState struct {
pushErr error
}

type readRequestState struct {
requestFinish func()
}

func getPushRequestState(ctx context.Context) *pushRequestState {
if st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok {
return st
}
return nil
}

func getReadRequestState(ctx context.Context) *readRequestState {
if st, ok := ctx.Value(readReqCtxKey).(*readRequestState); ok {
return st
}
return nil
}

// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err
}

func (i *Ingester) PreparePushRequest(_ context.Context) (finishFn func(error), err error) {
return nil, nil
}

func (i *Ingester) FinishPushRequest(ctx context.Context) {
st := getPushRequestState(ctx)
if st == nil {
Expand Down Expand Up @@ -1645,13 +1661,44 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
return nil
}

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
// StartReadRequest tries to start a read request. If it was successful, startReadRequest returns a context with a
// function that should be called to finish the started read request once the request is completed. If it wasn't
// successful, the causing error is returned and a nil context is returned.
func (i *Ingester) StartReadRequest(ctx context.Context) (resultCtx context.Context, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()

if err := i.checkAvailableForRead(); err != nil {
return nil, err
}
if err := i.checkReadOverloaded(); err != nil {
return nil, err
}
finish, err := i.circuitBreaker.tryAcquireReadPermit()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()
start := time.Now()
return context.WithValue(ctx, readReqCtxKey, &readRequestState{
requestFinish: func() {
finish(time.Since(start), err)
},
}), nil
}

func (i *Ingester) PrepareReadRequest(_ context.Context) (finishFn func(error), err error) {
return nil, nil
}

func (i *Ingester) FinishReadRequest(ctx context.Context) {
st := getReadRequestState(ctx)
if st == nil {
return
}
st.requestFinish()
}

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()

spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.QueryExemplars")
defer spanlog.Finish()
Expand Down Expand Up @@ -1711,11 +1758,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

labelName, startTimestampMs, endTimestampMs, hints, matchers, err := client.FromLabelValuesRequest(req)
if err != nil {
Expand Down Expand Up @@ -1770,11 +1812,6 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.LabelNames")
defer spanlog.Finish()
Expand Down Expand Up @@ -1829,11 +1866,6 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
// MetricsForLabelMatchers implements IngesterServer.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (resp *client.MetricsForLabelMatchersResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -1905,11 +1937,6 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr

func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (resp *client.UserStatsResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand All @@ -1936,11 +1963,6 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
// because the purpose of this function is to show a snapshot of the live ingester's state.
func (i *Ingester) AllUserStats(_ context.Context, req *client.UserStatsRequest) (resp *client.UsersStatsResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

i.tsdbsMtx.RLock()
defer i.tsdbsMtx.RUnlock()
Expand Down Expand Up @@ -1969,11 +1991,6 @@ const labelNamesAndValuesTargetSizeBytes = 1 * 1024 * 1024

func (i *Ingester) LabelNamesAndValues(request *client.LabelNamesAndValuesRequest, stream client.Ingester_LabelNamesAndValuesServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(stream.Context())
if err != nil {
Expand Down Expand Up @@ -2023,11 +2040,6 @@ const labelValuesCardinalityTargetSizeBytes = 1 * 1024 * 1024

func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequest, srv client.Ingester_LabelValuesCardinalityServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(srv.Context())
if err != nil {
Expand Down Expand Up @@ -2109,11 +2121,6 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024
// QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream")
defer spanlog.Finish()
Expand Down Expand Up @@ -3839,33 +3846,6 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// startReadRequest tries to start a read request.
// If it was successful, startReadRequest returns a function that should be
// called to finish the started read request once the request is completed.
// If it wasn't successful, the causing error is returned. In this case no
// function is returned.
func (i *Ingester) startReadRequest() (func(error), error) {
start := time.Now()
finish, err := i.circuitBreaker.tryAcquireReadPermit()
if err != nil {
return nil, err
}

finishReadRequest := func(err error) {
finish(time.Since(start), err)
}

if err = i.checkAvailableForRead(); err != nil {
finishReadRequest(err)
return nil, err
}
if err = i.checkReadOverloaded(); err != nil {
finishReadRequest(err)
return nil, err
}
return finishReadRequest, nil
}

// checkAvailableForRead checks whether the ingester is available for read requests,
// and if it is not the case returns an unavailableError error.
func (i *Ingester) checkAvailableForRead() error {
Expand Down Expand Up @@ -4037,11 +4017,6 @@ func (i *Ingester) purgeUserMetricsMetadata() {
// MetricsMetadata returns all the metrics metadata of a user.
func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 7f69a26

Please sign in to comment.