Skip to content

Commit

Permalink
bucket verify: repair out of order labels (#964)
Browse files Browse the repository at this point in the history
* bucket verify: repair out of order labels

* verify repair: correctly order series in the index on rewrite

When we have label sets that are not in the correct order, fixing that
changes the order of the series in the index.  So the index must be
rewritten in that new order.  This makes this repair tool take up a
bunch more memory, but produces blocks that verify correctly.

* Fix the TSDB block safe-delete function

The directory name must be the block ID name exactly to verify.  A temp
directory or random name will not work here.

* verify repair: fix duplicate chunk detection

A pointer/reference logic error was eliminating every chunk for a series
in a given TSDB block except the first chunk.  Chunks are now
referenced correctly via pointers.

* PR feedback: use errors.Errorf() instead of fmt.Errorf()

* Use errors.New()

Some linters catch errors.Errorf() as it's not really part of the errors
package.

* Liberally comment this for loop

We're comparing items by pointers; using Go's range variables is
misleading here, and we need not fall into the same trap.

* Take advantage of sort.Interface

This prevents us from having to re-implement label sorting.

* PR Feedback: Comments are full sentences.
  • Loading branch information
jjneely authored and bwplotka committed Apr 18, 2019
1 parent d436a04 commit b8c9dcf
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 23 deletions.
5 changes: 3 additions & 2 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
var backupBkt objstore.Bucket
if len(backupconfContentYaml) == 0 {
if *repair {
return errors.Wrap(err, "repair is specified, so backup client is required")
return errors.New("repair is specified, so backup client is required")
}
} else {
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name)
// nil Prometheus registerer: don't create conflicting metrics
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, nil, name)
if err != nil {
return err
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chun
// the current one.
if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime {
return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]",
last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime)
last.MinTime, last.MaxTime, curr.MinTime, curr.MaxTime)
}
ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli)
cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli)
Expand Down Expand Up @@ -563,9 +563,14 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk
var last *chunks.Meta

OUTER:
for _, c := range chks {
// This compares the current chunk to the chunk from the last iteration
// by pointers. If we use "i, c := range chks" the variable c is a new
// variable whose address doesn't change through the entire loop.
// The current element of the chks slice is copied into it. We must take
// the address of the indexed slice instead.
for i := range chks {
for _, ignoreChkFn := range ignoreChkFns {
ignore, err := ignoreChkFn(mint, maxt, last, &c)
ignore, err := ignoreChkFn(mint, maxt, last, &chks[i])
if err != nil {
return nil, errors.Wrap(err, "ignore function")
}
Expand All @@ -575,13 +580,18 @@ OUTER:
}
}

last = &c
repl = append(repl, c)
last = &chks[i]
repl = append(repl, chks[i])
}

return repl, nil
}

type seriesRepair struct {
lset labels.Labels
chks []chunks.Meta
}

// rewrite writes all data from the readers back into the writers while cleaning
// up mis-ordered and duplicated chunks.
func rewrite(
Expand Down Expand Up @@ -609,17 +619,20 @@ func rewrite(
postings = index.NewMemPostings()
values = map[string]stringset{}
i = uint64(0)
series = []seriesRepair{}
)

var lset labels.Labels
var chks []chunks.Meta

for all.Next() {
var lset labels.Labels
var chks []chunks.Meta
id := all.At()

if err := indexr.Series(id, &lset, &chks); err != nil {
return err
}
// Make sure labels are in sorted order.
sort.Sort(lset)

for i, c := range chks {
chks[i].Chunk, err = chunkr.Chunk(c.Ref)
if err != nil {
Expand All @@ -636,34 +649,49 @@ func rewrite(
continue
}

if err := chunkw.WriteChunks(chks...); err != nil {
series = append(series, seriesRepair{
lset: lset,
chks: chks,
})
}

if all.Err() != nil {
return errors.Wrap(all.Err(), "iterate series")
}

// Sort the series, if labels are re-ordered then the ordering of series
// will be different.
sort.Slice(series, func(i, j int) bool {
return labels.Compare(series[i].lset, series[j].lset) < 0
})

// Build a new TSDB block.
for _, s := range series {
if err := chunkw.WriteChunks(s.chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, chks...); err != nil {
if err := indexw.AddSeries(i, s.lset, s.chks...); err != nil {
return errors.Wrap(err, "add series")
}

meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumChunks += uint64(len(s.chks))
meta.Stats.NumSeries++

for _, chk := range chks {
for _, chk := range s.chks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}

for _, l := range lset {
for _, l := range s.lset {
valset, ok := values[l.Name]
if !ok {
valset = stringset{}
values[l.Name] = valset
}
valset.set(l.Value)
}
postings.Add(i, lset)
postings.Add(i, s.lset)
i++
}
if all.Err() != nil {
return errors.Wrap(all.Err(), "iterate series")
}

s := make([]string, 0, 256)
for n, v := range values {
Expand Down
13 changes: 9 additions & 4 deletions pkg/verifier/safe_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package verifier

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -31,13 +31,18 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

dir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id))
tempdir, err := ioutil.TempDir("", "safe-delete")
if err != nil {
return err
}
dir := filepath.Join(tempdir, id.String())
err = os.Mkdir(dir, 0755)
if err != nil {
return err
}
defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Warn(logger).Log("msg", "failed to delete dir", "dir", dir, "err", err)
if err := os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err)
}
}()

Expand Down

0 comments on commit b8c9dcf

Please sign in to comment.