diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 79158f98d8e..26e929dd62f 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -10,10 +10,12 @@ import ( "encoding/json" "io" "path" + "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -116,23 +118,38 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo // Remaining blocks are new ones and we have to fetch the meta.json for each of them, in order // to find out if their upload has been completed (meta.json is uploaded last) and get the block // information to store in the bucket index. + // Process concurrently, but limit the number of concurrent requests. + discoveredSlice := make([]ulid.ULID, 0, len(discovered)) for id := range discovered { + discoveredSlice = append(discoveredSlice, id) + } + assignMu := sync.Mutex{} + err = concurrency.ForEachJob(ctx, len(discovered), 4, func(ctx context.Context, idx int) error { + id := discoveredSlice[idx] b, err := w.updateBlockIndexEntry(ctx, id) + + // Processing is done, the rest can be done sequentially. + assignMu.Lock() + defer assignMu.Unlock() + if err == nil { blocks = append(blocks, b) - continue + return nil } if errors.Is(err, ErrBlockMetaNotFound) { partials[id] = err level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String()) - continue + return nil } if errors.Is(err, ErrBlockMetaCorrupted) { partials[id] = err level.Error(w.logger).Log("msg", "skipped block with corrupted meta.json when updating bucket index", "block", id.String(), "err", err) - continue + return nil } + return err + }) + if err != nil { return nil, nil, err } level.Info(w.logger).Log("msg", "fetched blocks metas for newly discovered blocks", "total_blocks", len(blocks), "partial_errors", len(partials))