Update grpc_push_check to support reads in ingesters
This change renames grpc_push_check to grpc_call_check and broadens it to support both pushes and reads. Read request checks, such as circuit breaking, can now be performed by a gRPC tap handle, in the same way push request checks already are, rather than explicitly inside each gRPC method implementation.
jhalterman committed Feb 3, 2025
1 parent cf0809e commit fb2227b
Showing 6 changed files with 226 additions and 146 deletions.
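
For illustration, here is a minimal sketch of the pattern the commit description refers to: a gRPC tap handle admits or rejects a call before its handler runs, so checks such as circuit breaking never reach the method implementations. The callChecker interface, inTapHandle, and newServer names are hypothetical, and whether header metadata is populated at tap time is an assumption; Mimir's actual wiring of grpcInflightMethodLimiter into its server is not shown in this commit.

```go
package example

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/tap"
)

// callChecker mirrors the start/finish surface that a request checker such as
// grpcInflightMethodLimiter exposes; the interface itself is illustrative.
type callChecker interface {
	RPCCallStarting(ctx context.Context, methodName string, md metadata.MD) (context.Context, error)
	RPCCallFinished(ctx context.Context)
}

// inTapHandle runs the checker before the gRPC method handler is scheduled,
// so a rejected call (for example, an open circuit breaker) never reaches the
// push or read method implementations.
func inTapHandle(checker callChecker) tap.ServerInHandle {
	return func(ctx context.Context, info *tap.Info) (context.Context, error) {
		// Header metadata may or may not be available this early; the checker
		// is expected to tolerate an empty MD.
		md, _ := metadata.FromIncomingContext(ctx)
		return checker.RPCCallStarting(ctx, info.FullMethodName, md)
	}
}

// newServer wires the tap handle into a gRPC server. A stats handler would
// normally call RPCCallFinished when the RPC ends; that part is omitted here.
func newServer(checker callChecker) *grpc.Server {
	return grpc.NewServer(grpc.InTapHandle(inTapHandle(checker)))
}
```
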
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
@@ -1394,6 +1394,10 @@ func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize
return ctx, err
}

func (d *Distributor) PreparePushRequest(_ context.Context) (func(error), error) {
return nil, nil
}

// startPushRequest does limits checks at the beginning of Push request in distributor.
// This can be called from different places, even multiple times for the same request:
//
5 changes: 0 additions & 5 deletions pkg/ingester/active_series.go
@@ -28,11 +28,6 @@ const activeSeriesMaxSizeBytes = 1 * 1024 * 1024
// series that match the given matchers.
func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream client.Ingester_ActiveSeriesServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.ActiveSeries")
defer spanlog.Finish()
129 changes: 53 additions & 76 deletions pkg/ingester/ingester.go
@@ -984,6 +984,8 @@ type pushStats struct {
type ctxKey int

var pushReqCtxKey ctxKey = 1
var readReqCtxKey ctxKey = 2
var readPermitCtxKey ctxKey = 3

type pushRequestState struct {
requestSize int64
@@ -992,20 +994,35 @@ type pushRequestState struct {
pushErr error
}

type readRequestState struct {
requestFinish func()
}

func getPushRequestState(ctx context.Context) *pushRequestState {
if st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok {
return st
}
return nil
}

func getReadRequestState(ctx context.Context) *readRequestState {
if st, ok := ctx.Value(readReqCtxKey).(*readRequestState); ok {
return st
}
return nil
}

// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err
}

func (i *Ingester) PreparePushRequest(ctx context.Context) (finishFn func(error), err error) {
return nil, nil
}

func (i *Ingester) FinishPushRequest(ctx context.Context) {
st := getPushRequestState(ctx)
if st == nil {
@@ -1645,13 +1662,45 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
return nil
}

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
// StartReadRequest tries to start a read request. If it was successful, StartReadRequest returns a context with a
// function that should be called to finish the started read request once the request is completed. If it wasn't
// successful, the causing error and a nil context are returned.
func (i *Ingester) StartReadRequest(ctx context.Context) (resultCtx context.Context, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {

if err := i.checkAvailableForRead(); err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()
if err := i.checkReadOverloaded(); err != nil {
return nil, err
}
if finish, err := i.circuitBreaker.tryAcquireReadPermit(); err != nil {
return nil, err
} else {
start := time.Now()
ctx = context.WithValue(ctx, readReqCtxKey, &readRequestState{
requestFinish: func() {
finish(time.Since(start), err)
},
})
}
return ctx, nil
}

func (i *Ingester) PrepareReadRequest(ctx context.Context) (finishFn func(error), err error) {
return nil, nil
}

func (i *Ingester) FinishReadRequest(ctx context.Context) {
st := getReadRequestState(ctx)
if st == nil {
return
}
st.requestFinish()
}

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()

spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.QueryExemplars")
defer spanlog.Finish()
@@ -1711,11 +1760,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

labelName, startTimestampMs, endTimestampMs, matchers, err := client.FromLabelValuesRequest(req)
if err != nil {
@@ -1764,11 +1808,6 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.LabelNames")
defer spanlog.Finish()
@@ -1817,11 +1856,6 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
// MetricsForLabelMatchers implements IngesterServer.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (resp *client.MetricsForLabelMatchersResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
@@ -1893,11 +1927,6 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr

func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (resp *client.UserStatsResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
@@ -1924,11 +1953,6 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
// because the purpose of this function is to show a snapshot of the live ingester's state.
func (i *Ingester) AllUserStats(_ context.Context, req *client.UserStatsRequest) (resp *client.UsersStatsResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

i.tsdbsMtx.RLock()
defer i.tsdbsMtx.RUnlock()
@@ -1957,11 +1981,6 @@ const labelNamesAndValuesTargetSizeBytes = 1 * 1024 * 1024

func (i *Ingester) LabelNamesAndValues(request *client.LabelNamesAndValuesRequest, stream client.Ingester_LabelNamesAndValuesServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(stream.Context())
if err != nil {
@@ -2011,11 +2030,6 @@ const labelValuesCardinalityTargetSizeBytes = 1 * 1024 * 1024

func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequest, srv client.Ingester_LabelValuesCardinalityServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(srv.Context())
if err != nil {
@@ -2097,11 +2111,6 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024
// QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return err
}
defer func() { finishReadRequest(err) }()

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream")
defer spanlog.Finish()
@@ -3827,33 +3836,6 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// startReadRequest tries to start a read request.
// If it was successful, startReadRequest returns a function that should be
// called to finish the started read request once the request is completed.
// If it wasn't successful, the causing error is returned. In this case no
// function is returned.
func (i *Ingester) startReadRequest() (func(error), error) {
start := time.Now()
finish, err := i.circuitBreaker.tryAcquireReadPermit()
if err != nil {
return nil, err
}

finishReadRequest := func(err error) {
finish(time.Since(start), err)
}

if err = i.checkAvailableForRead(); err != nil {
finishReadRequest(err)
return nil, err
}
if err = i.checkReadOverloaded(); err != nil {
finishReadRequest(err)
return nil, err
}
return finishReadRequest, nil
}

// checkAvailableForRead checks whether the ingester is available for read requests,
// and if it is not the case returns an unavailableError error.
func (i *Ingester) checkAvailableForRead() error {
@@ -4025,11 +4007,6 @@ func (i *Ingester) purgeUserMetricsMetadata() {
// MetricsMetadata returns all the metrics metadata of a user.
func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
finishReadRequest, err := i.startReadRequest()
if err != nil {
return nil, err
}
defer func() { finishReadRequest(err) }()

userID, err := tenant.TenantID(ctx)
if err != nil {
68 changes: 51 additions & 17 deletions pkg/mimir/grpc_push_check.go → pkg/mimir/grpc_call_check.go
@@ -19,29 +19,42 @@ import (

type pushReceiver interface {
StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error)
PreparePushRequest(ctx context.Context) (func(error), error)
FinishPushRequest(ctx context.Context)
}

// getPushReceiver function must be constant -- return same value on each call.
type ingesterReceiver interface {
pushReceiver
// StartReadRequest is called before a request has been allocated resources, and hasn't yet been scheduled for processing.
StartReadRequest(ctx context.Context) (context.Context, error)
// PrepareReadRequest is called when a request is being processed, and may return a finish func.
PrepareReadRequest(ctx context.Context) (func(error), error)
// FinishReadRequest is called when a request is done being processed.
FinishReadRequest(ctx context.Context)
}

// pushReceiver function must be pure, returning the same value on each call.
// if getIngester or getDistributor functions are nil, those specific checks are not used.
func newGrpcInflightMethodLimiter(getIngester, getDistributor func() pushReceiver) *grpcInflightMethodLimiter {
func newGrpcInflightMethodLimiter(getIngester func() ingesterReceiver, getDistributor func() pushReceiver) *grpcInflightMethodLimiter {
return &grpcInflightMethodLimiter{getIngester: getIngester, getDistributor: getDistributor}
}

// grpcInflightMethodLimiter implements gRPC TapHandle and gRPC stats.Handler.
type grpcInflightMethodLimiter struct {
getIngester func() pushReceiver
getIngester func() ingesterReceiver
getDistributor func() pushReceiver
}

type ctxKey int

const (
pushTypeCtxKey ctxKey = 1 // ingester or distributor push
rpcCallCtxKey ctxKey = 1 // ingester push/read or distributor push

pushTypeIngester = 1
pushTypeDistributor = 2
rpcCallIngesterPush = 1
rpcCallIngesterRead = 2
rpcCallDistributorPush = 3

ingesterMethod string = "/cortex.Ingester"
ingesterPushMethod string = "/cortex.Ingester/Push"
httpgrpcHandleMethod string = "/httpgrpc.HTTP/Handle"
)
@@ -50,19 +63,27 @@ var errNoIngester = status.Error(codes.Unavailable, "no ingester")
var errNoDistributor = status.Error(codes.Unavailable, "no distributor")

func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodName string, md metadata.MD) (context.Context, error) {
if g.getIngester != nil && methodName == ingesterPushMethod {
if g.getIngester != nil && strings.HasPrefix(methodName, ingesterMethod) {
ing := g.getIngester()
if ing == nil {
// We return error here, to make sure that RPCCallFinished doesn't get called for this RPC call.
return ctx, errNoIngester
}

ctx, err := ing.StartPushRequest(ctx, getMessageSize(md, grpcutil.MetadataMessageSize))
var err error
var rpcCallCtxValue int
if methodName == ingesterPushMethod {
ctx, err = ing.StartPushRequest(ctx, getMessageSize(md, grpcutil.MetadataMessageSize))
rpcCallCtxValue = rpcCallIngesterPush
} else {
ctx, err = ing.StartReadRequest(ctx)
rpcCallCtxValue = rpcCallIngesterRead
}

if err != nil {
return ctx, status.Error(codes.Unavailable, err.Error())
}

return context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester), nil
return context.WithValue(ctx, rpcCallCtxKey, rpcCallCtxValue), nil
}

if g.getDistributor != nil && methodName == httpgrpcHandleMethod {
@@ -81,24 +102,37 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN
return ctx, status.Error(codes.Unavailable, err.Error())
}

return context.WithValue(ctx, pushTypeCtxKey, pushTypeDistributor), nil
return context.WithValue(ctx, rpcCallCtxKey, rpcCallDistributorPush), nil
}
}

return ctx, nil
}

func (g *grpcInflightMethodLimiter) RPCCallProcessing(_ context.Context, _ string) (func(error), error) {
func (g *grpcInflightMethodLimiter) RPCCallProcessing(ctx context.Context, methodName string) (func(error), error) {
if rpcCall, ok := ctx.Value(rpcCallCtxKey).(int); ok {
switch rpcCall {
case rpcCallIngesterPush:
if ing := g.getIngester(); ing != nil {
return ing.PreparePushRequest(ctx)
}
case rpcCallIngesterRead:
if ing := g.getIngester(); ing != nil {
return ing.PrepareReadRequest(ctx)
}
}
}
return nil, nil
}

func (g *grpcInflightMethodLimiter) RPCCallFinished(ctx context.Context) {
if pt, ok := ctx.Value(pushTypeCtxKey).(int); ok {
switch pt {
case pushTypeIngester:
if rpcCall, ok := ctx.Value(rpcCallCtxKey).(int); ok {
switch rpcCall {
case rpcCallIngesterPush:
g.getIngester().FinishPushRequest(ctx)

case pushTypeDistributor:
case rpcCallIngesterRead:
g.getIngester().FinishReadRequest(ctx)
case rpcCallDistributorPush:
g.getDistributor().FinishPushRequest(ctx)
}
}
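
For reference, here is a hypothetical walkthrough of the limiter lifecycle for a single ingester read RPC. It assumes a file living alongside grpc_call_check.go in the same package; nopIngester and exampleReadLifecycle are made-up names, while the grpcInflightMethodLimiter methods and the ingesterReceiver interface come from this change.

```go
package mimir

import (
	"context"

	"google.golang.org/grpc/metadata"
)

// nopIngester is a stub ingesterReceiver whose checks always succeed.
type nopIngester struct{}

func (nopIngester) StartPushRequest(ctx context.Context, _ int64) (context.Context, error) {
	return ctx, nil
}
func (nopIngester) PreparePushRequest(context.Context) (func(error), error) { return nil, nil }
func (nopIngester) FinishPushRequest(context.Context)                       {}
func (nopIngester) StartReadRequest(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
func (nopIngester) PrepareReadRequest(context.Context) (func(error), error) { return nil, nil }
func (nopIngester) FinishReadRequest(context.Context)                       {}

// exampleReadLifecycle shows the order in which the gRPC server plumbing is
// expected to call the limiter for one ingester read RPC.
func exampleReadLifecycle() error {
	limiter := newGrpcInflightMethodLimiter(
		func() ingesterReceiver { return nopIngester{} },
		nil, // distributor checks unused in this sketch
	)

	// 1. Tap handle: StartReadRequest runs before the handler is scheduled.
	ctx, err := limiter.RPCCallStarting(context.Background(), "/cortex.Ingester/QueryStream", metadata.MD{})
	if err != nil {
		// When starting fails, the server must not call RPCCallFinished.
		return err
	}

	// 2. Stats handler: PrepareReadRequest runs while the RPC is processed.
	finish, err := limiter.RPCCallProcessing(ctx, "/cortex.Ingester/QueryStream")
	if err != nil {
		return err
	}
	if finish != nil {
		defer finish(nil)
	}

	// 3. ...the gRPC method implementation (e.g. QueryStream) runs here...

	// 4. Stats handler: FinishReadRequest releases what StartReadRequest acquired.
	limiter.RPCCallFinished(ctx)
	return nil
}
```
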