Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent read on TSDB once closeAllTSDB function has been called #4304

Merged
merged 15 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This prevents access to TSDB blocks after they have already been closed. #4304

## 1.10.0-rc.0 / 2021-06-28

Expand Down
72 changes: 39 additions & 33 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,19 @@ func (i *Ingester) checkRunningOrStopping() error {
return status.Error(codes.Unavailable, s.String())
}

// checkRunning returns nil only when the ingester service is in the Running state.
// For any other state it returns a gRPC Unavailable error whose message is the
// state's string form. When using the blocks storage the Stopping state is rejected
// too, to prevent any reads or writes to the TSDBs after the ingester has closed them.
func (i *Ingester) checkRunning() error {
	if state := i.State(); state != services.Running {
		return status.Error(codes.Unavailable, state.String())
	}
	return nil
}

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
if err := i.checkRunning(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -762,14 +772,14 @@ func (i *Ingester) purgeUserMetricsMetadata() {

// Query implements service.IngesterServer
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2Query(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -829,14 +839,14 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client

// QueryStream implements service.IngesterServer
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if err := i.checkRunningOrStopping(); err != nil {
return err
}

if i.cfg.BlocksStorageEnabled {
return i.v2QueryStream(req, stream)
}

if err := i.checkRunningOrStopping(); err != nil {
return err
}

spanLog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanLog.Finish()

Expand Down Expand Up @@ -913,10 +923,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

// QueryExemplars implements service.IngesterServer
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if !i.cfg.BlocksStorageEnabled {
return nil, errors.New("not supported")
}
Expand All @@ -926,14 +932,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2LabelValues(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -951,14 +957,14 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

// LabelNames returns all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2LabelNames(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -976,14 +982,14 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2MetricsForLabelMatchers(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand Down Expand Up @@ -1046,14 +1052,14 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad

// UserStats returns ingestion statistics for the current user.
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2UserStats(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -1075,14 +1081,14 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)

// AllUserStats returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
return i.v2AllUserStats(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
users := i.userStates.cp()
Expand Down
36 changes: 36 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,10 @@ func (u *userTSDB) releaseAppendLock() {
}

func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1036,6 +1040,10 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
}

func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1083,6 +1091,10 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue
}

func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

labelName, startTimestampMs, endTimestampMs, matchers, err := client.FromLabelValuesRequest(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1120,6 +1132,10 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq
}

func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1152,6 +1168,10 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque
}

func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1219,6 +1239,10 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me
}

func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand All @@ -1233,6 +1257,10 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest
}

func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if err := i.checkRunning(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

Expand Down Expand Up @@ -1265,6 +1293,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024

// v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if err := i.checkRunning(); err != nil {
return err
}

spanlog, ctx := spanlogger.New(stream.Context(), "v2QueryStream")
defer spanlog.Finish()

Expand Down Expand Up @@ -1788,6 +1820,10 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {

// getMemorySeriesMetric returns the total number of in-memory series across all open TSDBs.
func (i *Ingester) getMemorySeriesMetric() float64 {
if err := i.checkRunning(); err != nil {
return 0
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

Expand Down