diff --git a/CHANGELOG.md b/CHANGELOG.md index aef921ec11..646fdee647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#4299](https://github.com/thanos-io/thanos/pull/4299) Tracing: Add tracing to exemplar APIs. - [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags. +- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index b765599604..c374297ee4 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -24,13 +24,16 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -302,9 +305,25 @@ func runCompact( cancel() } }() + + var mergeFunc storage.VerticalChunkSeriesMergeFunc + switch conf.dedupFunc { + case compact.DedupAlgorithmPenalty: + mergeFunc = dedup.NewChunkSeriesMerger() + + if len(conf.dedupReplicaLabels) == 0 { + return errors.New("penalty based deduplication needs at least one replica label specified") + } + case "": + mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + + default: + return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc) + } + // Instantiate the compactor with different time slices. Timestamps in TSDB // are in milliseconds. - comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), nil) + comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc) if err != nil { return errors.Wrap(err, "create compactor") } @@ -564,6 +583,7 @@ type compactConfig struct { maxBlockIndexSize units.Base2Bytes hashFunc string enableVerticalCompaction bool + dedupFunc string } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -622,16 +642,21 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "or compactor is ignoring the deletion because it's compacting the block at the same time."). Default("48h").SetValue(&cc.deleteDelay) - cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+ - "Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+ + cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more. "+ + "Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+ "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) + cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+ + "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+ + "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag."). + Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "") + cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ - "Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ - "Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+ - "This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."). - Hidden().StringsVar(&cc.dedupReplicaLabels) + "Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ + "Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+ + "If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."). + StringsVar(&cc.dedupReplicaLabels) // TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+ diff --git a/docs/components/compact.md b/docs/components/compact.md index 355d8a13a0..0afc283318 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -89,8 +89,8 @@ which compacts overlapping blocks into single one. This is mainly used for **bac In Thanos, it works similarly, but on bigger scale and using external labels for grouping as explained in [Compaction section](#compaction). -In both systems, series with the same labels are merged together. Merging samples is **naive**. It works by deduplicating samples within -exactly the same timestamps. Otherwise samples are added in sorted by time order. +In both systems, series with the same labels are merged together. In prometheus, merging samples is **naive**. It works by deduplicating samples within +exactly the same timestamps. Otherwise samples are added in sorted by time order. Thanos also support a new penalty based samples merger and it is explained in [Deduplication](#Vertical Compaction Use Cases). > **NOTE:** Both Prometheus and Thanos default behaviour is to fail compaction if any overlapping blocks are spotted. (For Thanos, within the same external labels). @@ -101,12 +101,12 @@ There can be few valid use cases for vertical compaction: * Races between multiple compactions, for example multiple compactors or between compactor and Prometheus compactions. While this will have extra computation overhead for Compactor it's safe to enable vertical compaction for this case. * Backfilling. If you want to add blocks of data to any stream where there is existing data already there for the time range, you will need enabled vertical compaction. -* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series duplications, `one-to-one` and `realistic`: - * `one-to-one` duplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps. +* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series deduplications, `one-to-one` and `penalty`: + * `one-to-one` deduplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps. This is very common while using [Receivers](../components/receive.md) with replication greater than 1 as receiver replication copies exactly the same timestamps and values to different receive instances. - * `realistic` duplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally + * `penalty` deduplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally this requires more complex deduplication algorithms. For example one that is used to [deduplicate on the fly on the Querier](query.md#run-time-deduplication-of-ha-groups). This is common -case when Prometheus HA replicas are used. [Offline deduplication for this is in progress](https://github.com/thanos-io/thanos/issues/1014). +case when Prometheus HA replicas are used. You can enable this deduplication via `--deduplication.func=penalty` flag. #### Vertical Compaction Risks @@ -115,6 +115,8 @@ The main risk is the **irreversible** implications of potential configuration er * If you accidentally upload block with the same external labels but produced by totally different Prometheus for totally different applications, some metrics can overlap and potentially can merge together making such series useless. * If you merge disjoint series in multiple of blocks together, there is currently no easy way to split them back. +* The `penalty` offline deduplication algorithm has its own limitation. Even though it has been battle-tested for quite a long time but still very few issues come up from time to time + such as https://github.com/thanos-io/thanos/issues/2890. If you'd like to enable this deduplication algorithm, please take the risk and make sure you back up your data. #### Enabling Vertical Compaction @@ -143,6 +145,8 @@ external_labels: {cluster="us1", receive="true", environment="staging"} On next compaction multiple streams' blocks will be compacted into one. +If you need a different deduplication algorithm, use `deduplication.func` flag. The default value is the original `one-to-one` deduplication. + ## Enforcing Retention of Data By default, there is NO retention set for object storage data. This means that you store data for unlimited time, which is a valid and recommended way of running Thanos. @@ -349,6 +353,30 @@ Flags: consistency-delay and 48h0m0s will be removed. --data-dir="./data" Data directory in which to cache blocks and process compactions. + --deduplication.func= Experimental. Deduplication algorithm for + merging overlapping blocks. Possible values are: + "", "penalty". If no value is specified, the + default compact deduplication merger is used, + which performs 1:1 deduplication for samples. + When set to penalty, penalty based deduplication + algorithm will be used. At least one replica + label has to be set via + --deduplication.replica-label flag. + --deduplication.replica-label=DEDUPLICATION.REPLICA-LABEL ... + Label to treat as a replica indicator of blocks + that can be deduplicated (repeated flag). This + will merge multiple replica blocks into one. + This process is irreversible.Experimental. When + one or more labels are set, compactor will + ignore the given labels so that vertical + compaction can merge the blocks.Please note that + by default this uses a NAIVE algorithm for + merging which works well for deduplication of + blocks with **precisely the same samples** like + produced by Receiver replication.If you need a + different deduplication algorithm (e.g one that + works well with Prometheus replicas), please set + it via --deduplication.func. --delete-delay=48h Time before a block marked for deletion is deleted from bucket. If delete-delay is non zero, blocks will be marked for deletion and diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 58388a652f..1d956c49a1 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -41,6 +41,12 @@ const ( ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) ) +const ( + // DedupAlgorithmPenalty is the penalty based compactor series merge algorithm. + // This is the same as the online deduplication of querier except counter reset handling. + DedupAlgorithmPenalty = "penalty" +) + // Syncer synchronizes block metas from a bucket into a local directory. // It sorts them into compaction groups based on equal label sets. type Syncer struct { diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 663bb5767f..cc690188cc 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -22,9 +22,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/objtesting" "github.com/thanos-io/thanos/pkg/testutil" @@ -167,7 +170,16 @@ func MetricCount(c prometheus.Collector) int { return mCount } -func TestGroup_Compact_e2e(t *testing.T) { +func TestGroupCompactE2E(t *testing.T) { + testGroupCompactE2e(t, nil) +} + +// Penalty based merger should get the same result as the blocks don't have overlap. +func TestGroupCompactPenaltyDedupE2E(t *testing.T) { + testGroupCompactE2e(t, dedup.NewChunkSeriesMerger()) +} + +func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -194,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) { sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) testutil.Ok(t, err) - comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil) + comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) testutil.Ok(t, err) planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) diff --git a/pkg/dedup/chunk_iter.go b/pkg/dedup/chunk_iter.go new file mode 100644 index 0000000000..e053c7a6c4 --- /dev/null +++ b/pkg/dedup/chunk_iter.go @@ -0,0 +1,338 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package dedup + +import ( + "bytes" + "container/heap" + + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + + "github.com/thanos-io/thanos/pkg/compact/downsample" +) + +// NewChunkSeriesMerger merges several chunk series into one. +// Deduplication is based on penalty based deduplication algorithm without handling counter reset. +func NewChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc { + return func(series ...storage.ChunkSeries) storage.ChunkSeries { + if len(series) == 0 { + return nil + } + return &storage.ChunkSeriesEntry{ + Lset: series[0].Labels(), + ChunkIteratorFn: func() chunks.Iterator { + iterators := make([]chunks.Iterator, 0, len(series)) + for _, s := range series { + iterators = append(iterators, s.Iterator()) + } + return &dedupChunksIterator{ + iterators: iterators, + } + }, + } + } +} + +type dedupChunksIterator struct { + iterators []chunks.Iterator + h chunkIteratorHeap + + err error + curr chunks.Meta +} + +func (d *dedupChunksIterator) At() chunks.Meta { + return d.curr +} + +// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615. +// The difference is that it handles both XOR and Aggr chunk Encoding. +func (d *dedupChunksIterator) Next() bool { + if d.h == nil { + for _, iter := range d.iterators { + if iter.Next() { + heap.Push(&d.h, iter) + } + } + } + if len(d.h) == 0 { + return false + } + + iter := heap.Pop(&d.h).(chunks.Iterator) + d.curr = iter.At() + if iter.Next() { + heap.Push(&d.h, iter) + } + + var ( + om = newOverlappingMerger() + oMaxTime = d.curr.MaxTime + prev = d.curr + ) + + // Detect overlaps to compact. + for len(d.h) > 0 { + // Get the next oldest chunk by min, then max time. + next := d.h[0].At() + if next.MinTime > oMaxTime { + // No overlap with current one. + break + } + + if next.MinTime == prev.MinTime && + next.MaxTime == prev.MaxTime && + bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) { + // 1:1 duplicates, skip it. + } else { + // We operate on same series, so labels does not matter here. + om.addChunk(next) + + if next.MaxTime > oMaxTime { + oMaxTime = next.MaxTime + } + prev = next + } + + iter := heap.Pop(&d.h).(chunks.Iterator) + if iter.Next() { + heap.Push(&d.h, iter) + } + } + if om.empty() { + return true + } + + iter = om.iterator(d.curr) + if !iter.Next() { + if d.err = iter.Err(); d.err != nil { + return false + } + panic("unexpected seriesToChunkEncoder lack of iterations") + } + d.curr = iter.At() + if iter.Next() { + heap.Push(&d.h, iter) + } + return true +} + +func (d *dedupChunksIterator) Err() error { + return d.err +} + +type chunkIteratorHeap []chunks.Iterator + +func (h chunkIteratorHeap) Len() int { return len(h) } +func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h chunkIteratorHeap) Less(i, j int) bool { + at := h[i].At() + bt := h[j].At() + if at.MinTime == bt.MinTime { + return at.MaxTime < bt.MaxTime + } + return at.MinTime < bt.MinTime +} + +func (h *chunkIteratorHeap) Push(x interface{}) { + *h = append(*h, x.(chunks.Iterator)) +} + +func (h *chunkIteratorHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type overlappingMerger struct { + xorIterators []chunkenc.Iterator + aggrIterators [5][]chunkenc.Iterator + + samplesMergeFunc func(a, b chunkenc.Iterator) chunkenc.Iterator +} + +func newOverlappingMerger() *overlappingMerger { + return &overlappingMerger{ + samplesMergeFunc: func(a, b chunkenc.Iterator) chunkenc.Iterator { + it := noopAdjustableSeriesIterator{a} + return newDedupSeriesIterator(it, noopAdjustableSeriesIterator{b}) + }, + } +} + +func (o *overlappingMerger) addChunk(chk chunks.Meta) { + switch chk.Chunk.Encoding() { + case chunkenc.EncXOR: + o.xorIterators = append(o.xorIterators, chk.Chunk.Iterator(nil)) + case downsample.ChunkEncAggr: + aggrChk := chk.Chunk.(*downsample.AggrChunk) + for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ { + if c, err := aggrChk.Get(i); err == nil { + o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil)) + } + } + } +} + +func (o *overlappingMerger) empty() bool { + // OverlappingMerger only contains either xor chunk or aggr chunk. + // If xor chunks are present then we don't need to check aggr chunks. + if len(o.xorIterators) > 0 { + return false + } + return len(o.aggrIterators[downsample.AggrCount]) == 0 +} + +// Return a chunk iterator based on the encoding of base chunk. +func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator { + var it chunkenc.Iterator + switch baseChk.Chunk.Encoding() { + case chunkenc.EncXOR: + // If XOR encoding, we need to deduplicate the samples and re-encode them to chunks. + return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{ + SampleIteratorFn: func() chunkenc.Iterator { + it = baseChk.Chunk.Iterator(nil) + for _, i := range o.xorIterators { + it = o.samplesMergeFunc(it, i) + } + return it + }}).Iterator() + + case downsample.ChunkEncAggr: + // If Aggr encoding, each aggregated chunks need to be expanded and deduplicated, + // then re-encoded into Aggr chunks. + aggrChk := baseChk.Chunk.(*downsample.AggrChunk) + samplesIter := [5]chunkenc.Iterator{} + for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ { + if c, err := aggrChk.Get(i); err == nil { + o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil)) + } + + if len(o.aggrIterators[i]) > 0 { + for _, j := range o.aggrIterators[i][1:] { + o.aggrIterators[i][0] = o.samplesMergeFunc(o.aggrIterators[i][0], j) + } + samplesIter[i] = o.aggrIterators[i][0] + } else { + samplesIter[i] = nil + } + } + + return newAggrChunkIterator(samplesIter) + } + + // Impossible for now. + return nil +} + +type aggrChunkIterator struct { + iters [5]chunkenc.Iterator + curr chunks.Meta + countChkIter chunks.Iterator + + err error +} + +func newAggrChunkIterator(iters [5]chunkenc.Iterator) chunks.Iterator { + return &aggrChunkIterator{ + iters: iters, + countChkIter: storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{ + SampleIteratorFn: func() chunkenc.Iterator { + return iters[downsample.AggrCount] + }, + }).Iterator(), + } +} + +func (a *aggrChunkIterator) Next() bool { + if !a.countChkIter.Next() { + if err := a.countChkIter.Err(); err != nil { + a.err = err + } + return false + } + + countChk := a.countChkIter.At() + mint := countChk.MinTime + maxt := countChk.MaxTime + + var ( + chks [5]chunkenc.Chunk + chk *chunks.Meta + err error + ) + + chks[downsample.AggrCount] = countChk.Chunk + for i := downsample.AggrSum; i <= downsample.AggrCounter; i++ { + chk, err = a.toChunk(i, mint, maxt) + if err != nil { + a.err = err + return false + } + if chk != nil { + chks[i] = chk.Chunk + } + } + + a.curr = chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: downsample.EncodeAggrChunk(chks), + } + return true +} + +func (a *aggrChunkIterator) At() chunks.Meta { + return a.curr +} + +func (a *aggrChunkIterator) Err() error { + return a.err +} + +func (a *aggrChunkIterator) toChunk(at downsample.AggrType, minTime, maxTime int64) (*chunks.Meta, error) { + if a.iters[at] == nil { + return nil, nil + } + c := chunkenc.NewXORChunk() + appender, err := c.Appender() + if err != nil { + return nil, err + } + + it := NewBoundedSeriesIterator(a.iters[at], minTime, maxTime) + + var ( + lastT int64 + lastV float64 + ) + for it.Next() { + lastT, lastV = it.At() + appender.Append(lastT, lastV) + } + if err := it.Err(); err != nil { + return nil, err + } + + // No sample in the required time range. + if lastT == 0 && lastV == 0 { + return nil, nil + } + + // Encode last sample for AggrCounter. + if at == downsample.AggrCounter { + appender.Append(lastT, lastV) + } + + return &chunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + Chunk: c, + }, nil +} diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go new file mode 100644 index 0000000000..0e1eaf0b3a --- /dev/null +++ b/pkg/dedup/chunk_iter_test.go @@ -0,0 +1,325 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package dedup + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestDedupChunkSeriesMerger(t *testing.T) { + m := NewChunkSeriesMerger() + + for _, tc := range []struct { + name string + input []storage.ChunkSeries + expected storage.ChunkSeries + }{ + { + name: "single empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "single series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + { + name: "two empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "two non overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping with large time diff", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{2, 2}, sample{5008, 5008}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + // sample{5008, 5008} is added to the result due to its large timestamp. + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}), + }, + { + name: "two duplicated", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + { + name: "three overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + }, + // only samples from the last series are retained due to high penalty. + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + }, + { + name: "three in chained overlap", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}), + }, + // only samples from the last series are retained due to high penalty. + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + { + name: "three in chained overlap complex", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, + []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + ), + }, + { + name: "110 overlapping samples", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 110), + ), + }, + { + name: "150 overlapping samples, no chunk splitting due to penalty deduplication", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 90), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) + testutil.Equals(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := storage.ExpandChunks(merged.Iterator()) + expChks, expErr := storage.ExpandChunks(tc.expected.Iterator()) + + testutil.Equals(t, expErr, actErr) + testutil.Equals(t, expChks, actChks) + }) + } +} + +func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { + m := NewChunkSeriesMerger() + + defaultLabels := labels.FromStrings("bar", "baz") + emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{}) + // Samples are created with step 1m. So the 5m downsampled chunk has 2 samples. + samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000)) + // Non overlapping samples with samples1. 5m downsampled chunk has 2 samples. + samples2 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(600000, 10, 60*1000)) + // Overlapped with samples1. + samples3 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(120000, 10, 60*1000)) + + for _, tc := range []struct { + name string + input []storage.ChunkSeries + expected storage.ChunkSeries + }{ + { + name: "single empty series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator() + }, + }, + }, + { + name: "single series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + { + name: "two empty series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator() + }, + }, + }, + { + name: "two non overlapping series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples2, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator( + append(downsample.DownsampleRaw(samples1, downsample.ResLevel1), + downsample.DownsampleRaw(samples2, downsample.ResLevel1)...)...) + }, + }, + }, + { + // 1:1 duplicated chunks are deduplicated. + name: "two same series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator( + downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + }, + { + name: "two overlapping series", + input: []storage.ChunkSeries{ + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...) + }, + }, + &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples3, downsample.ResLevel1)...) + }, + }, + }, + expected: &storage.ChunkSeriesEntry{ + Lset: defaultLabels, + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(chunks.Meta{ + MinTime: 299999, + MaxTime: 540000, + Chunk: downsample.EncodeAggrChunk([5]chunkenc.Chunk{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 3}, sample{540000, 5}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 540000}, sample{540000, 2100000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 120000}, sample{540000, 300000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{540000, 540000}}).Chunk, + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{299999, 240000}}).Chunk, + }), + }) + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) + testutil.Equals(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := storage.ExpandChunks(merged.Iterator()) + expChks, expErr := storage.ExpandChunks(tc.expected.Iterator()) + + testutil.Equals(t, expErr, actErr) + testutil.Equals(t, expChks, actChks) + }) + } +} + +func createSamplesWithStep(start, numOfSamples, step int) []tsdbutil.Sample { + res := make([]tsdbutil.Sample, numOfSamples) + cur := start + for i := 0; i < numOfSamples; i++ { + res[i] = sample{t: int64(cur), v: float64(cur)} + cur += step + } + + return res +} diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go index b3f52f5bde..79bd5de3c8 100644 --- a/pkg/dedup/iter_test.go +++ b/pkg/dedup/iter_test.go @@ -22,6 +22,14 @@ type sample struct { v float64 } +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + type series struct { lset labels.Labels samples []sample diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 45b8918fec..d19bc778ec 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -70,8 +70,15 @@ func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration, } func TestCompactWithStoreGateway(t *testing.T) { - t.Parallel() + testCompactWithStoreGateway(t, false) +} + +func TestCompactWithStoreGatewayWithPenaltyDedup(t *testing.T) { + testCompactWithStoreGateway(t, true) +} +func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { + t.Parallel() logger := log.NewLogfmtLogger(os.Stdout) justAfterConsistencyDelay := 30 * time.Minute @@ -325,7 +332,11 @@ func TestCompactWithStoreGateway(t *testing.T) { }, ) - s, err := e2e.NewScenario("e2e_test_compact") + name := "e2e_test_compact" + if penaltyDedup { + name = "e2e_test_compact_penalty_dedup" + } + s, err := e2e.NewScenario(name) testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) @@ -537,6 +548,49 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, } + if penaltyDedup { + expectedEndVector = model.Vector{ + // NOTE(bwplotka): Even after deduplication some series has still replica labels. This is because those blocks did not overlap yet with anything. + // This is fine as querier deduplication will remove it if needed. + {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "no-compaction", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-after-dedup", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "a-partial-overlap-dedup-ready"}}, + // If no penalty dedup enabled, the value should be 360. + {Value: 200, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + // If no penalty dedup enabled, the value should be 240. + {Value: 195, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + } + } + // No replica label with overlaps should halt compactor. This test is sequential // because we do not want two Thanos Compact instances deleting the same partially // uploaded blocks and blocks with deletion marks. We also check that Thanos Compactor @@ -617,8 +671,13 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, block.Download(ctx, logger, bkt, id, filepath.Join(p, "compact", compact.DefaultGroupKey(m.Thanos), id.String()))) } + extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica"} + if penaltyDedup { + extArgs = append(extArgs, "--deduplication.func=penalty") + } + // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction. - c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica") + c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) @@ -679,7 +738,11 @@ func TestCompactWithStoreGateway(t *testing.T) { } t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) { - c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s") + extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s"} + if penaltyDedup { + extArgs = append(extArgs, "--deduplication.func=penalty") + } + c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c))