Skip to content

Commit

Permalink
Return error on the getTSDB function when user is not found
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Jan 29, 2025
1 parent 4b32f29 commit 6daf8c2
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 73 deletions.
75 changes: 40 additions & 35 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
var (
errExemplarRef = errors.New("exemplars not ingested because series not already present")
errIngesterStopping = errors.New("ingester stopping")
errNoUserDb = errors.New("no user db")

tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
)
Expand Down Expand Up @@ -988,8 +989,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {

func (i *Ingester) updateUserTSDBConfigs() {
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil {
continue
}

Expand All @@ -1005,7 +1006,7 @@ func (i *Ingester) updateUserTSDBConfigs() {
}

// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
err := userDB.db.ApplyConfig(cfg)
err = userDB.db.ApplyConfig(cfg)
if err != nil {
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
}
Expand All @@ -1029,8 +1030,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout)

for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil {
continue
}

Expand All @@ -1045,8 +1046,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
func (i *Ingester) updateLabelSetMetrics() {
activeUserSet := make(map[string]map[uint64]struct{})
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil {
continue
}

Expand Down Expand Up @@ -1549,8 +1550,8 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

i.metrics.queries.Inc()

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return &client.ExemplarQueryResponse{}, nil
}

Expand Down Expand Up @@ -1642,8 +1643,8 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
return nil, cleanup, err
}

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return &client.LabelValuesResponse{}, cleanup, nil
}

Expand Down Expand Up @@ -1732,8 +1733,8 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return &client.LabelNamesResponse{}, cleanup, nil
}

Expand Down Expand Up @@ -1830,8 +1831,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
return cleanup, err
}

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return cleanup, nil
}

Expand Down Expand Up @@ -1944,8 +1945,8 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
return nil, err
}

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return &client.UserStatsResponse{}, nil
}

Expand Down Expand Up @@ -2064,8 +2065,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

i.metrics.queries.Inc()

db := i.getTSDB(userID)
if db == nil {
db, err := i.getTSDB(userID)
if err != nil || db == nil {
return nil
}

Expand Down Expand Up @@ -2216,11 +2217,14 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
return numSeries, numSamples, totalBatchSizeBytes, numChunks, nil
}

func (i *Ingester) getTSDB(userID string) *userTSDB {
func (i *Ingester) getTSDB(userID string) (*userTSDB, error) {
i.stoppedMtx.RLock()
defer i.stoppedMtx.RUnlock()
db := i.TSDBState.dbs[userID]
return db
if db == nil {
return nil, errNoUserDb
}
return db, nil
}

// List all users for which we have a TSDB. We do it here in order
Expand All @@ -2238,8 +2242,11 @@ func (i *Ingester) getTSDBUsers() []string {
}

func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) {
db := i.getTSDB(userID)
db, err := i.getTSDB(userID)
if db != nil {
if err != nil {
level.Warn(i.logger).Log("msg", "error getting user DB but userDB is not null", "err", err, "userID", userID)
}
return db, nil
}

Expand Down Expand Up @@ -2271,7 +2278,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
}

// Create the database and a shipper for a user
db, err := i.createTSDB(userID)
db, err = i.createTSDB(userID)
if err != nil {
return nil, err
}
Expand All @@ -2285,10 +2292,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)

func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc {
return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
db := i.getTSDB(userId)
db, err := i.getTSDB(userId)

var postingCache cortex_tsdb.ExpandedPostingsCache
if db != nil {
if err != nil || db == nil {
postingCache = db.postingCache
}

Expand Down Expand Up @@ -2650,8 +2657,8 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
}

// Get the user's DB. If the user doesn't exist, we skip it.
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil || userDB.shipper == nil {
return nil
}

Expand Down Expand Up @@ -2762,8 +2769,8 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.
return nil
}

userDB := i.getTSDB(userID)
if userDB == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil {
return nil
}

Expand All @@ -2773,8 +2780,6 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.
return nil
}

var err error

i.TSDBState.compactionsTriggered.Inc()

reason := ""
Expand Down Expand Up @@ -2823,8 +2828,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error {
if ctx.Err() != nil {
return nil
}
userDB := i.getTSDB(userID)
if userDB == nil || userDB.postingCache == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil || userDB.postingCache == nil {
continue
}
userDB.postingCache.PurgeExpiredItems()
Expand All @@ -2834,8 +2839,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error {
}

func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckResult {
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
userDB, err := i.getTSDB(userID)
if err != nil || userDB == nil || userDB.shipper == nil {
// We will not delete local data when not using shipping to storage.
return tsdbShippingDisabled
}
Expand Down
Loading

0 comments on commit 6daf8c2

Please sign in to comment.