Skip to content

Commit

Permalink
Bucket Index Cleanup: Concurrent updateBlockIndexEntry
Browse files Browse the repository at this point in the history
Getting the meta.json can take a minute to fail, this makes the operation run concurrently to get through partial blocks faster
  • Loading branch information
julienduchesne committed Oct 31, 2024
1 parent 3be1c68 commit aed7b0b
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions pkg/storage/tsdb/bucketindex/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"encoding/json"
"io"
"path"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -116,23 +118,38 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo
// Remaining blocks are new ones and we have to fetch the meta.json for each of them, in order
// to find out if their upload has been completed (meta.json is uploaded last) and get the block
// information to store in the bucket index.
// Process concurrently, but limit the number of concurrent requests.
discoveredSlice := make([]ulid.ULID, 0, len(discovered))
for id := range discovered {
discoveredSlice = append(discoveredSlice, id)
}
assignMu := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(discovered), 4, func(ctx context.Context, idx int) error {
id := discoveredSlice[idx]
b, err := w.updateBlockIndexEntry(ctx, id)

// Processing is done, the rest can be done sequentially.
assignMu.Lock()
defer assignMu.Unlock()

if err == nil {
blocks = append(blocks, b)
continue
return nil
}

if errors.Is(err, ErrBlockMetaNotFound) {
partials[id] = err
level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index", "block", id.String())
continue
return nil
}
if errors.Is(err, ErrBlockMetaCorrupted) {
partials[id] = err
level.Error(w.logger).Log("msg", "skipped block with corrupted meta.json when updating bucket index", "block", id.String(), "err", err)
continue
return nil
}
return err
})
if err != nil {
return nil, nil, err
}
level.Info(w.logger).Log("msg", "fetched blocks metas for newly discovered blocks", "total_blocks", len(blocks), "partial_errors", len(partials))
Expand Down

0 comments on commit aed7b0b

Please sign in to comment.