diff --git a/CHANGELOG.md b/CHANGELOG.md index 3949a50ff8..b99dc3cc02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ * [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 +* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7936a6d3f6..0ee8b1e98a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -480,9 +480,19 @@ func (i *Ingester) checkRunningOrStopping() error { return status.Error(codes.Unavailable, s.String()) } +// Using block store, the ingester is only available when it is in a Running state. The ingester is not available +// when stopping to prevent any read or writes to the TSDB after the ingester has closed them. +func (i *Ingester) checkRunning() error { + s := i.State() + if s == services.Running { + return nil + } + return status.Error(codes.Unavailable, s.String()) +} + // Push implements client.IngesterServer func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { + if err := i.checkRunning(); err != nil { return nil, err } @@ -762,14 +772,14 @@ func (i *Ingester) purgeUserMetricsMetadata() { // Query implements service.IngesterServer func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2Query(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -829,14 +839,14 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { - if err := i.checkRunningOrStopping(); err != nil { - return err - } - if i.cfg.BlocksStorageEnabled { return i.v2QueryStream(req, stream) } + if err := i.checkRunningOrStopping(); err != nil { + return err + } + spanLog, ctx := spanlogger.New(stream.Context(), "QueryStream") defer spanLog.Finish() @@ -913,10 +923,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // Query implements service.IngesterServer func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if !i.cfg.BlocksStorageEnabled { return nil, errors.New("not supported") } @@ -926,14 +932,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2LabelValues(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -951,14 +957,14 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2LabelNames(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -976,14 +982,14 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2MetricsForLabelMatchers(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -1046,14 +1052,14 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad // UserStats returns ingestion statistics for the current user. func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2UserStats(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -1075,14 +1081,14 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) // AllUserStats returns ingestion statistics for all users known to this ingester. func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { return i.v2AllUserStats(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() users := i.userStates.cp() diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d0f2e43c6b..4b7ecaacce 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -980,6 +980,10 @@ func (u *userTSDB) releaseAppendLock() { } func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1036,6 +1040,10 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie } func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1083,6 +1091,10 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue } func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + labelName, startTimestampMs, endTimestampMs, matchers, err := client.FromLabelValuesRequest(req) if err != nil { return nil, err @@ -1120,6 +1132,10 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq } func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1152,6 +1168,10 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque } func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1219,6 +1239,10 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me } func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1233,6 +1257,10 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest } func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() @@ -1265,6 +1293,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024 // v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { + if err := i.checkRunning(); err != nil { + return err + } + spanlog, ctx := spanlogger.New(stream.Context(), "v2QueryStream") defer spanlog.Finish() @@ -1788,6 +1820,10 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { // getMemorySeriesMetric returns the total number of in-memory series across all open TSDBs. func (i *Ingester) getMemorySeriesMetric() float64 { + if err := i.checkRunning(); err != nil { + return 0 + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock()