From 89151bd8ab9d1531a5ad3a5ad8da62d045006638 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Tue, 22 Jun 2021 09:40:01 -0400 Subject: [PATCH 01/13] Prevent read on TSDB once closeAllTSDB function has been called Signed-off-by: ilangofman --- CHANGELOG.md | 1 + pkg/ingester/ingester_v2.go | 49 +++++++++++++++++++++++++++++--- pkg/ingester/ingester_v2_test.go | 46 ++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92627b43d5..ec84408c13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 +* [BUGFIX] Ingester: Prevent any reads on TSDBs when the ingester is stopping. #4304 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d43e09ac9c..34a325a1ce 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -59,7 +59,8 @@ const ( ) var ( - errExemplarRef = errors.New("exemplars not ingested because series not already present") + errExemplarRef = errors.New("exemplars not ingested because series not already present") + errAllTSDBClosing = errors.New("TSDB's are closing and cannot be accessed") ) // Shipper interface is used to have an easy way to mock it in tests. @@ -415,6 +416,9 @@ type TSDBState struct { appenderAddDuration prometheus.Histogram appenderCommitDuration prometheus.Histogram idleTsdbChecks *prometheus.CounterVec + + closed bool // protected by userStatesMtx + } type requestWithUsersAndCallback struct { @@ -677,6 +681,12 @@ func (i *Ingester) updateLoop(ctx context.Context) error { i.ingestionRate.Tick() case <-rateUpdateTicker.C: i.userStatesMtx.RLock() + + if i.TSDBState.closed { + i.userStatesMtx.RUnlock() + return errAllTSDBClosing + } + for _, db := range i.TSDBState.dbs { db.ingestedAPISamples.Tick() db.ingestedRuleSamples.Tick() @@ -1236,6 +1246,10 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() + if i.TSDBState.closed { + return nil, errAllTSDBClosing + } + users := i.TSDBState.dbs response := &client.UsersStatsResponse{ @@ -1483,6 +1497,10 @@ func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, func (i *Ingester) getTSDB(userID string) *userTSDB { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() + if i.TSDBState.closed { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + return nil + } db := i.TSDBState.dbs[userID] return db } @@ -1493,6 +1511,11 @@ func (i *Ingester) getTSDBUsers() []string { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() + if i.TSDBState.closed { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + return []string{} + } + ids := make([]string, 0, len(i.TSDBState.dbs)) for userID := range i.TSDBState.dbs { ids = append(ids, userID) @@ -1510,10 +1533,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) i.userStatesMtx.Lock() defer i.userStatesMtx.Unlock() - // Check again for DB in the event it was created in-between locks + // Check again for DB in the event it was created or closed in-between locks var ok bool db, ok = i.TSDBState.dbs[userID] - if ok { + if ok && !i.TSDBState.closed { return db, nil } @@ -1534,6 +1557,11 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) } } + // If all the TSDB's are in the process of closing, then should not proceed to creating a new TSDB + if i.TSDBState.closed && !force { + return nil, errAllTSDBClosing + } + // Create the database and a shipper for a user db, err := i.createTSDB(userID) if err != nil { @@ -1549,6 +1577,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) // createTSDB creates a TSDB for a given userID, and returns the created db. func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { + tsdbPromReg := prometheus.NewRegistry() udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID) userLogger := logutil.WithUserID(userID, i.logger) @@ -1646,8 +1675,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } func (i *Ingester) closeAllTSDB() { + i.userStatesMtx.Lock() + // Set to true, to prevent any read occurring on TSDBs once this function has been called + i.TSDBState.closed = true + wg := &sync.WaitGroup{} wg.Add(len(i.TSDBState.dbs)) @@ -1673,11 +1706,13 @@ func (i *Ingester) closeAllTSDB() { i.metrics.memUsers.Dec() i.metrics.activeSeriesPerUser.DeleteLabelValues(userID) + }(userDB) } // Wait until all Close() completed i.userStatesMtx.Unlock() + wg.Wait() } @@ -1785,8 +1820,14 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { func (i *Ingester) getMemorySeriesMetric() float64 { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - count := uint64(0) + + // If the TSDB is in the processes of closing, then return 0 + if i.TSDBState.closed { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + return float64(count) + } + for _, db := range i.TSDBState.dbs { count += db.Head().NumSeries() } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 6122569f11..28c2c5cc24 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -3880,3 +3880,49 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API) } + +func TestIngester_PreventTSDBsReadOnShutdown(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + + // Create ingester + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push some data. + pushSingleSampleWithMetadata(t, i) + + db := i.getTSDB(userID) + require.NotNil(t, db) + + // Verify that the TSDBState is not closed before calling the close function + assert.Equal(t, i.TSDBState.closed, false) + + // Close all the TSDB + i.closeAllTSDB() + + // Verify that the TSDBState is closed + assert.Equal(t, i.TSDBState.closed, true) + + // Verify that DB is no longer in memory, but was closed + db = i.getTSDB(userID) + require.Nil(t, db) + + // Trying to create a TSDB after closing should return an error + expectedErr := errAllTSDBClosing.Error() + + db, err = i.getOrCreateTSDB(userID, false) + verifyErrorString(t, err, expectedErr) + require.Nil(t, db) + +} From 7445aa5b675343018dba80b3d4d6cbda315b33cd Mon Sep 17 00:00:00 2001 From: ilangofman Date: Tue, 22 Jun 2021 13:30:57 -0400 Subject: [PATCH 02/13] Fix formatting issues Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 34a325a1ce..e9ed5508c8 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -1577,7 +1577,6 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) // createTSDB creates a TSDB for a given userID, and returns the created db. func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { - tsdbPromReg := prometheus.NewRegistry() udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID) userLogger := logutil.WithUserID(userID, i.logger) @@ -1675,7 +1674,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } func (i *Ingester) closeAllTSDB() { - i.userStatesMtx.Lock() // Set to true, to prevent any read occurring on TSDBs once this function has been called @@ -1706,13 +1704,11 @@ func (i *Ingester) closeAllTSDB() { i.metrics.memUsers.Dec() i.metrics.activeSeriesPerUser.DeleteLabelValues(userID) - }(userDB) } // Wait until all Close() completed i.userStatesMtx.Unlock() - wg.Wait() } @@ -1820,6 +1816,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { func (i *Ingester) getMemorySeriesMetric() float64 { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() + count := uint64(0) // If the TSDB is in the processes of closing, then return 0 From efcdc9cd16cf6efdba3122448933da9fd877b000 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Wed, 23 Jun 2021 12:06:37 -0400 Subject: [PATCH 03/13] Address PR comments and remove unit test no longer required Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 66 +++++++++++++++++++------------- pkg/ingester/ingester_v2_test.go | 46 ---------------------- 2 files changed, 40 insertions(+), 72 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index e9ed5508c8..2a6a017f31 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -682,11 +682,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error { case <-rateUpdateTicker.C: i.userStatesMtx.RLock() - if i.TSDBState.closed { - i.userStatesMtx.RUnlock() - return errAllTSDBClosing - } - for _, db := range i.TSDBState.dbs { db.ingestedAPISamples.Tick() db.ingestedRuleSamples.Tick() @@ -1002,6 +997,11 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.QueryResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.QueryResponse{}, nil @@ -1058,6 +1058,11 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue i.metrics.queries.Inc() + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.ExemplarQueryResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.ExemplarQueryResponse{}, nil @@ -1103,6 +1108,11 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq return nil, err } + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.LabelValuesResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.LabelValuesResponse{}, nil @@ -1135,6 +1145,11 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque return nil, err } + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.LabelNamesResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.LabelNamesResponse{}, nil @@ -1167,6 +1182,11 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me return nil, err } + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.MetricsForLabelMatchersResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.MetricsForLabelMatchersResponse{}, nil @@ -1234,6 +1254,11 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest return nil, err } + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return &client.UserStatsResponse{}, nil + } + db := i.getTSDB(userID) if db == nil { return &client.UserStatsResponse{}, nil @@ -1246,8 +1271,9 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.TSDBState.closed { - return nil, errAllTSDBClosing + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + return &client.UsersStatsResponse{}, nil } users := i.TSDBState.dbs @@ -1294,6 +1320,11 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste i.metrics.queries.Inc() + if i.State() == services.Stopping { + level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + return nil + } + db := i.getTSDB(userID) if db == nil { return nil @@ -1497,10 +1528,6 @@ func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, func (i *Ingester) getTSDB(userID string) *userTSDB { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.TSDBState.closed { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") - return nil - } db := i.TSDBState.dbs[userID] return db } @@ -1511,11 +1538,6 @@ func (i *Ingester) getTSDBUsers() []string { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.TSDBState.closed { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") - return []string{} - } - ids := make([]string, 0, len(i.TSDBState.dbs)) for userID := range i.TSDBState.dbs { ids = append(ids, userID) @@ -1536,7 +1558,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) // Check again for DB in the event it was created or closed in-between locks var ok bool db, ok = i.TSDBState.dbs[userID] - if ok && !i.TSDBState.closed { + if ok { return db, nil } @@ -1557,11 +1579,6 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) } } - // If all the TSDB's are in the process of closing, then should not proceed to creating a new TSDB - if i.TSDBState.closed && !force { - return nil, errAllTSDBClosing - } - // Create the database and a shipper for a user db, err := i.createTSDB(userID) if err != nil { @@ -1676,9 +1693,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { func (i *Ingester) closeAllTSDB() { i.userStatesMtx.Lock() - // Set to true, to prevent any read occurring on TSDBs once this function has been called - i.TSDBState.closed = true - wg := &sync.WaitGroup{} wg.Add(len(i.TSDBState.dbs)) @@ -1820,7 +1834,7 @@ func (i *Ingester) getMemorySeriesMetric() float64 { count := uint64(0) // If the TSDB is in the processes of closing, then return 0 - if i.TSDBState.closed { + if i.State() == services.Stopping { level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") return float64(count) } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 28c2c5cc24..6122569f11 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -3880,49 +3880,3 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API) } - -func TestIngester_PreventTSDBsReadOnShutdown(t *testing.T) { - cfg := defaultIngesterTestConfig() - cfg.LifecyclerConfig.JoinAfter = 0 - - // Create ingester - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) - require.NoError(t, err) - - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - t.Cleanup(func() { - _ = services.StopAndAwaitTerminated(context.Background(), i) - }) - - // Wait until it's ACTIVE - test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { - return i.lifecycler.GetState() - }) - - // Push some data. - pushSingleSampleWithMetadata(t, i) - - db := i.getTSDB(userID) - require.NotNil(t, db) - - // Verify that the TSDBState is not closed before calling the close function - assert.Equal(t, i.TSDBState.closed, false) - - // Close all the TSDB - i.closeAllTSDB() - - // Verify that the TSDBState is closed - assert.Equal(t, i.TSDBState.closed, true) - - // Verify that DB is no longer in memory, but was closed - db = i.getTSDB(userID) - require.Nil(t, db) - - // Trying to create a TSDB after closing should return an error - expectedErr := errAllTSDBClosing.Error() - - db, err = i.getOrCreateTSDB(userID, false) - verifyErrorString(t, err, expectedErr) - require.Nil(t, db) - -} From 3df93f07a2316ebe46ea91b324ecb8e90b087b62 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Wed, 23 Jun 2021 12:14:04 -0400 Subject: [PATCH 04/13] Remove closed bool no longer used Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 2a6a017f31..8505c2cce6 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -416,9 +416,6 @@ type TSDBState struct { appenderAddDuration prometheus.Histogram appenderCommitDuration prometheus.Histogram idleTsdbChecks *prometheus.CounterVec - - closed bool // protected by userStatesMtx - } type requestWithUsersAndCallback struct { @@ -681,7 +678,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error { i.ingestionRate.Tick() case <-rateUpdateTicker.C: i.userStatesMtx.RLock() - for _, db := range i.TSDBState.dbs { db.ingestedAPISamples.Tick() db.ingestedRuleSamples.Tick() @@ -1555,7 +1551,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) i.userStatesMtx.Lock() defer i.userStatesMtx.Unlock() - // Check again for DB in the event it was created or closed in-between locks + // Check again for DB in the event it was created in-between locks var ok bool db, ok = i.TSDBState.dbs[userID] if ok { From 224815b2c8bc4df7225c460e1c241354a7b3a564 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Wed, 23 Jun 2021 12:49:28 -0400 Subject: [PATCH 05/13] Remove error no longer used Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 8505c2cce6..cea794c3d2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -59,8 +59,7 @@ const ( ) var ( - errExemplarRef = errors.New("exemplars not ingested because series not already present") - errAllTSDBClosing = errors.New("TSDB's are closing and cannot be accessed") + errExemplarRef = errors.New("exemplars not ingested because series not already present") ) // Shipper interface is used to have an easy way to mock it in tests. From 2ba3ba2fa9ec49459b2786a7e26d42e5a0a104fe Mon Sep 17 00:00:00 2001 From: ilangofman Date: Thu, 24 Jun 2021 15:39:22 -0400 Subject: [PATCH 06/13] Remove comment and change return var Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index cea794c3d2..d405436f30 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -1828,10 +1828,9 @@ func (i *Ingester) getMemorySeriesMetric() float64 { count := uint64(0) - // If the TSDB is in the processes of closing, then return 0 if i.State() == services.Stopping { level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") - return float64(count) + return 0 } for _, db := range i.TSDBState.dbs { From 0267a21c9a6036e30593cb619f76ec94c35a7cb0 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Tue, 29 Jun 2021 23:22:09 -0400 Subject: [PATCH 07/13] Update log message to debug Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d405436f30..38fa716b58 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -993,7 +993,7 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.QueryResponse{}, nil } @@ -1054,7 +1054,7 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue i.metrics.queries.Inc() if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.ExemplarQueryResponse{}, nil } @@ -1104,7 +1104,7 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq } if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.LabelValuesResponse{}, nil } @@ -1141,7 +1141,7 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque } if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.LabelNamesResponse{}, nil } @@ -1178,7 +1178,7 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me } if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.MetricsForLabelMatchersResponse{}, nil } @@ -1250,7 +1250,7 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest } if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return &client.UserStatsResponse{}, nil } @@ -1267,7 +1267,7 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ defer i.userStatesMtx.RUnlock() if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") return &client.UsersStatsResponse{}, nil } @@ -1316,7 +1316,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste i.metrics.queries.Inc() if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") return nil } @@ -1829,7 +1829,7 @@ func (i *Ingester) getMemorySeriesMetric() float64 { count := uint64(0) if i.State() == services.Stopping { - level.Warn(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") return 0 } From d0a8ea54a7d057ffc4bea66759be6f4650c4733c Mon Sep 17 00:00:00 2001 From: ilangofman Date: Wed, 30 Jun 2021 00:33:34 -0400 Subject: [PATCH 08/13] Moved log message to separate function Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 38fa716b58..ddef2f492e 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -992,8 +992,7 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.QueryResponse{}, nil } @@ -1053,8 +1052,7 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue i.metrics.queries.Inc() - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.ExemplarQueryResponse{}, nil } @@ -1103,8 +1101,7 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq return nil, err } - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.LabelValuesResponse{}, nil } @@ -1140,8 +1137,7 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque return nil, err } - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.LabelNamesResponse{}, nil } @@ -1177,8 +1173,7 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me return nil, err } - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.MetricsForLabelMatchersResponse{}, nil } @@ -1249,8 +1244,7 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest return nil, err } - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return &client.UserStatsResponse{}, nil } @@ -1266,8 +1260,7 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + if i.checkIfTSDBClosed() { return &client.UsersStatsResponse{}, nil } @@ -1315,8 +1308,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste i.metrics.queries.Inc() - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of stopping and closing all TSBD") + if i.checkIfTSDBClosed() { return nil } @@ -1828,8 +1820,7 @@ func (i *Ingester) getMemorySeriesMetric() float64 { count := uint64(0) - if i.State() == services.Stopping { - level.Debug(i.logger).Log("Cannot retrieve TSDB, as the Ingester is in the process of closing all TSBD") + if i.checkIfTSDBClosed() { return 0 } @@ -2291,3 +2282,11 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits { return l } + +func (i *Ingester) checkIfTSDBClosed() bool { + if i.State() == services.Stopping { + level.Debug(i.logger).Log("msg", "TSDB is unavailable, as the Ingester is in the process of stopping and closing all TSDB") + return true + } + return false +} From feb4f42dbcb0a3c3ff3f9b8427e35d580f363097 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Wed, 30 Jun 2021 00:51:50 -0400 Subject: [PATCH 09/13] change function name for checking if tsdb is closing Signed-off-by: ilangofman --- pkg/ingester/ingester_v2.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index ddef2f492e..30c632475d 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -992,7 +992,7 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.QueryResponse{}, nil } @@ -1052,7 +1052,7 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue i.metrics.queries.Inc() - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.ExemplarQueryResponse{}, nil } @@ -1101,7 +1101,7 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq return nil, err } - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.LabelValuesResponse{}, nil } @@ -1137,7 +1137,7 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque return nil, err } - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.LabelNamesResponse{}, nil } @@ -1173,7 +1173,7 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me return nil, err } - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.MetricsForLabelMatchersResponse{}, nil } @@ -1244,7 +1244,7 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest return nil, err } - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.UserStatsResponse{}, nil } @@ -1260,7 +1260,7 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return &client.UsersStatsResponse{}, nil } @@ -1308,7 +1308,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste i.metrics.queries.Inc() - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return nil } @@ -1820,7 +1820,7 @@ func (i *Ingester) getMemorySeriesMetric() float64 { count := uint64(0) - if i.checkIfTSDBClosed() { + if i.checkIfAllTSDBClosing() { return 0 } @@ -2283,7 +2283,7 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits { return l } -func (i *Ingester) checkIfTSDBClosed() bool { +func (i *Ingester) checkIfAllTSDBClosing() bool { if i.State() == services.Stopping { level.Debug(i.logger).Log("msg", "TSDB is unavailable, as the Ingester is in the process of stopping and closing all TSDB") return true From 3c4798b8ed62153a921ddea461e6727ed9d82ec6 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Fri, 2 Jul 2021 16:00:11 -0400 Subject: [PATCH 10/13] ingester should read/write only when state is running for block store Signed-off-by: ilangofman --- CHANGELOG.md | 2 +- pkg/ingester/ingester.go | 97 +++++++++++++++++++++++++------------ pkg/ingester/ingester_v2.go | 49 ++----------------- 3 files changed, 72 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec84408c13..45dc6f400a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## master / unreleased * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 -* [BUGFIX] Ingester: Prevent any reads on TSDBs when the ingester is stopping. #4304 +* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7936a6d3f6..213bef9bac 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -480,10 +480,26 @@ func (i *Ingester) checkRunningOrStopping() error { return status.Error(codes.Unavailable, s.String()) } +// Using block store, the ingester is only available when it is in a Running state. The ingester is not available +// when stopping to prevent any read or writes to the TSDB after the ingester has closed them. +func (i *Ingester) checkRunning() error { + s := i.State() + if s == services.Running { + return nil + } + return status.Error(codes.Unavailable, s.String()) +} + // Push implements client.IngesterServer func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err + if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } + } else { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } } // We will report *this* request in the error too. @@ -762,14 +778,17 @@ func (i *Ingester) purgeUserMetricsMetadata() { // Query implements service.IngesterServer func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2Query(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -829,14 +848,17 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { - if err := i.checkRunningOrStopping(); err != nil { - return err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return err + } return i.v2QueryStream(req, stream) } + if err := i.checkRunningOrStopping(); err != nil { + return err + } + spanLog, ctx := spanlogger.New(stream.Context(), "QueryStream") defer spanLog.Finish() @@ -926,14 +948,17 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2LabelValues(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -951,14 +976,17 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2LabelNames(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -976,14 +1004,17 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2MetricsForLabelMatchers(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -1046,14 +1077,17 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad // UserStats returns ingestion statistics for the current user. func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2UserStats(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() state, ok, err := i.userStates.getViaContext(ctx) @@ -1075,14 +1109,17 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) // AllUserStats returns ingestion statistics for all users known to this ingester. func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if i.cfg.BlocksStorageEnabled { + if err := i.checkRunning(); err != nil { + return nil, err + } return i.v2AllUserStats(ctx, req) } + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() users := i.userStates.cp() diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 30c632475d..408ea73695 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -992,10 +992,6 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() - if i.checkIfAllTSDBClosing() { - return &client.QueryResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.QueryResponse{}, nil @@ -1052,10 +1048,6 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue i.metrics.queries.Inc() - if i.checkIfAllTSDBClosing() { - return &client.ExemplarQueryResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.ExemplarQueryResponse{}, nil @@ -1101,10 +1093,6 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq return nil, err } - if i.checkIfAllTSDBClosing() { - return &client.LabelValuesResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.LabelValuesResponse{}, nil @@ -1137,10 +1125,6 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque return nil, err } - if i.checkIfAllTSDBClosing() { - return &client.LabelNamesResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.LabelNamesResponse{}, nil @@ -1173,10 +1157,6 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me return nil, err } - if i.checkIfAllTSDBClosing() { - return &client.MetricsForLabelMatchersResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.MetricsForLabelMatchersResponse{}, nil @@ -1244,10 +1224,6 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest return nil, err } - if i.checkIfAllTSDBClosing() { - return &client.UserStatsResponse{}, nil - } - db := i.getTSDB(userID) if db == nil { return &client.UserStatsResponse{}, nil @@ -1260,10 +1236,6 @@ func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequ i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - if i.checkIfAllTSDBClosing() { - return &client.UsersStatsResponse{}, nil - } - users := i.TSDBState.dbs response := &client.UsersStatsResponse{ @@ -1308,10 +1280,6 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste i.metrics.queries.Inc() - if i.checkIfAllTSDBClosing() { - return nil - } - db := i.getTSDB(userID) if db == nil { return nil @@ -1815,15 +1783,14 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { // getMemorySeriesMetric returns the total number of in-memory series across all open TSDBs. func (i *Ingester) getMemorySeriesMetric() float64 { + if err := i.checkRunning(); err != nil { + return 0 + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() count := uint64(0) - - if i.checkIfAllTSDBClosing() { - return 0 - } - for _, db := range i.TSDBState.dbs { count += db.Head().NumSeries() } @@ -2282,11 +2249,3 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits { return l } - -func (i *Ingester) checkIfAllTSDBClosing() bool { - if i.State() == services.Stopping { - level.Debug(i.logger).Log("msg", "TSDB is unavailable, as the Ingester is in the process of stopping and closing all TSDB") - return true - } - return false -} From 140d5253659ee79132fb0cc1347b3144c9cf44d7 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Mon, 5 Jul 2021 09:59:19 -0400 Subject: [PATCH 11/13] Move running check to ingester v2 file Signed-off-by: ilangofman --- pkg/ingester/ingester.go | 41 ++++---------------------------- pkg/ingester/ingester_v2.go | 47 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 213bef9bac..de1af5b551 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -493,13 +493,11 @@ func (i *Ingester) checkRunning() error { // Push implements client.IngesterServer func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } - } else { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } + return i.v2Push(ctx, req) + } + + if err := i.checkRunningOrStopping(); err != nil { + return nil, err } // We will report *this* request in the error too. @@ -513,10 +511,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } } - if i.cfg.BlocksStorageEnabled { - return i.v2Push(ctx, req) - } - // NOTE: because we use `unsafe` in deserialisation, we must not // retain anything from `req` past the call to ReuseSlice defer cortexpb.ReuseSlice(req.Timeseries) @@ -779,9 +773,6 @@ func (i *Ingester) purgeUserMetricsMetadata() { // Query implements service.IngesterServer func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2Query(ctx, req) } @@ -849,9 +840,6 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return err - } return i.v2QueryStream(req, stream) } @@ -935,10 +923,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // Query implements service.IngesterServer func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { - if err := i.checkRunningOrStopping(); err != nil { - return nil, err - } - if !i.cfg.BlocksStorageEnabled { return nil, errors.New("not supported") } @@ -949,9 +933,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2LabelValues(ctx, req) } @@ -977,9 +958,6 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2LabelNames(ctx, req) } @@ -1005,9 +983,6 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2MetricsForLabelMatchers(ctx, req) } @@ -1078,9 +1053,6 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad // UserStats returns ingestion statistics for the current user. func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2UserStats(ctx, req) } @@ -1110,9 +1082,6 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) // AllUserStats returns ingestion statistics for all users known to this ingester. func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { if i.cfg.BlocksStorageEnabled { - if err := i.checkRunning(); err != nil { - return nil, err - } return i.v2AllUserStats(ctx, req) } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 408ea73695..3cd746c2f4 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -716,6 +716,21 @@ type extendedAppender interface { // v2Push adds metrics to a block func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + + // We will report *this* request in the error too. + inflight := i.inflightPushRequests.Inc() + defer i.inflightPushRequests.Dec() + + gl := i.getInstanceLimits() + if gl != nil && gl.MaxInflightPushRequests > 0 { + if inflight > gl.MaxInflightPushRequests { + return nil, errTooManyInflightPushRequests + } + } + var firstPartialErr error // NOTE: because we use `unsafe` in deserialisation, we must not @@ -980,6 +995,10 @@ func (u *userTSDB) releaseAppendLock() { } func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1036,6 +1055,10 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie } func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1083,6 +1106,10 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue } func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + labelName, startTimestampMs, endTimestampMs, matchers, err := client.FromLabelValuesRequest(req) if err != nil { return nil, err @@ -1120,6 +1147,10 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq } func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1152,6 +1183,10 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque } func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1219,6 +1254,10 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me } func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1233,6 +1272,10 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest } func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { + if err := i.checkRunning(); err != nil { + return nil, err + } + i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() @@ -1265,6 +1308,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024 // v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { + if err := i.checkRunning(); err != nil { + return err + } + spanlog, ctx := spanlogger.New(stream.Context(), "v2QueryStream") defer spanlog.Finish() From b7ac2c4aaff6a4567bb122588652b87c27896483 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Mon, 5 Jul 2021 10:07:21 -0400 Subject: [PATCH 12/13] Remove extra space Signed-off-by: ilangofman --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9b708db44..c0cde31480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,8 @@ ## master / unreleased * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 -* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 - +* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 ## 1.10.0-rc.0 / 2021-06-28 From 342781320895a29b3714c396f61ffd12fd549ffe Mon Sep 17 00:00:00 2001 From: ilangofman Date: Thu, 15 Jul 2021 09:21:11 -0400 Subject: [PATCH 13/13] Remove duplication from push func Signed-off-by: ilangofman --- pkg/ingester/ingester.go | 10 +++++----- pkg/ingester/ingester_v2.go | 15 --------------- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index de1af5b551..0ee8b1e98a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -492,11 +492,7 @@ func (i *Ingester) checkRunning() error { // Push implements client.IngesterServer func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { - if i.cfg.BlocksStorageEnabled { - return i.v2Push(ctx, req) - } - - if err := i.checkRunningOrStopping(); err != nil { + if err := i.checkRunning(); err != nil { return nil, err } @@ -511,6 +507,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } } + if i.cfg.BlocksStorageEnabled { + return i.v2Push(ctx, req) + } + // NOTE: because we use `unsafe` in deserialisation, we must not // retain anything from `req` past the call to ReuseSlice defer cortexpb.ReuseSlice(req.Timeseries) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 3cd746c2f4..ec9a4f5def 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -716,21 +716,6 @@ type extendedAppender interface { // v2Push adds metrics to a block func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { - if err := i.checkRunning(); err != nil { - return nil, err - } - - // We will report *this* request in the error too. - inflight := i.inflightPushRequests.Inc() - defer i.inflightPushRequests.Dec() - - gl := i.getInstanceLimits() - if gl != nil && gl.MaxInflightPushRequests > 0 { - if inflight > gl.MaxInflightPushRequests { - return nil, errTooManyInflightPushRequests - } - } - var firstPartialErr error // NOTE: because we use `unsafe` in deserialisation, we must not