Skip to content

Commit

Permalink
bucket verify: fix to parse all blocks
Browse files Browse the repository at this point in the history
Only the first block is parsed. This commit fixes this issue to parse all blocks present in the bucket.

Signed-off-by: Aymeric <[email protected]>
  • Loading branch information
Aymeric committed Nov 19, 2021
1 parent 243526d commit ee4df6f
Showing 1 changed file with 72 additions and 57 deletions.
129 changes: 72 additions & 57 deletions pkg/verifier/index_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool

for id, meta := range metas {
if idMatcher != nil && !idMatcher(id) {
return nil
continue
}

tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id))
Expand All @@ -51,78 +51,93 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool
}
}()

if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil {
return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)
if err = stats.AnyErr(); err == nil {
return nil
stats, err := verifyIndex(ctx, id, tmpdir, meta)
if err == nil {
level.Debug(ctx.Logger).Log("msg", "no issue", "id", id)
continue
}

level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err)

if !repair {
// Only verify.
return nil
continue
}

if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
// Verify repaired block before uploading it.
if err = repairIndex(stats, ctx, id, meta, tmpdir); err != nil {
level.Error(ctx.Logger).Log("msg", "could not repair index", "err", err)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
continue
}

if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

if meta.Thanos.Downsample.Resolution > 0 {
return errors.New("cannot repair downsampled blocks")
}
func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metadata.Meta, dir string) (err error) {
if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
}

level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
tmpdir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)
if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}

// Verify repaired block before uploading it.
if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}
if meta.Thanos.Downsample.Resolution > 0 {
return errors.Wrapf(err, "cannot repair downsampled blocks", "id", id)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}
level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(dir, id.String())); err != nil {
return errors.Wrapf(err, "download block", "id", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
dir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
return nil
if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid", "resid", resid)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(dir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(dir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}

level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (stats block.HealthStats, err error) {
if err := objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(dir, block.IndexFilename)); err != nil {
return stats, errors.Wrapf(err, "download index file", path.Join(id.String(), block.IndexFilename))
}

stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return stats, errors.Wrapf(err, "gather index issues", "id", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)

return stats, stats.AnyErr()
}

0 comments on commit ee4df6f

Please sign in to comment.