diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 80418c1e980..721f633684d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1395,6 +1395,10 @@ func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize return ctx, err } +func (d *Distributor) PreparePushRequest(_ context.Context) (func(error), error) { + return nil, nil +} + // startPushRequest does limits checks at the beginning of Push request in distributor. // This can be called from different places, even multiple times for the same request: // diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index c0c40b8de10..abf45e77adf 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -28,11 +28,6 @@ const activeSeriesMaxSizeBytes = 1 * 1024 * 1024 // series that match the given matchers. func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream client.Ingester_ActiveSeriesServer) (err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return err - } - defer func() { finishReadRequest(err) }() spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.ActiveSeries") defer spanlog.Finish() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 415531d9a80..dcd88d4d90c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -984,6 +984,7 @@ type pushStats struct { type ctxKey int var pushReqCtxKey ctxKey = 1 +var readReqCtxKey ctxKey = 2 type pushRequestState struct { requestSize int64 @@ -992,6 +993,10 @@ type pushRequestState struct { pushErr error } +type readRequestState struct { + requestFinish func() +} + func getPushRequestState(ctx context.Context) *pushRequestState { if st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok { return st @@ -999,6 +1004,13 @@ func getPushRequestState(ctx context.Context) *pushRequestState { return nil } +func getReadRequestState(ctx context.Context) *readRequestState { + if st, ok := ctx.Value(readReqCtxKey).(*readRequestState); ok { + return st + } + return nil +} + // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { @@ -1006,6 +1018,10 @@ func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context return ctx, err } +func (i *Ingester) PreparePushRequest(_ context.Context) (finishFn func(error), err error) { + return nil, nil +} + func (i *Ingester) FinishPushRequest(ctx context.Context) { st := getPushRequestState(ctx) if st == nil { @@ -1645,13 +1661,44 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return nil } -func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) { +// StartReadRequest tries to start a read request. If it succeeds, StartReadRequest returns a context with a +// function that should be called to finish the started read request once the request completes. If it fails, +// a nil context and the causing error are returned.
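+// The returned context carries a readRequestState whose finish function is later invoked by FinishReadRequest.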
+func (i *Ingester) StartReadRequest(ctx context.Context) (resultCtx context.Context, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() + + if err := i.checkAvailableForRead(); err != nil { + return nil, err + } + if err := i.checkReadOverloaded(); err != nil { + return nil, err + } + finish, err := i.circuitBreaker.tryAcquireReadPermit() if err != nil { return nil, err } - defer func() { finishReadRequest(err) }() + start := time.Now() + return context.WithValue(ctx, readReqCtxKey, &readRequestState{ + requestFinish: func() { + finish(time.Since(start), err) + }, + }), nil +} + +func (i *Ingester) PrepareReadRequest(_ context.Context) (finishFn func(error), err error) { + return nil, nil +} + +func (i *Ingester) FinishReadRequest(ctx context.Context) { + st := getReadRequestState(ctx) + if st == nil { + return + } + st.requestFinish() +} + +func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) { + defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.QueryExemplars") defer spanlog.Finish() @@ -1711,11 +1758,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() labelName, startTimestampMs, endTimestampMs, hints, matchers, err := client.FromLabelValuesRequest(req) if err != nil { @@ -1770,11 +1812,6 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.LabelNames") defer spanlog.Finish() @@ -1829,11 +1866,6 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // MetricsForLabelMatchers implements IngesterServer. 
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (resp *client.MetricsForLabelMatchersResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() userID, err := tenant.TenantID(ctx) if err != nil { @@ -1905,11 +1937,6 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (resp *client.UserStatsResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() userID, err := tenant.TenantID(ctx) if err != nil { @@ -1936,11 +1963,6 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) // because the purpose of this function is to show a snapshot of the live ingester's state. func (i *Ingester) AllUserStats(_ context.Context, req *client.UserStatsRequest) (resp *client.UsersStatsResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() i.tsdbsMtx.RLock() defer i.tsdbsMtx.RUnlock() @@ -1969,11 +1991,6 @@ const labelNamesAndValuesTargetSizeBytes = 1 * 1024 * 1024 func (i *Ingester) LabelNamesAndValues(request *client.LabelNamesAndValuesRequest, stream client.Ingester_LabelNamesAndValuesServer) (err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return err - } - defer func() { finishReadRequest(err) }() userID, err := tenant.TenantID(stream.Context()) if err != nil { @@ -2023,11 +2040,6 @@ const labelValuesCardinalityTargetSizeBytes = 1 * 1024 * 1024 func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequest, srv client.Ingester_LabelValuesCardinalityServer) (err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return err - } - defer func() { finishReadRequest(err) }() userID, err := tenant.TenantID(srv.Context()) if err != nil { @@ -2109,11 +2121,6 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024 // QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return err - } - defer func() { finishReadRequest(err) }() spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream") defer spanlog.Finish() @@ -3839,33 +3846,6 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNoContent) } -// startReadRequest tries to start a read request. -// If it was successful, startReadRequest returns a function that should be -// called to finish the started read request once the request is completed. -// If it wasn't successful, the causing error is returned. In this case no -// function is returned. 
-func (i *Ingester) startReadRequest() (func(error), error) { - start := time.Now() - finish, err := i.circuitBreaker.tryAcquireReadPermit() - if err != nil { - return nil, err - } - - finishReadRequest := func(err error) { - finish(time.Since(start), err) - } - - if err = i.checkAvailableForRead(); err != nil { - finishReadRequest(err) - return nil, err - } - if err = i.checkReadOverloaded(); err != nil { - finishReadRequest(err) - return nil, err - } - return finishReadRequest, nil -} - // checkAvailableForRead checks whether the ingester is available for read requests, // and if it is not the case returns an unavailableError error. func (i *Ingester) checkAvailableForRead() error { @@ -4037,11 +4017,6 @@ func (i *Ingester) purgeUserMetricsMetadata() { // MetricsMetadata returns all the metrics metadata of a user. func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) { defer func() { err = i.mapReadErrorToErrorWithStatus(err) }() - finishReadRequest, err := i.startReadRequest() - if err != nil { - return nil, err - } - defer func() { finishReadRequest(err) }() userID, err := tenant.TenantID(ctx) if err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index d7eec717668..a0698dddb0a 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -328,24 +328,24 @@ func TestIngester_StartReadRequest(t *testing.T) { } defer services.StopAndAwaitTerminated(context.Background(), failingIng) //nolint:errcheck - finish, err := failingIng.startReadRequest() + ctx, err := failingIng.StartReadRequest(context.Background()) require.Equal(t, int64(tc.expectedAcquiredPermitCount), acquiredPermitCount.Load()) if err == nil { require.Nil(t, tc.verifyErr) - require.NotNil(t, finish) + require.NotNil(t, ctx) - // Calling finish must release a potentially acquired permit - // and in that case record a success, and no failures. + // Calling FinishReadRequest must release a potentially acquired permit + // and in that case record a success, and no failures.
expectedSuccessCount := acquiredPermitCount.Load() - finish(err) + failingIng.FinishReadRequest(ctx) require.Equal(t, int64(0), acquiredPermitCount.Load()) require.Equal(t, expectedSuccessCount, recordedSuccessCount.Load()) require.Equal(t, int64(0), recordedFailureCount.Load()) } else { require.NotNil(t, tc.verifyErr) tc.verifyErr(err) - require.Nil(t, finish) + require.Nil(t, ctx) } }) } @@ -4121,21 +4121,6 @@ func Test_Ingester_LabelNames(t *testing.T) { require.NoError(t, err) assert.ElementsMatch(t, expected, res.LabelNames) }) - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - _, err := i.LabelNames(ctx, &client.LabelNamesRequest{}) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } func Test_Ingester_LabelValues(t *testing.T) { @@ -4414,21 +4399,6 @@ func TestIngester_LabelNamesAndValues(t *testing.T) { assert.ElementsMatch(t, extractItemsWithSortedValues(s.SentResponses), tc.expected) }) } - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - err := i.LabelNamesAndValues(&client.LabelNamesAndValuesRequest{}, nil) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } func TestIngester_LabelValuesCardinality(t *testing.T) { @@ -4534,21 +4504,6 @@ func TestIngester_LabelValuesCardinality(t *testing.T) { require.ElementsMatch(t, s.SentResponses[0].Items, tc.expectedItems) }) } - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - err := i.LabelValuesCardinality(&client.LabelValuesCardinalityRequest{}, nil) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } type series struct { @@ -5013,21 +4968,6 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { assert.ElementsMatch(t, testData.expected, res.Metric) }) } - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - _, err := i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{}) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } func Test_Ingester_MetricsForLabelMatchers_Deduplication(t *testing.T) { @@ -5419,7 +5359,7 @@ func 
TestIngester_QueryStream(t *testing.T) { }) i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - err = i.QueryStream(&client.QueryRequest{}, nil) + _, err := i.StartReadRequest(context.Background()) stat, ok := grpcutil.ErrorToStatus(err) require.True(t, ok) require.Equal(t, codes.ResourceExhausted, stat.Code()) @@ -5857,7 +5797,7 @@ func TestIngester_QueryStream_StreamingWithManySeries(t *testing.T) { func TestIngester_QueryExemplars(t *testing.T) { cfg := defaultIngesterTestConfig(t) - ctx := user.InjectOrgID(context.Background(), userID) + user.InjectOrgID(context.Background(), userID) registry := prometheus.NewRegistry() i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry) require.NoError(t, err) @@ -5870,21 +5810,6 @@ func TestIngester_QueryExemplars(t *testing.T) { test.Poll(t, 1*time.Second, 1, func() interface{} { return i.lifecycler.HealthyInstancesCount() }) - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - _, err := i.QueryExemplars(ctx, &client.ExemplarQueryRequest{}) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } // This test shows a single ingester returns compacted OOO and in-order chunks separately after compaction, even if they overlap. @@ -7408,21 +7333,6 @@ func Test_Ingester_UserStats(t *testing.T) { // Active series are considered according to the wall time during the push, not the sample timestamp. // Therefore all three series are still active at this point. assert.Equal(t, uint64(3), res.NumSeries) - - t.Run("limited due to resource utilization", func(t *testing.T) { - origLimiter := i.utilizationBasedLimiter - t.Cleanup(func() { - i.utilizationBasedLimiter = origLimiter - }) - i.utilizationBasedLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} - - _, err := i.UserStats(ctx, &client.UserStatsRequest{}) - stat, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok) - require.Equal(t, codes.ResourceExhausted, stat.Code()) - require.Equal(t, ingesterTooBusyMsg, stat.Message()) - verifyUtilizationLimitedRequestsMetric(t, registry) - }) } func Test_Ingester_AllUserStats(t *testing.T) { diff --git a/pkg/mimir/grpc_push_check.go b/pkg/mimir/grpc_call_check.go similarity index 56% rename from pkg/mimir/grpc_push_check.go rename to pkg/mimir/grpc_call_check.go index 735d195cb18..70e0a2b0cf9 100644 --- a/pkg/mimir/grpc_push_check.go +++ b/pkg/mimir/grpc_call_check.go @@ -19,29 +19,42 @@ import ( type pushReceiver interface { StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error) + PreparePushRequest(ctx context.Context) (func(error), error) FinishPushRequest(ctx context.Context) } -// getPushReceiver function must be constant -- return same value on each call. +type ingesterReceiver interface { + pushReceiver + // StartReadRequest is called before the request has been allocated resources and before it has been scheduled for processing. + StartReadRequest(ctx context.Context) (context.Context, error) + // PrepareReadRequest is called when a request is being processed, and may return a finish func.
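+ // Any finish function it returns is propagated to the caller through RPCCallProcessing.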
+ PrepareReadRequest(ctx context.Context) (func(error), error) + // FinishReadRequest is called when a request is done being processed. + FinishReadRequest(ctx context.Context) +} + +// The getIngester and getDistributor functions must be pure, returning the same value on each call. // if getIngester or getDistributor functions are nil, those specific checks are not used. -func newGrpcInflightMethodLimiter(getIngester, getDistributor func() pushReceiver) *grpcInflightMethodLimiter { +func newGrpcInflightMethodLimiter(getIngester func() ingesterReceiver, getDistributor func() pushReceiver) *grpcInflightMethodLimiter { return &grpcInflightMethodLimiter{getIngester: getIngester, getDistributor: getDistributor} } // grpcInflightMethodLimiter implements gRPC TapHandle and gRPC stats.Handler. type grpcInflightMethodLimiter struct { - getIngester func() pushReceiver + getIngester func() ingesterReceiver getDistributor func() pushReceiver } type ctxKey int const ( - pushTypeCtxKey ctxKey = 1 // ingester or distributor push + rpcCallCtxKey ctxKey = 1 // ingester or distributor call - pushTypeIngester = 1 - pushTypeDistributor = 2 + rpcCallIngesterPush = 1 + rpcCallIngesterRead = 2 + rpcCallDistributorPush = 3 + ingesterMethod string = "/cortex.Ingester" ingesterPushMethod string = "/cortex.Ingester/Push" httpgrpcHandleMethod string = "/httpgrpc.HTTP/Handle" ) var errNoIngester = status.Error(codes.Unavailable, "no ingester") var errNoDistributor = status.Error(codes.Unavailable, "no distributor") func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodName string, md metadata.MD) (context.Context, error) { - if g.getIngester != nil && methodName == ingesterPushMethod { + if g.getIngester != nil && strings.HasPrefix(methodName, ingesterMethod) { ing := g.getIngester() if ing == nil { // We return error here, to make sure that RPCCallFinished doesn't get called for this RPC call.
return ctx, errNoIngester } - ctx, err := ing.StartPushRequest(ctx, getMessageSize(md, grpcutil.MetadataMessageSize)) + var err error + var rpcCallCtxValue int + if methodName == ingesterPushMethod { + ctx, err = ing.StartPushRequest(ctx, getMessageSize(md, grpcutil.MetadataMessageSize)) + rpcCallCtxValue = rpcCallIngesterPush + } else { + ctx, err = ing.StartReadRequest(ctx) + rpcCallCtxValue = rpcCallIngesterRead + } + if err != nil { return ctx, status.Error(codes.Unavailable, err.Error()) } - - return context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester), nil + return context.WithValue(ctx, rpcCallCtxKey, rpcCallCtxValue), nil } if g.getDistributor != nil && methodName == httpgrpcHandleMethod { @@ -81,24 +102,37 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN return ctx, status.Error(codes.Unavailable, err.Error()) } - return context.WithValue(ctx, pushTypeCtxKey, pushTypeDistributor), nil + return context.WithValue(ctx, rpcCallCtxKey, rpcCallDistributorPush), nil } } return ctx, nil } -func (g *grpcInflightMethodLimiter) RPCCallProcessing(_ context.Context, _ string) (func(error), error) { +func (g *grpcInflightMethodLimiter) RPCCallProcessing(ctx context.Context, _ string) (func(error), error) { + if rpcCall, ok := ctx.Value(rpcCallCtxKey).(int); ok { + switch rpcCall { + case rpcCallIngesterPush: + if ing := g.getIngester(); ing != nil { + return ing.PreparePushRequest(ctx) + } + case rpcCallIngesterRead: + if ing := g.getIngester(); ing != nil { + return ing.PrepareReadRequest(ctx) + } + } + } return nil, nil } func (g *grpcInflightMethodLimiter) RPCCallFinished(ctx context.Context) { - if pt, ok := ctx.Value(pushTypeCtxKey).(int); ok { - switch pt { - case pushTypeIngester: + if rpcCall, ok := ctx.Value(rpcCallCtxKey).(int); ok { + switch rpcCall { + case rpcCallIngesterPush: g.getIngester().FinishPushRequest(ctx) - - case pushTypeDistributor: + case rpcCallIngesterRead: + g.getIngester().FinishReadRequest(ctx) + case rpcCallDistributorPush: g.getDistributor().FinishPushRequest(ctx) } } diff --git a/pkg/mimir/grpc_push_check_test.go b/pkg/mimir/grpc_call_check_test.go similarity index 59% rename from pkg/mimir/grpc_push_check_test.go rename to pkg/mimir/grpc_call_check_test.go index e4a5c582743..ba64ca0143b 100644 --- a/pkg/mimir/grpc_push_check_test.go +++ b/pkg/mimir/grpc_call_check_test.go @@ -17,7 +17,7 @@ import ( func TestGrpcInflightMethodLimiter(t *testing.T) { t.Run("nil ingester and distributor receiver", func(t *testing.T) { - l := newGrpcInflightMethodLimiter(func() pushReceiver { return nil }, func() pushReceiver { return nil }) + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return nil }, func() pushReceiver { return nil }) ctx, err := l.RPCCallStarting(context.Background(), "test", nil) require.NoError(t, err) @@ -33,102 +33,142 @@ func TestGrpcInflightMethodLimiter(t *testing.T) { require.Panics(t, func() { // In practice, this will not be called, since l.RPCCallStarting() for ingester push returns error if there's no ingester. - l.RPCCallFinished(context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester)) + l.RPCCallFinished(context.WithValue(ctx, rpcCallCtxKey, rpcCallIngesterPush)) }) require.Panics(t, func() { // In practice, this will not be called, since l.RPCCallStarting() distributor push returns error if there's no distributor. 
- l.RPCCallFinished(context.WithValue(ctx, pushTypeCtxKey, pushTypeDistributor)) + l.RPCCallFinished(context.WithValue(ctx, rpcCallCtxKey, rpcCallDistributorPush)) }) }) t.Run("ingester push receiver, wrong method name", func(t *testing.T) { m := &mockIngesterReceiver{} - l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil) + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) ctx, err := l.RPCCallStarting(context.Background(), "test", nil) require.NoError(t, err) require.NotPanics(t, func() { l.RPCCallFinished(ctx) }) - require.Equal(t, 0, m.startCalls) - require.Equal(t, 0, m.finishCalls) + require.Equal(t, 0, m.startPushCalls) + require.Equal(t, 0, m.preparePushCalls) + require.Equal(t, 0, m.finishPushCalls) }) t.Run("ingester push receiver, check returns error", func(t *testing.T) { m := &mockIngesterReceiver{} - l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil) + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) m.returnError = errors.New("hello there") ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, nil) require.Error(t, err) _, ok := grpcutil.ErrorToStatus(err) require.True(t, ok) - require.Nil(t, ctx.Value(pushTypeCtxKey)) // Original context expected in case of errors. + require.Nil(t, ctx.Value(rpcCallCtxKey)) // Original context expected in case of errors. }) t.Run("ingester push receiver, without size", func(t *testing.T) { m := &mockIngesterReceiver{} - l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil) + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, nil) require.NoError(t, err) - require.Equal(t, 1, m.startCalls) - require.Equal(t, int64(0), m.startBytes) - require.Equal(t, 0, m.finishCalls) - require.Equal(t, int64(0), m.finishBytes) - + require.Equal(t, 1, m.startPushCalls) + require.Equal(t, int64(0), m.startPushBytes) + require.Equal(t, 0, m.preparePushCalls) + require.Equal(t, 0, m.finishPushCalls) + require.Equal(t, int64(0), m.finishPushBytes) + _, err = l.RPCCallProcessing(ctx, ingesterPushMethod) + require.NoError(t, err) + require.Equal(t, 1, m.preparePushCalls) require.NotPanics(t, func() { l.RPCCallFinished(ctx) }) - require.Equal(t, 1, m.startCalls) - require.Equal(t, int64(0), m.startBytes) - require.Equal(t, 1, m.finishCalls) - require.Equal(t, int64(0), m.finishBytes) + require.Equal(t, 1, m.startPushCalls) + require.Equal(t, int64(0), m.startPushBytes) + require.Equal(t, 1, m.finishPushCalls) + require.Equal(t, int64(0), m.finishPushBytes) }) t.Run("ingester push receiver, with size provided", func(t *testing.T) { m := &mockIngesterReceiver{} - l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil) + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, metadata.New(map[string]string{ grpcutil.MetadataMessageSize: "123456", })) require.NoError(t, err) - require.Equal(t, 1, m.startCalls) - require.Equal(t, int64(123456), m.startBytes) - require.Equal(t, 0, m.finishCalls) - require.Equal(t, int64(0), m.finishBytes) - + require.Equal(t, 1, m.startPushCalls) + require.Equal(t, int64(123456), m.startPushBytes) + require.Equal(t, 0, m.preparePushCalls) + require.Equal(t, 0, m.finishPushCalls) + require.Equal(t, int64(0), m.finishPushBytes) + _, err = l.RPCCallProcessing(ctx, ingesterPushMethod) + require.NoError(t, 
err) + require.Equal(t, 1, m.preparePushCalls) + require.NotPanics(t, func() { + l.RPCCallFinished(ctx) + }) + require.Equal(t, 1, m.startPushCalls) + require.Equal(t, int64(0), m.startPushBytes) + require.Equal(t, 1, m.finishPushCalls) + require.Equal(t, int64(0), m.finishPushBytes) + }) + + t.Run("ingester read receiver, check returns error", func(t *testing.T) { + m := &mockIngesterReceiver{} + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) + m.returnError = errors.New("hello there") + ctx, err := l.RPCCallStarting(context.Background(), ingesterMethod, nil) + require.Error(t, err) + _, ok := grpcutil.ErrorToStatus(err) + require.True(t, ok) + require.Nil(t, ctx.Value(rpcCallCtxKey)) + }) + + t.Run("ingester read receiver", func(t *testing.T) { + m := &mockIngesterReceiver{} + l := newGrpcInflightMethodLimiter(func() ingesterReceiver { return m }, nil) + + ctx, err := l.RPCCallStarting(context.Background(), ingesterMethod, nil) + require.NoError(t, err) + require.Equal(t, 1, m.startReadCalls) + require.Equal(t, 0, m.prepareReadCalls) + require.Equal(t, 0, m.finishReadCalls) + _, err = l.RPCCallProcessing(ctx, ingesterMethod) + require.NoError(t, err) + require.Equal(t, 1, m.prepareReadCalls) require.NotPanics(t, func() { l.RPCCallFinished(ctx) }) - require.Equal(t, 1, m.startCalls) - require.Equal(t, int64(0), m.startBytes) - require.Equal(t, 1, m.finishCalls) - require.Equal(t, int64(0), m.finishBytes) + require.Equal(t, 1, m.finishReadCalls) }) t.Run("distributor push via httpgrpc", func(t *testing.T) { @@ -171,7 +211,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) { require.Error(t, err) _, ok := grpcutil.ErrorToStatus(err) require.True(t, ok) - require.Nil(t, ctx.Value(pushTypeCtxKey)) // Original context expected in case of errors. + require.Nil(t, ctx.Value(rpcCallCtxKey)) // Original context expected in case of errors.
}) t.Run("distributor push via httpgrpc, GET", func(t *testing.T) { @@ -224,27 +264,52 @@ func TestGrpcInflightMethodLimiter(t *testing.T) { type mockIngesterReceiver struct { lastRequestSize int64 - startCalls int - startBytes int64 - finishCalls int - finishBytes int64 - returnError error + startPushCalls int + startPushBytes int64 + preparePushCalls int + finishPushCalls int + finishPushBytes int64 + returnError error + + startReadCalls int + prepareReadCalls int + finishReadCalls int } func (i *mockIngesterReceiver) StartPushRequest(ctx context.Context, size int64) (context.Context, error) { i.lastRequestSize = size - i.startCalls++ - i.startBytes += size + i.startPushCalls++ + i.startPushBytes += size return ctx, i.returnError } +func (i *mockIngesterReceiver) PreparePushRequest(_ context.Context) (func(error), error) { + i.preparePushCalls++ + return nil, i.returnError +} + func (i *mockIngesterReceiver) FinishPushRequest(_ context.Context) { - i.finishCalls++ - i.finishBytes += i.lastRequestSize + i.finishPushCalls++ + i.finishPushBytes += i.lastRequestSize +} + +func (i *mockIngesterReceiver) StartReadRequest(ctx context.Context) (context.Context, error) { + i.startReadCalls++ + return ctx, i.returnError +} + +func (i *mockIngesterReceiver) PrepareReadRequest(_ context.Context) (func(error), error) { + i.prepareReadCalls++ + return nil, i.returnError +} + +func (i *mockIngesterReceiver) FinishReadRequest(_ context.Context) { + i.finishReadCalls++ } type mockDistributorReceiver struct { startCalls int + prepareCalls int finishCalls int lastRequestSize int64 returnError error @@ -256,6 +321,11 @@ func (i *mockDistributorReceiver) StartPushRequest(ctx context.Context, requestS return ctx, i.returnError } +func (i *mockDistributorReceiver) PreparePushRequest(_ context.Context) (func(error), error) { + i.prepareCalls++ + return nil, i.returnError +} + func (i *mockDistributorReceiver) FinishPushRequest(_ context.Context) { i.finishCalls++ } diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 56eb9d98d90..c5a507b545d 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -299,7 +299,7 @@ func (t *Mimir) initServer() (services.Service, error) { // t.Ingester or t.Distributor will be available. There's no race condition here, because gRPC server (service returned by this method, ie. initServer) // is started only after t.Ingester and t.Distributor are set in initIngester or initDistributorService. - ingFn := func() pushReceiver { + ingFn := func() ingesterReceiver { // Return explicit nil if there's no ingester. We don't want to return typed-nil as interface value. if t.Ingester == nil { return nil