Skip to content

Commit

Permalink
ingester should read/write only when state is running for block store
Browse files Browse the repository at this point in the history
Signed-off-by: ilangofman <[email protected]>
  • Loading branch information
ilangofman committed Jul 2, 2021
1 parent feb4f42 commit 3c4798b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 76 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [BUGFIX] Ingester: Prevent any reads on TSDBs when the ingester is stopping. #4304
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304

## 1.10.0-rc.0 / 2021-06-28

Expand Down
97 changes: 67 additions & 30 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,26 @@ func (i *Ingester) checkRunningOrStopping() error {
return status.Error(codes.Unavailable, s.String())
}

// Using block store, the ingester is only available when it is in a Running state. The ingester is not available
// when stopping to prevent any read or writes to the TSDB after the ingester has closed them.
func (i *Ingester) checkRunning() error {
s := i.State()
if s == services.Running {
return nil
}
return status.Error(codes.Unavailable, s.String())
}

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
} else {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}
}

// We will report *this* request in the error too.
Expand Down Expand Up @@ -762,14 +778,17 @@ func (i *Ingester) purgeUserMetricsMetadata() {

// Query implements service.IngesterServer
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2Query(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -829,14 +848,17 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client

// QueryStream implements service.IngesterServer
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if err := i.checkRunningOrStopping(); err != nil {
return err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return err
}
return i.v2QueryStream(req, stream)
}

if err := i.checkRunningOrStopping(); err != nil {
return err
}

spanLog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanLog.Finish()

Expand Down Expand Up @@ -926,14 +948,17 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2LabelValues(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -951,14 +976,17 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2LabelNames(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -976,14 +1004,17 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2MetricsForLabelMatchers(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand Down Expand Up @@ -1046,14 +1077,17 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad

// UserStats returns ingestion statistics for the current user.
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2UserStats(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
state, ok, err := i.userStates.getViaContext(ctx)
Expand All @@ -1075,14 +1109,17 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)

// AllUserStats returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.BlocksStorageEnabled {
if err := i.checkRunning(); err != nil {
return nil, err
}
return i.v2AllUserStats(ctx, req)
}

if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
users := i.userStates.cp()
Expand Down
49 changes: 4 additions & 45 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,10 +992,6 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie

i.metrics.queries.Inc()

if i.checkIfAllTSDBClosing() {
return &client.QueryResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.QueryResponse{}, nil
Expand Down Expand Up @@ -1052,10 +1048,6 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue

i.metrics.queries.Inc()

if i.checkIfAllTSDBClosing() {
return &client.ExemplarQueryResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.ExemplarQueryResponse{}, nil
Expand Down Expand Up @@ -1101,10 +1093,6 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq
return nil, err
}

if i.checkIfAllTSDBClosing() {
return &client.LabelValuesResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.LabelValuesResponse{}, nil
Expand Down Expand Up @@ -1137,10 +1125,6 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque
return nil, err
}

if i.checkIfAllTSDBClosing() {
return &client.LabelNamesResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.LabelNamesResponse{}, nil
Expand Down Expand Up @@ -1173,10 +1157,6 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me
return nil, err
}

if i.checkIfAllTSDBClosing() {
return &client.MetricsForLabelMatchersResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.MetricsForLabelMatchersResponse{}, nil
Expand Down Expand Up @@ -1244,10 +1224,6 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest
return nil, err
}

if i.checkIfAllTSDBClosing() {
return &client.UserStatsResponse{}, nil
}

db := i.getTSDB(userID)
if db == nil {
return &client.UserStatsResponse{}, nil
Expand All @@ -1260,10 +1236,6 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ
i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

if i.checkIfAllTSDBClosing() {
return &client.UsersStatsResponse{}, nil
}

users := i.TSDBState.dbs

response := &client.UsersStatsResponse{
Expand Down Expand Up @@ -1308,10 +1280,6 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste

i.metrics.queries.Inc()

if i.checkIfAllTSDBClosing() {
return nil
}

db := i.getTSDB(userID)
if db == nil {
return nil
Expand Down Expand Up @@ -1815,15 +1783,14 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {

// getMemorySeriesMetric returns the total number of in-memory series across all open TSDBs.
func (i *Ingester) getMemorySeriesMetric() float64 {
if err := i.checkRunning(); err != nil {
return 0
}

i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

count := uint64(0)

if i.checkIfAllTSDBClosing() {
return 0
}

for _, db := range i.TSDBState.dbs {
count += db.Head().NumSeries()
}
Expand Down Expand Up @@ -2282,11 +2249,3 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits {

return l
}

func (i *Ingester) checkIfAllTSDBClosing() bool {
if i.State() == services.Stopping {
level.Debug(i.logger).Log("msg", "TSDB is unavailable, as the Ingester is in the process of stopping and closing all TSDB")
return true
}
return false
}

0 comments on commit 3c4798b

Please sign in to comment.