From ef0f7fab7e31eacd333c8533ad3f909c4c408719 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 16 May 2021 13:51:54 -0400 Subject: [PATCH 01/11] add offline dedup compactor Signed-off-by: yeya24 --- cmd/thanos/compact.go | 23 +- docs/components/compact.md | 9 + go.mod | 1 + pkg/compact/compact.go | 6 + pkg/compact/compact_e2e_test.go | 16 +- pkg/dedup/chunk_iter.go | 357 ++++++++++++++++++++++++++++++++ pkg/dedup/chunk_iter_test.go | 141 +++++++++++++ pkg/dedup/iter_test.go | 8 + test/e2e/compact_test.go | 71 ++++++- 9 files changed, 625 insertions(+), 7 deletions(-) create mode 100644 pkg/dedup/chunk_iter.go create mode 100644 pkg/dedup/chunk_iter_test.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5d69b41242..d8e4b86e09 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -23,13 +23,16 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" @@ -302,9 +305,20 @@ func runCompact( cancel() } }() + + var mergeFunc storage.VerticalChunkSeriesMergeFunc + switch conf.dedupFunc { + case compact.DedupAlgorithmPenalty: + mergeFunc = dedup.NewDedupChunkSeriesMerger() + + if len(conf.dedupReplicaLabels) == 0 { + return errors.New("penalty based deduplication needs at least one replica label specified") + } + } + // Instantiate the compactor with different time slices. Timestamps in TSDB // are in milliseconds. - comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), nil) + comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc) if err != nil { return errors.Wrap(err, "create compactor") } @@ -564,6 +578,7 @@ type compactConfig struct { maxBlockIndexSize units.Base2Bytes hashFunc string enableVerticalCompaction bool + dedupFunc string } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -627,6 +642,12 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) + cmd.Flag("compact.dedup-func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+ + "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+ + "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag."). + Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "") + + // Update this. This flag works for both dedup version compactor and the original compactor. cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ "Experimental. 
When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
diff --git a/docs/components/compact.md b/docs/components/compact.md
index ba7c183ac8..3d92201c50 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -343,6 +343,15 @@ Flags:
 happen at the end of an iteration.
 --compact.concurrency=1 Number of goroutines to use when compacting groups.
+ --compact.dedup-func= Experimental. Deduplication algorithm for
+ merging overlapping blocks. Possible values are:
+ "", "penalty". If no value is specified, the
+ default compact deduplication merger is used,
+ which performs 1:1 deduplication for samples.
+ When set to penalty, penalty based deduplication
+ algorithm will be used. At least one replica
+ label has to be set via
+ --deduplication.replica-label flag.
 --consistency-delay=30m Minimum age of fresh (non-compacted) blocks
 before they are being processed. Malformed blocks older than the maximum of
diff --git a/go.mod b/go.mod
index bf64c42245..646cc36caa 100644
--- a/go.mod
+++ b/go.mod
@@ -53,6 +53,7 @@ require (
 github.com/prometheus/common v0.23.0
 github.com/prometheus/exporter-toolkit v0.5.1
 github.com/prometheus/prometheus v1.8.2-0.20210519120135-d95b0972505f
+ github.com/stretchr/testify v1.7.0
 github.com/uber/jaeger-client-go v2.28.0+incompatible
 github.com/uber/jaeger-lib v2.4.1+incompatible
 github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 58388a652f..1d956c49a1 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -41,6 +41,12 @@ const (
 ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)
+const (
+ // DedupAlgorithmPenalty is the penalty-based compactor series merge algorithm.
+ // It is the same as the querier's online deduplication, except for counter reset handling.
+ DedupAlgorithmPenalty = "penalty"
+)
+
 // Syncer synchronizes block metas from a bucket into a local directory.
 // It sorts them into compaction groups based on equal label sets.
 type Syncer struct {
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 663bb5767f..0e620840d8 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -22,9 +22,12 @@ import (
 "github.com/prometheus/client_golang/prometheus/promauto"
 promtest "github.com/prometheus/client_golang/prometheus/testutil"
 "github.com/prometheus/prometheus/pkg/labels"
+ "github.com/prometheus/prometheus/storage"
 "github.com/prometheus/prometheus/tsdb"
+
 "github.com/thanos-io/thanos/pkg/block"
 "github.com/thanos-io/thanos/pkg/block/metadata"
+ "github.com/thanos-io/thanos/pkg/dedup"
 "github.com/thanos-io/thanos/pkg/objstore"
 "github.com/thanos-io/thanos/pkg/objstore/objtesting"
 "github.com/thanos-io/thanos/pkg/testutil"
@@ -167,7 +170,16 @@ func MetricCount(c prometheus.Collector) int {
 return mCount
}
-func TestGroup_Compact_e2e(t *testing.T) {
+func TestGroupCompactE2E(t *testing.T) {
+ testGroupCompactE2e(t, nil)
+}
+
+// Penalty based merger should get the same result as the blocks don't have overlap.
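+// (The penalty logic only engages on overlapping samples, so with disjoint blocks it degenerates to the default merge and both variants can share the same assertions.)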
+func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
+ testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger())
+}
+
+func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
 objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 defer cancel()
@@ -194,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
 sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
 testutil.Ok(t, err)
- comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
+ comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
 testutil.Ok(t, err)
 planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
diff --git a/pkg/dedup/chunk_iter.go b/pkg/dedup/chunk_iter.go
new file mode 100644
index 0000000000..84c6883576
--- /dev/null
+++ b/pkg/dedup/chunk_iter.go
@@ -0,0 +1,357 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package dedup
+
+import (
+ "bytes"
+ "container/heap"
+
+ "github.com/prometheus/prometheus/storage"
+ "github.com/prometheus/prometheus/tsdb/chunkenc"
+ "github.com/prometheus/prometheus/tsdb/chunks"
+
+ "github.com/thanos-io/thanos/pkg/compact/downsample"
+)
+
+// NewDedupChunkSeriesMerger merges several chunk series into one.
+// Deduplication is based on penalty based deduplication algorithm without handling counter reset.
+func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
+ return func(series ...storage.ChunkSeries) storage.ChunkSeries {
+ if len(series) == 0 {
+ return nil
+ }
+ return &storage.ChunkSeriesEntry{
+ Lset: series[0].Labels(),
+ ChunkIteratorFn: func() chunks.Iterator {
+ iterators := make([]chunks.Iterator, 0, len(series))
+ for _, s := range series {
+ iterators = append(iterators, s.Iterator())
+ }
+ return &dedupChunksIterator{
+ iterators: iterators,
+ }
+ },
+ }
+ }
+}
+
+type dedupChunksIterator struct {
+ iterators []chunks.Iterator
+ h chunkIteratorHeap
+
+ err error
+ curr chunks.Meta
+}
+
+func (d *dedupChunksIterator) At() chunks.Meta {
+ return d.curr
+}
+
+// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615.
+// The difference is that it handles both the XOR and Aggr chunk encodings.
+func (d *dedupChunksIterator) Next() bool {
+ if d.h == nil {
+ for _, iter := range d.iterators {
+ if iter.Next() {
+ heap.Push(&d.h, iter)
+ }
+ }
+ }
+ if len(d.h) == 0 {
+ return false
+ }
+
+ iter := heap.Pop(&d.h).(chunks.Iterator)
+ d.curr = iter.At()
+ if iter.Next() {
+ heap.Push(&d.h, iter)
+ }
+
+ var (
+ om = newOverlappingMerger()
+ oMaxTime = d.curr.MaxTime
+ prev = d.curr
+ )
+
+ // Detect overlaps to compact.
+ for len(d.h) > 0 {
+ // Get the next oldest chunk by min, then max time.
+ next := d.h[0].At()
+ if next.MinTime > oMaxTime {
+ // No overlap with current one.
+ break
+ }
+
+ if next.MinTime == prev.MinTime &&
+ next.MaxTime == prev.MaxTime &&
+ bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
+ // 1:1 duplicates, skip it.
+ } else {
+ // We operate on the same series, so labels do not matter here.
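+ // Collect the overlapping chunk; everything gathered for this overlap window is merged sample by sample below.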
+ om.addChunk(next)
+
+ if next.MaxTime > oMaxTime {
+ oMaxTime = next.MaxTime
+ }
+ prev = next
+ }
+
+ iter := heap.Pop(&d.h).(chunks.Iterator)
+ if iter.Next() {
+ heap.Push(&d.h, iter)
+ }
+ }
+ if om.empty() {
+ return true
+ }
+
+ iter = om.iterator(d.curr)
+ if !iter.Next() {
+ if d.err = iter.Err(); d.err != nil {
+ return false
+ }
+ panic("unexpected seriesToChunkEncoder lack of iterations")
+ }
+ d.curr = iter.At()
+ if iter.Next() {
+ heap.Push(&d.h, iter)
+ }
+ return true
+}
+
+func (d *dedupChunksIterator) Err() error {
+ return d.err
+}
+
+type chunkIteratorHeap []chunks.Iterator
+
+func (h chunkIteratorHeap) Len() int { return len(h) }
+func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h chunkIteratorHeap) Less(i, j int) bool {
+ at := h[i].At()
+ bt := h[j].At()
+ if at.MinTime == bt.MinTime {
+ return at.MaxTime < bt.MaxTime
+ }
+ return at.MinTime < bt.MinTime
+}
+
+func (h *chunkIteratorHeap) Push(x interface{}) {
+ *h = append(*h, x.(chunks.Iterator))
+}
+
+func (h *chunkIteratorHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+type overlappingMerger struct {
+ xorIterators []chunkenc.Iterator
+ aggrIterators [5][]chunkenc.Iterator
+
+ samplesMergeFunc func(a, b chunkenc.Iterator) chunkenc.Iterator
+}
+
+func newOverlappingMerger() *overlappingMerger {
+ return &overlappingMerger{
+ samplesMergeFunc: func(a, b chunkenc.Iterator) chunkenc.Iterator {
+ it := noopAdjustableSeriesIterator{a}
+ return newDedupSeriesIterator(it, noopAdjustableSeriesIterator{b})
+ },
+ }
+}
+
+func (o *overlappingMerger) addChunk(chk chunks.Meta) {
+ switch chk.Chunk.Encoding() {
+ case chunkenc.EncXOR:
+ o.xorIterators = append(o.xorIterators, chk.Chunk.Iterator(nil))
+ case downsample.ChunkEncAggr:
+ aggrChk := chk.Chunk.(*downsample.AggrChunk)
+ for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ {
+ if c, err := aggrChk.Get(i); err == nil {
+ o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil))
+ }
+ }
+ }
+}
+
+func (o *overlappingMerger) empty() bool {
+ // The overlapping merger contains either XOR chunks or Aggr chunks, never both.
+ // If XOR chunks are present then we don't need to check Aggr chunks.
+ if len(o.xorIterators) > 0 {
+ return false
+ }
+ return len(o.aggrIterators[downsample.AggrCount]) == 0
+}
+
+// Return a chunk iterator based on the encoding of the base chunk.
+func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
+ var it chunkenc.Iterator
+ switch baseChk.Chunk.Encoding() {
+ case chunkenc.EncXOR:
+ // If XOR encoding, we need to deduplicate the samples and re-encode them to chunks.
+ return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
+ SampleIteratorFn: func() chunkenc.Iterator {
+ it = baseChk.Chunk.Iterator(nil)
+ for _, i := range o.xorIterators {
+ it = o.samplesMergeFunc(it, i)
+ }
+ return it
+ }}).Iterator()
+
+ case downsample.ChunkEncAggr:
+ // If Aggr encoding, each aggregated chunk needs to be expanded and deduplicated,
+ // then re-encoded into Aggr chunks.
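+ // An AggrChunk carries five sub-chunks (count, sum, min, max, counter); each aggregate present gets its own merged sample iterator below.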
+ aggrChk := baseChk.Chunk.(*downsample.AggrChunk) + samplesIter := [5]chunkenc.Iterator{} + for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ { + if c, err := aggrChk.Get(i); err == nil { + o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil)) + } + + if len(o.aggrIterators[i]) > 0 { + for _, j := range o.aggrIterators[i][1:] { + o.aggrIterators[i][0] = o.samplesMergeFunc(o.aggrIterators[i][0], j) + } + samplesIter[i] = o.aggrIterators[i][0] + } else { + samplesIter[i] = nil + } + } + + return newAggrChunkIterator(samplesIter) + } + + // Impossible for now. + return nil +} + +type aggrChunkIterator struct { + iters [5]chunkenc.Iterator + curr chunks.Meta + countChkIter chunks.Iterator + + err error +} + +func newAggrChunkIterator(iters [5]chunkenc.Iterator) chunks.Iterator { + return &aggrChunkIterator{ + iters: iters, + countChkIter: storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{ + SampleIteratorFn: func() chunkenc.Iterator { + return iters[downsample.AggrCount] + }, + }).Iterator(), + } +} + +func (a *aggrChunkIterator) Next() bool { + if !a.countChkIter.Next() { + return false + } + + countChk := a.countChkIter.At() + mint := countChk.MinTime + maxt := countChk.MaxTime + + var ( + chks [5]chunkenc.Chunk + chk *chunks.Meta + err error + ) + + chks[downsample.AggrCount] = countChk.Chunk + chk, err = a.toChunk(downsample.AggrSum, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[downsample.AggrSum] = chk.Chunk + } + + chk, err = a.toChunk(downsample.AggrMin, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[downsample.AggrMin] = chk.Chunk + } + + chk, err = a.toChunk(downsample.AggrMax, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[downsample.AggrMax] = chk.Chunk + } + + chk, err = a.toChunk(downsample.AggrCounter, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[downsample.AggrCounter] = chk.Chunk + } + + a.curr = chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: downsample.EncodeAggrChunk(chks), + } + return true +} + +func (a *aggrChunkIterator) At() chunks.Meta { + return a.curr +} + +func (a *aggrChunkIterator) Err() error { + return a.err +} + +func (a *aggrChunkIterator) toChunk(at downsample.AggrType, minTime, maxTime int64) (*chunks.Meta, error) { + if a.iters[at] == nil { + return nil, nil + } + c := chunkenc.NewXORChunk() + appender, err := c.Appender() + if err != nil { + return nil, err + } + + it := NewBoundedSeriesIterator(a.iters[at], minTime, maxTime) + + var ( + lastT int64 + lastV float64 + ) + for it.Next() { + lastT, lastV = it.At() + appender.Append(lastT, lastV) + } + + // No sample in the required time range. + if lastT == 0 && lastV == 0 { + return nil, nil + } + + // Encode last sample for AggrCounter. + if at == downsample.AggrCounter { + appender.Append(lastT, lastV) + } + + return &chunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + Chunk: c, + }, nil +} diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go new file mode 100644 index 0000000000..4eab9666eb --- /dev/null +++ b/pkg/dedup/chunk_iter_test.go @@ -0,0 +1,141 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package dedup + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/stretchr/testify/require" +) + +func TestDedupChunkSeriesMerger(t *testing.T) { + m := NewDedupChunkSeriesMerger() + + for _, tc := range []struct { + name string + input []storage.ChunkSeries + expected storage.ChunkSeries + }{ + { + name: "single empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "single series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + { + name: "two empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "two non overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping with large time diff", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{2, 2}, sample{5008, 5008}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + // sample{5008, 5008} is added to the result due to its large timestamp. 
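// (The large jump in time likely inflates the dedup penalty on the second replica, so its samples at t=7..10 are skipped as duplicates.)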
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}),
},
{
name: "two duplicated",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
{
name: "three overlapping",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
},
// only samples from the last series are retained due to high penalty.
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
},
{
name: "three in chained overlap",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
},
// only samples from the first series are retained due to high penalty.
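// (Series 2 overlaps series 1 and series 3 overlaps series 2, so the whole chain forms a single overlap group.)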
+ expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + { + name: "three in chained overlap complex", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, + []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + ), + }, + { + name: "110 overlapping samples", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 110), + ), + }, + { + name: "150 overlapping samples, split chunk", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 90), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) 
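// The merged series must keep the label set of the first input series.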
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := storage.ExpandChunks(merged.Iterator())
expChks, expErr := storage.ExpandChunks(tc.expected.Iterator())

require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
})
}
}
diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go
index b3f52f5bde..79bd5de3c8 100644
--- a/pkg/dedup/iter_test.go
+++ b/pkg/dedup/iter_test.go
@@ -22,6 +22,14 @@ type sample struct {
 v float64
}
+func (s sample) T() int64 {
+ return s.t
+}
+
+func (s sample) V() float64 {
+ return s.v
+}
+
 type series struct {
 lset labels.Labels
 samples []sample
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index a0c6bfaf38..92cb21c090 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -70,8 +70,15 @@ func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration,
}
 func TestCompactWithStoreGateway(t *testing.T) {
- t.Parallel()
+ testCompactWithStoreGateway(t, false)
+}
+
+func TestCompactWithStoreGatewayWithPenaltyDedup(t *testing.T) {
+ testCompactWithStoreGateway(t, true)
+}
+func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
+ t.Parallel()
 logger := log.NewLogfmtLogger(os.Stdout)
 justAfterConsistencyDelay := 30 * time.Minute
@@ -325,7 +332,11 @@ func TestCompactWithStoreGateway(t *testing.T) {
 },
 )
- s, err := e2e.NewScenario("e2e_test_compact")
+ name := "e2e_test_compact"
+ if penaltyDedup {
+ name = "e2e_test_compact_penalty_dedup"
+ }
+ s, err := e2e.NewScenario(name)
 testutil.Ok(t, err)
 t.Cleanup(e2ethanos.CleanScenario(t, s))
@@ -537,6 +548,49 @@ func TestCompactWithStoreGateway(t *testing.T) {
 {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
 }
+ if penaltyDedup {
+ expectedEndVector = model.Vector{
+ // NOTE(bwplotka): Even after deduplication some series still have replica labels. This is because those blocks did not overlap with anything yet.
+ // This is fine, as querier deduplication will remove them if needed.
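+ // Except for the two values flagged inline below, these expectations match the non-penalty run.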
+ {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "no-compaction", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-after-dedup", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "a-partial-overlap-dedup-ready"}}, + // If no penalty dedup enabled, the value should be 360. + {Value: 200, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + // If no penalty dedup enabled, the value should be 240. 
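+ // (Penalty dedup appears to skip a few samples around the replica switch-over, which is why the merged total is lower here.)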
+ {Value: 195, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + } + } + // No replica label with overlaps should halt compactor. This test is sequential // because we do not want two Thanos Compact instances deleting the same partially // uploaded blocks and blocks with deletion marks. We also check that Thanos Compactor @@ -617,8 +671,13 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, block.Download(ctx, logger, bkt, id, filepath.Join(p, "compact", compact.DefaultGroupKey(m.Thanos), id.String()))) } + extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica"} + if penaltyDedup { + extArgs = append(extArgs, "--compact.dedup-func=penalty") + } + // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction. - c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica") + c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) @@ -679,7 +738,11 @@ func TestCompactWithStoreGateway(t *testing.T) { } t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) { - c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s") + extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s"} + if penaltyDedup { + extArgs = append(extArgs, "--compact.dedup-func=penalty") + } + c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...) 
testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) From 6bac9cdd6eda04c891b566aeab560b5f766435c6 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 21 May 2021 10:34:31 -0400 Subject: [PATCH 02/11] add tests for downsampled block Signed-off-by: yeya24 --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 2 +- pkg/dedup/chunk_iter_test.go | 184 +++++++++++++++++++++++++++++++++++ 3 files changed, 186 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce256defea..9673992d7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs - [#4249](https://github.com/thanos-io/thanos/pull/4249) UI: add dark theme - [#3707](https://github.com/thanos-io/thanos/pull/3707) Tools: Added `--rewrite.to-relabel-config` to bucket rewrite tool to support series relabel from given blocks. +- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d8e4b86e09..fbd24d59db 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -638,7 +638,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { Default("48h").SetValue(&cc.deleteDelay) cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+ - "Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+ + "Please note that this uses a NAIVE algorithm for merging. If you need a smarter deduplication algorithm, please set it via -- compact.dedup-func."+ "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go index 4eab9666eb..cfca35729c 100644 --- a/pkg/dedup/chunk_iter_test.go +++ b/pkg/dedup/chunk_iter_test.go @@ -8,8 +8,11 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/compact/downsample" ) func TestDedupChunkSeriesMerger(t *testing.T) { @@ -139,3 +142,184 @@ func TestDedupChunkSeriesMerger(t *testing.T) { }) } } + +func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { + m := NewDedupChunkSeriesMerger() + + defaultLabels := labels.FromStrings("bar", "baz") + emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{}) + // Samples are created with step 1m. So the 5m downsampled chunk has 2 samples. + samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000)) + // Non overlapping samples with samples1. 5m downsampled chunk has 2 samples. 
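+ // (samples2 covers [10m, 20m): it starts at t=600000, right after samples1's [0m, 10m) range.)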
+ samples2 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(600000, 10, 60*1000)) + + samples3 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(120000, 10, 60*1000)) + + for _, tc := range []struct { + name string + input []storage.ChunkSeries + expected storage.ChunkSeries + }{ + { + name: "single empty series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator() + }, + }, + }, + { + name: "single series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + { + name: "two empty series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator() + }, + }, + }, + { + name: "two non overlapping series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples2, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator( + append(downsample.DownsampleRaw(samples1, downsample.ResLevel1), + downsample.DownsampleRaw(samples2, downsample.ResLevel1)...)...) + }, + }, + }, + { + // 1:1 duplicated chunks are deduplicated. + name: "two same series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator( + downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) 
+ }, + }, + }, + { + name: "two overlapping series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples3, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(chunks.Meta{ + MinTime: 299999, + MaxTime: 540000, + Chunk: downsample.EncodeAggrChunk([5]chunkenc.Chunk{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 3}, sample{540000, 5}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 540000}, sample{540000, 2100000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 120000}, sample{540000, 300000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{540000, 540000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{299999, 240000}}).Chunk, + }), + }) + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) + require.Equal(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := storage.ExpandChunks(merged.Iterator()) + expChks, expErr := storage.ExpandChunks(tc.expected.Iterator()) + + require.Equal(t, expErr, actErr) + require.Equal(t, expChks, actChks) + }) + } +} + +func createSamplesWithStep(start, numOfSamples, step int) []tsdbutil.Sample { + res := make([]tsdbutil.Sample, numOfSamples) + cur := start + for i := 0; i < numOfSamples; i++ { + res[i] = sample{t: int64(cur), v: float64(cur)} + cur += step + } + + return res +} From a5e6874ab2c400bf86762bb9429b4a3ba7757c1d Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 21 May 2021 11:53:16 -0400 Subject: [PATCH 03/11] address review comment Signed-off-by: yeya24 --- cmd/thanos/compact.go | 11 +++++------ docs/components/compact.md | 28 +++++++++++++++------------- pkg/compact/compact_e2e_test.go | 2 +- pkg/dedup/chunk_iter.go | 4 ++-- pkg/dedup/chunk_iter_test.go | 8 ++++---- test/e2e/compact_test.go | 4 ++-- 6 files changed, 29 insertions(+), 28 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index fbd24d59db..f4026cc07a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -309,7 +309,7 @@ func runCompact( var mergeFunc storage.VerticalChunkSeriesMergeFunc switch conf.dedupFunc { case compact.DedupAlgorithmPenalty: - mergeFunc = dedup.NewDedupChunkSeriesMerger() + mergeFunc = dedup.NewChunkSeriesMerger() if len(conf.dedupReplicaLabels) == 0 { return errors.New("penalty based deduplication needs at least one replica label specified") @@ -638,20 +638,19 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { Default("48h").SetValue(&cc.deleteDelay) cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+ - "Please note that this uses a NAIVE algorithm for merging. If you need a smarter deduplication algorithm, please set it via -- compact.dedup-func."+ + "Please note that by default this uses a NAIVE algorithm for merging. 
If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
 Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)
- cmd.Flag("compact.dedup-func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
+ cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
 "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
 "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
 Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")
 cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
 "Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
- "Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
- "This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
+ "Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+
+ "If you need a smarter deduplication algorithm, please set it via --deduplication.func.").
 Hidden().StringsVar(&cc.dedupReplicaLabels)
diff --git a/docs/components/compact.md b/docs/components/compact.md
index 3d92201c50..2888454954 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -89,8 +89,8 @@ which compacts overlapping blocks into single one. This is mainly used for **bac
 In Thanos, it works similarly, but on bigger scale and using external labels for grouping as explained in [Compaction section](#compaction).
-In both systems, series with the same labels are merged together. Merging samples is **naive**. It works by deduplicating samples within
-exactly the same timestamps. Otherwise samples are added in sorted by time order.
+In both systems, series with the same labels are merged together. In Prometheus, merging samples is **naive**. It works by deduplicating samples within
+exactly the same timestamps. Otherwise samples are appended in sorted-by-time order. Thanos also supports a new penalty-based samples merger, explained in [Vertical Compaction Use Cases](#vertical-compaction-use-cases).
 > **NOTE:** Both Prometheus and Thanos default behaviour is to fail compaction if any overlapping blocks are spotted. (For Thanos, within the same external labels).
@@ -101,12 +101,12 @@ There can be few valid use cases for vertical compaction:
 * Races between multiple compactions, for example multiple compactors or between compactor and Prometheus compactions.
While this will have extra computation overhead for Compactor it's safe to enable vertical compaction for this case.
 * Backfilling. If you want to add blocks of data to any stream where there is existing data already there for the time range, you will need enabled vertical compaction.
-* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series duplications, `one-to-one` and `realistic`:
- * `one-to-one` duplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
+* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common kinds of series deduplication, `one-to-one` and `penalty`:
+ * `one-to-one` deduplication is when the same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
 This is very common while using [Receivers](../components/receive.md) with replication greater than 1 as receiver replication copies exactly the same timestamps and values to different receive instances.
- * `realistic` duplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
+ * `penalty` deduplication is when the same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
 this requires more complex deduplication algorithms. For example one that is used to [deduplicate on the fly on the Querier](query.md#run-time-deduplication-of-ha-groups). This is common
-case when Prometheus HA replicas are used. [Offline deduplication for this is in progress](https://github.com/thanos-io/thanos/issues/1014).
+case when Prometheus HA replicas are used. You can enable this deduplication via the `--deduplication.func=penalty` flag.
 #### Vertical Compaction Risks
@@ -143,6 +143,8 @@ external_labels: {cluster="us1", receive="true", environment="staging"}
 On next compaction multiple streams' blocks will be compacted into one.
+If you need a different deduplication algorithm, use the `--deduplication.func` flag. The default value is the original `one-to-one` deduplication.
 ## Enforcing Retention of Data
@@ -343,7 +345,13 @@ Flags:
 happen at the end of an iteration.
 --compact.concurrency=1 Number of goroutines to use when compacting groups.
- --compact.dedup-func= Experimental. Deduplication algorithm for
+ --consistency-delay=30m Minimum age of fresh (non-compacted) blocks
+ before they are being processed. Malformed
+ blocks older than the maximum of
+ consistency-delay and 48h0m0s will be removed.
+ --data-dir="./data" Data directory in which to cache blocks and
+ process compactions.
+ --deduplication.func= Experimental. Deduplication algorithm for
 merging overlapping blocks. Possible values are: "", "penalty". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.
- --consistency-delay=30m Minimum age of fresh (non-compacted) blocks
- before they are being processed.
Malformed - blocks older than the maximum of - consistency-delay and 48h0m0s will be removed. - --data-dir="./data" Data directory in which to cache blocks and - process compactions. --delete-delay=48h Time before a block marked for deletion is deleted from bucket. If delete-delay is non zero, blocks will be marked for deletion and diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 0e620840d8..cc690188cc 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -176,7 +176,7 @@ func TestGroupCompactE2E(t *testing.T) { // Penalty based merger should get the same result as the blocks don't have overlap. func TestGroupCompactPenaltyDedupE2E(t *testing.T) { - testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger()) + testGroupCompactE2e(t, dedup.NewChunkSeriesMerger()) } func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) { diff --git a/pkg/dedup/chunk_iter.go b/pkg/dedup/chunk_iter.go index 84c6883576..0d7e6481e2 100644 --- a/pkg/dedup/chunk_iter.go +++ b/pkg/dedup/chunk_iter.go @@ -14,9 +14,9 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" ) -// NewDedupChunkSeriesMerger merges several chunk series into one. +// NewChunkSeriesMerger merges several chunk series into one. // Deduplication is based on penalty based deduplication algorithm without handling counter reset. -func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc { +func NewChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc { return func(series ...storage.ChunkSeries) storage.ChunkSeries { if len(series) == 0 { return nil diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go index cfca35729c..f68b071c8b 100644 --- a/pkg/dedup/chunk_iter_test.go +++ b/pkg/dedup/chunk_iter_test.go @@ -16,7 +16,7 @@ import ( ) func TestDedupChunkSeriesMerger(t *testing.T) { - m := NewDedupChunkSeriesMerger() + m := NewChunkSeriesMerger() for _, tc := range []struct { name string @@ -121,7 +121,7 @@ func TestDedupChunkSeriesMerger(t *testing.T) { ), }, { - name: "150 overlapping samples, split chunk", + name: "150 overlapping samples, no chunk splitting due to penalty deduplication", input: []storage.ChunkSeries{ storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) @@ -144,7 +144,7 @@ func TestDedupChunkSeriesMerger(t *testing.T) { } func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { - m := NewDedupChunkSeriesMerger() + m := NewChunkSeriesMerger() defaultLabels := labels.FromStrings("bar", "baz") emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{}) @@ -152,7 +152,7 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000)) // Non overlapping samples with samples1. 5m downsampled chunk has 2 samples. samples2 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(600000, 10, 60*1000)) - + // Overlapped with samples1. 
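+ // (samples3 starts at t=120000, i.e. 2m, inside samples1's [0m, 10m) range, so the downsampled chunks overlap.)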
samples3 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(120000, 10, 60*1000)) for _, tc := range []struct { diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 92cb21c090..697e8248cd 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -673,7 +673,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica"} if penaltyDedup { - extArgs = append(extArgs, "--compact.dedup-func=penalty") + extArgs = append(extArgs, "--deduplication.func=penalty") } // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction. @@ -740,7 +740,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) { extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s"} if penaltyDedup { - extArgs = append(extArgs, "--compact.dedup-func=penalty") + extArgs = append(extArgs, "--deduplication.func=penalty") } c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...) testutil.Ok(t, err) From a0be864c9d69a858b623a156af531de0a340517b Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 21 May 2021 14:55:34 -0400 Subject: [PATCH 04/11] update flag description Signed-off-by: yeya24 --- cmd/thanos/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index f4026cc07a..5b1f7e9cd3 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -650,7 +650,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ "Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ "Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+ - "If you need a smarter deduplication algorithm, please set it via --deduplication.func."). + "If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."). Hidden().StringsVar(&cc.dedupReplicaLabels) // TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. 
From 6fa6184e7c2fd7b4dc213d5bc38a14dd6f4282ee Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 31 May 2021 11:07:39 -0400 Subject: [PATCH 05/11] make deduplication label unhidden Signed-off-by: yeya24 --- cmd/thanos/compact.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5b1f7e9cd3..13334e1699 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -314,6 +314,11 @@ func runCompact( if len(conf.dedupReplicaLabels) == 0 { return errors.New("penalty based deduplication needs at least one replica label specified") } + case "": + mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + + default: + return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc) } // Instantiate the compactor with different time slices. Timestamps in TSDB @@ -651,7 +656,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ "Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+ "If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."). - Hidden().StringsVar(&cc.dedupReplicaLabels) + StringsVar(&cc.dedupReplicaLabels) // TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+ From 15acd8c8683c8ecc785ec71e4c16f89738e839b6 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 31 May 2021 11:30:14 -0400 Subject: [PATCH 06/11] fix doc lint Signed-off-by: yeya24 --- docs/components/compact.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/components/compact.md b/docs/components/compact.md index 2888454954..9e34526cc3 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -360,6 +360,21 @@ Flags: algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag. + --deduplication.replica-label=DEDUPLICATION.REPLICA-LABEL ... + Label to treat as a replica indicator of blocks + that can be deduplicated (repeated flag). This + will merge multiple replica blocks into one. + This process is irreversible.Experimental. When + it is set to true, compactor will ignore the + given labels so that vertical compaction can + merge the blocks.Please note that by default + this uses a NAIVE algorithm for merging which + works well for deduplication of blocks with + **precisely the same samples** like produced by + Receiver replication.If you need a different + deduplication algorithm (e.g one that works well + with Prometheus replicas), please set it via + --deduplication.func. --delete-delay=48h Time before a block marked for deletion is deleted from bucket. 
If delete-delay is non zero, blocks will be marked for deletion and From d0cba248a2914669389d31d06e517327df272b6d Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 7 Jun 2021 21:55:30 -0700 Subject: [PATCH 07/11] address review comments Signed-off-by: yeya24 --- cmd/thanos/compact.go | 4 ++-- docs/components/compact.md | 20 ++++++++-------- docs/components/tools.md | 2 +- pkg/dedup/chunk_iter.go | 49 ++++++++++++-------------------------- 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 13334e1699..f91a03512d 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -642,7 +642,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "or compactor is ignoring the deletion because it's compacting the block at the same time."). Default("48h").SetValue(&cc.deleteDelay) - cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+ + cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more. "+ "Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+ "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) @@ -653,7 +653,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "") cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ - "Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ + "Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ "Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+ "If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."). StringsVar(&cc.dedupReplicaLabels) diff --git a/docs/components/compact.md b/docs/components/compact.md index 9e34526cc3..b63c685059 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -365,16 +365,16 @@ Flags: that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible.Experimental. When - it is set to true, compactor will ignore the - given labels so that vertical compaction can - merge the blocks.Please note that by default - this uses a NAIVE algorithm for merging which - works well for deduplication of blocks with - **precisely the same samples** like produced by - Receiver replication.If you need a different - deduplication algorithm (e.g one that works well - with Prometheus replicas), please set it via - --deduplication.func. 
+ one or more labels are set, compactor will + ignore the given labels so that vertical + compaction can merge the blocks.Please note that + by default this uses a NAIVE algorithm for + merging which works well for deduplication of + blocks with **precisely the same samples** like + produced by Receiver replication.If you need a + different deduplication algorithm (e.g one that + works well with Prometheus replicas), please set + it via --deduplication.func. --delete-delay=48h Time before a block marked for deletion is deleted from bucket. If delete-delay is non zero, blocks will be marked for deletion and diff --git a/docs/components/tools.md b/docs/components/tools.md index b69c45d227..d0083abc7f 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -777,7 +777,7 @@ Flags: --rewrite.to-relabel-config-file= Path to YAML file that contains relabel configs that will be applied to blocks - --tmp.dir="/tmp/thanos-rewrite" + --tmp.dir="/var/folders/0v/x6r6mgjx14ldyx7tfmpmx3gw0000gn/T/thanos-rewrite" Working directory for temporary files --tracing.config= Alternative to 'tracing.config-file' flag diff --git a/pkg/dedup/chunk_iter.go b/pkg/dedup/chunk_iter.go index 0d7e6481e2..e053c7a6c4 100644 --- a/pkg/dedup/chunk_iter.go +++ b/pkg/dedup/chunk_iter.go @@ -252,6 +252,9 @@ func newAggrChunkIterator(iters [5]chunkenc.Iterator) chunks.Iterator { func (a *aggrChunkIterator) Next() bool { if !a.countChkIter.Next() { + if err := a.countChkIter.Err(); err != nil { + a.err = err + } return false } @@ -266,40 +269,15 @@ func (a *aggrChunkIterator) Next() bool { ) chks[downsample.AggrCount] = countChk.Chunk - chk, err = a.toChunk(downsample.AggrSum, mint, maxt) - if err != nil { - a.err = err - return false - } - if chk != nil { - chks[downsample.AggrSum] = chk.Chunk - } - - chk, err = a.toChunk(downsample.AggrMin, mint, maxt) - if err != nil { - a.err = err - return false - } - if chk != nil { - chks[downsample.AggrMin] = chk.Chunk - } - - chk, err = a.toChunk(downsample.AggrMax, mint, maxt) - if err != nil { - a.err = err - return false - } - if chk != nil { - chks[downsample.AggrMax] = chk.Chunk - } - - chk, err = a.toChunk(downsample.AggrCounter, mint, maxt) - if err != nil { - a.err = err - return false - } - if chk != nil { - chks[downsample.AggrCounter] = chk.Chunk + for i := downsample.AggrSum; i <= downsample.AggrCounter; i++ { + chk, err = a.toChunk(i, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[i] = chk.Chunk + } } a.curr = chunks.Meta{ @@ -338,6 +316,9 @@ func (a *aggrChunkIterator) toChunk(at downsample.AggrType, minTime, maxTime int lastT, lastV = it.At() appender.Append(lastT, lastV) } + if err := it.Err(); err != nil { + return nil, err + } // No sample in the required time range. 
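	// Relying on lastT and lastV staying at their zero values is the "nothing
	// was appended" sentinel here: when the iterator yields no samples within
	// [minTime, maxTime], toChunk presumably returns nil, nil rather than an
	// empty chunk (hence the `if chk != nil` guards in Next above). A genuine
	// sample at exactly t=0 with v=0 would be indistinguishable from the
	// sentinel, which is assumed not to occur for real block data.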
if lastT == 0 && lastV == 0 {

From 0aa41f32e32f5d082574d779fc0dd931ae74ad59 Mon Sep 17 00:00:00 2001
From: yeya24
Date: Mon, 7 Jun 2021 21:58:26 -0700
Subject: [PATCH 08/11] change tmp dir

Signed-off-by: yeya24
---
 docs/components/tools.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/components/tools.md b/docs/components/tools.md
index d0083abc7f..b69c45d227 100644
--- a/docs/components/tools.md
+++ b/docs/components/tools.md
@@ -777,7 +777,7 @@ Flags:
       --rewrite.to-relabel-config-file=
                                  Path to YAML file that contains relabel
                                  configs that will be applied to blocks
-      --tmp.dir="/var/folders/0v/x6r6mgjx14ldyx7tfmpmx3gw0000gn/T/thanos-rewrite"
+      --tmp.dir="/tmp/thanos-rewrite"
                                  Working directory for temporary files
       --tracing.config=
                                  Alternative to 'tracing.config-file' flag

From dbd7ed2901a3bd38f844d675bf534d1d3f4d2403 Mon Sep 17 00:00:00 2001
From: yeya24
Date: Fri, 11 Jun 2021 09:26:56 -0700
Subject: [PATCH 09/11] update docs

Signed-off-by: yeya24
---
 docs/components/compact.md   |  2 ++
 pkg/dedup/chunk_iter_test.go | 14 +++++++-------
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/docs/components/compact.md b/docs/components/compact.md
index b63c685059..053ceebf8a 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -115,6 +115,8 @@ The main risk is the **irreversible** implications of potential configuration er
 * If you accidentally upload block with the same external labels but produced by totally different Prometheus for totally different applications, some metrics can overlap and potentially can merge together making such series useless.
 * If you merge disjoint series in multiple of blocks together, there is currently no easy way to split them back.
+* The `penalty` offline deduplication algorithm has its own limitations. Even though it has been battle-tested for quite a long time, issues still come up from time to time,
+  such as https://github.com/thanos-io/thanos/issues/2890. If you would like to enable this deduplication algorithm, be aware of this risk and make sure you back up your data.
 
 #### Enabling Vertical Compaction
 
diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go
index f68b071c8b..c9d79a6943 100644
--- a/pkg/dedup/chunk_iter_test.go
+++ b/pkg/dedup/chunk_iter_test.go
@@ -11,8 +11,8 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
-	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
+	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
 func TestDedupChunkSeriesMerger(t *testing.T) {
@@ -133,12 +133,12 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			merged := m(tc.input...)
-			require.Equal(t, tc.expected.Labels(), merged.Labels())
+			testutil.Equals(t,tc.expected.Labels(), merged.Labels() )
 
 			actChks, actErr := storage.ExpandChunks(merged.Iterator())
 			expChks, expErr := storage.ExpandChunks(tc.expected.Iterator())
-			require.Equal(t, expErr, actErr)
-			require.Equal(t, expChks, actChks)
+			testutil.Equals(t, expErr, actErr)
+			testutil.Equals(t, expChks, actChks)
 		})
 	}
 }
@@ -303,12 +303,12 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			merged := m(tc.input...)
- require.Equal(t, tc.expected.Labels(), merged.Labels()) + testutil.Equals(t, tc.expected.Labels(), merged.Labels()) actChks, actErr := storage.ExpandChunks(merged.Iterator()) expChks, expErr := storage.ExpandChunks(tc.expected.Iterator()) - require.Equal(t, expErr, actErr) - require.Equal(t, expChks, actChks) + testutil.Equals(t, expErr, actErr) + testutil.Equals(t, expChks, actChks) }) } } From 31686171f27814f5ac2c2de3eee20817a375acea Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 11 Jun 2021 09:34:37 -0700 Subject: [PATCH 10/11] update changelog Signed-off-by: yeya24 --- CHANGELOG.md | 2 +- go.mod | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d13a623862..646fdee647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#4299](https://github.com/thanos-io/thanos/pull/4299) Tracing: Add tracing to exemplar APIs. - [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags. +- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor. ### Fixed @@ -45,7 +46,6 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs - [#4249](https://github.com/thanos-io/thanos/pull/4249) UI: add dark theme - [#3707](https://github.com/thanos-io/thanos/pull/3707) Tools: Added `--rewrite.to-relabel-config` to bucket rewrite tool to support series relabel from given blocks. -- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor. ### Fixed diff --git a/go.mod b/go.mod index 6c7a3b6428..b453556881 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,6 @@ require ( github.com/prometheus/common v0.23.0 github.com/prometheus/exporter-toolkit v0.5.1 github.com/prometheus/prometheus v1.8.2-0.20210519120135-d95b0972505f - github.com/stretchr/testify v1.7.0 github.com/uber/jaeger-client-go v2.28.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624 From b834cda7ed924bde309b06cf5946c5d10c786b52 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 11 Jun 2021 09:47:33 -0700 Subject: [PATCH 11/11] fix lint Signed-off-by: yeya24 --- pkg/dedup/chunk_iter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go index c9d79a6943..0e1eaf0b3a 100644 --- a/pkg/dedup/chunk_iter_test.go +++ b/pkg/dedup/chunk_iter_test.go @@ -133,7 +133,7 @@ func TestDedupChunkSeriesMerger(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) - testutil.Equals(t,tc.expected.Labels(), merged.Labels() ) + testutil.Equals(t, tc.expected.Labels(), merged.Labels()) actChks, actErr := storage.ExpandChunks(merged.Iterator()) expChks, expErr := storage.ExpandChunks(tc.expected.Iterator())
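As background for reviewing the `penalty` mode exercised throughout this series (the new docs bullet in PATCH 09, the --deduplication.func=penalty e2e runs): below is a toy, self-contained sketch of the core idea only. It is not the pkg/dedup implementation, which adapts its penalty dynamically; the fixed 5s penalty and all names are illustrative. Samples from different replicas that land close together in time are treated as replicated duplicates and collapsed, while a replica that has fallen far behind the other is assumed to be filling a gap and is drained until it catches up.

package main

import "fmt"

type sample struct {
	t int64   // timestamp in milliseconds
	v float64 // value
}

// penaltyMerge merges two time-sorted replica streams of the same series.
func penaltyMerge(a, b []sample, penalty int64) []sample {
	var out []sample
	lastT := int64(-1 << 62)
	emit := func(s sample) {
		if s.t > lastT { // never emit at or before the last kept sample
			out = append(out, s)
			lastT = s.t
		}
	}
	i, j := 0, 0
	for i < len(a) || j < len(b) {
		switch {
		case i >= len(a): // a exhausted: drain b
			emit(b[j])
			j++
		case j >= len(b): // b exhausted: drain a
			emit(a[i])
			i++
		case a[i].t <= b[j].t-penalty: // b is far ahead: a fills a gap
			emit(a[i])
			i++
		case b[j].t <= a[i].t-penalty: // a is far ahead: b fills a gap
			emit(b[j])
			j++
		default: // within the penalty window: duplicates, keep the earlier one
			if a[i].t <= b[j].t {
				emit(a[i])
			} else {
				emit(b[j])
			}
			i++
			j++
		}
	}
	return out
}

func main() {
	// Replica a misses two scrapes; replica b scrapes ~1s later than a.
	a := []sample{{0, 1}, {15000, 2}, {60000, 5}}
	b := []sample{{1000, 1}, {16000, 2}, {31000, 3}, {46000, 4}, {61000, 5}}
	fmt.Println(penaltyMerge(a, b, 5000))
	// [{0 1} {15000 2} {31000 3} {46000 4} {60000 5}]
}

This also motivates the docs warning added in PATCH 09: once replicas are merged this way, the choice of samples is baked into the compacted block, so any algorithmic edge case (e.g. the issue 2890 linked above) is irreversible; hence the advice to back up data before enabling it.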