diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index ee2d610fe78..a3fcaf0c6ff 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -316,30 +316,26 @@ type blockUploader func(_ context.Context, tenantID, dbDir string, blockIDs []st // All the DBs are closed and directories cleared irrespective of success or failure of this function. func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error) { var ( - doneDBsMu sync.Mutex - doneDBs = make(map[*userTSDB]bool) + closedDBsMu sync.Mutex + closedDBs = make(map[*userTSDB]bool) ) b.tsdbsMu.Lock() defer func() { - b.tsdbsMu.Unlock() - var merr multierror.MultiError merr.Add(err) - // If some TSDB was not compacted or uploaded, it will be re-tried in the next cycle, so we remove it here. + // If some TSDB was not compacted or uploaded, it will be re-tried in the next cycle, so we always remove it here. for _, db := range b.tsdbs { - if doneDBs[db] { - continue + if !closedDBs[db] { + merr.Add(db.Close()) } - dbDir := db.Dir() - merr.Add(db.Close()) - merr.Add(os.RemoveAll(dbDir)) + merr.Add(os.RemoveAll(db.Dir())) } - - err = merr.Err() - // Clear the map so that it can be released from the memory. Not setting to nil in case we want to reuse the TSDBBuilder. clear(b.tsdbs) + b.tsdbsMu.Unlock() + + err = merr.Err() }() level.Info(b.logger).Log("msg", "compacting and uploading blocks", "num_tsdb", len(b.tsdbs)) @@ -384,14 +380,14 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp return err } + closedDBsMu.Lock() + closedDBs[db] = true + closedDBsMu.Unlock() + if err := uploadBlocks(ctx, tenant.tenantID, dbDir, blockIDs); err != nil { return err } - doneDBsMu.Lock() - doneDBs[db] = true - doneDBsMu.Unlock() - // Clear the DB from the disk. Don't need it anymore. return os.RemoveAll(dbDir) }) diff --git a/pkg/blockbuilder/tsdb_test.go b/pkg/blockbuilder/tsdb_test.go index 9b1ad07cfc7..ff36eb18301 100644 --- a/pkg/blockbuilder/tsdb_test.go +++ b/pkg/blockbuilder/tsdb_test.go @@ -281,6 +281,30 @@ func TestTSDBBuilder(t *testing.T) { } } +func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) { + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry()) + builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0) + t.Cleanup(func() { + require.NoError(t, builder.Close()) + }) + + userID := strconv.Itoa(rand.Int()) + tenant := tsdbTenant{ + partitionID: 0, + tenantID: userID, + } + _, err = builder.getOrCreateTSDB(tenant) + require.NoError(t, err) + + errUploadFailed := fmt.Errorf("upload failed") + _, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []string) error { + return errUploadFailed + }) + require.ErrorIs(t, err, errUploadFailed) +} + func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) { querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err)