Skip to content

Commit

Permalink
Merge pull request #2 from jnyi/issue-6167
Browse files Browse the repository at this point in the history
Issue 6167
  • Loading branch information
jnyi authored Mar 10, 2023
2 parents c88f5a3 + 544a38f commit 6734594
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: tracing in OTLP support configuring service_name.
- [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples.

### Fixed

Expand Down
11 changes: 10 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ func runReceive(
conf.allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
TooFarInFutureTimeWindow: int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)),
})

var limitsConfig *receive.RootLimitsConfig
if conf.writeLimitsConfig != nil {
Expand Down Expand Up @@ -774,6 +777,7 @@ type receiveConfig struct {

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
tsdbTooFarInFutureTimeWindow *model.Duration
tsdbOutOfOrderTimeWindow *model.Duration
tsdbOutOfOrderCapMax int64
tsdbAllowOverlappingBlocks bool
Expand Down Expand Up @@ -866,6 +870,11 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

rc.tsdbTooFarInFutureTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.too-far-in-future.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingesting samples too far in the future. Disabled (0s) by default"+
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration.",
).Default("0s").Hidden())

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
"Please note if you enable this option and you use compactor, make sure you have the --enable-vertical-compaction flag enabled, otherwise you might risk compactor halt.",
Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
wOpts = &WriterOptions{}
)
// create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers
// This removes the network from the tests and creates a more consistent testing harness.
Expand All @@ -187,7 +188,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Minute,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), false),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), wOpts),
Limiter: limiter,
})
handlers = append(handlers, h)
Expand Down Expand Up @@ -948,7 +949,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, false)
handler.writer = NewWriter(logger, m, &WriterOptions{})

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())
Expand Down
28 changes: 22 additions & 6 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package receive

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand All @@ -26,17 +28,25 @@ type TenantStorage interface {
TenantAppendable(string) (Appendable, error)
}

type WriterOptions struct {
Intern bool
TooFarInFutureTimeWindow int64 // Unit: nanoseconds
}

type Writer struct {
logger log.Logger
multiTSDB TenantStorage
intern bool
opts *WriterOptions
}

func NewWriter(logger log.Logger, multiTSDB TenantStorage, intern bool) *Writer {
func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) *Writer {
if opts == nil {
opts = &WriterOptions{}
}
return &Writer{
logger: logger,
multiTSDB: multiTSDB,
intern: intern,
opts: opts,
}
}

Expand Down Expand Up @@ -71,7 +81,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
return errors.Wrap(err, "get appender")
}
getRef := app.(storage.GetRef)

tooFarInFuture := model.Now().Add(time.Duration(r.opts.TooFarInFutureTimeWindow))
var (
ref storage.SeriesRef
errs writeErrors
Expand Down Expand Up @@ -105,13 +115,19 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref == 0 {
// If not, copy labels, as TSDB will hold those strings long term. Given no
// copy unmarshal we don't want to keep memory for whole protobuf, only for labels.
labelpb.ReAllocZLabelsStrings(&t.Labels, r.intern)
labelpb.ReAllocZLabelsStrings(&t.Labels, r.opts.Intern)
lset = labelpb.ZLabelsToPromLabels(t.Labels)
}

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
if r.opts.TooFarInFutureTimeWindow != 0 && tooFarInFuture.Before(model.Time(s.Timestamp)) {
// now + tooFarInFutureTimeWindow < sample timestamp
err = storage.ErrOutOfBounds
level.Debug(tLogger).Log("msg", "block metric too far in the future", "lset", lset, "timestamp", s.Timestamp, "bound", tooFarInFuture)
} else {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
}
switch err {
case storage.ErrOutOfOrderSample:
numSamplesOutOfOrder++
Expand Down
44 changes: 41 additions & 3 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -27,12 +28,14 @@ import (
)

func TestWriter(t *testing.T) {
now := model.Now()
lbls := []labelpb.ZLabel{{Name: "__name__", Value: "test"}}
tests := map[string]struct {
reqs []*prompb.WriteRequest
expectedErr error
expectedIngested []prompb.TimeSeries
maxExemplars int64
opts *WriterOptions
}{
"should error out on series with no labels": {
reqs: []*prompb.WriteRequest{
Expand Down Expand Up @@ -122,6 +125,41 @@ func TestWriter(t *testing.T) {
},
},
},
"should succeed when sample timestamp is NOT too far in the future": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: lbls,
Samples: []prompb.Sample{{Value: 1, Timestamp: int64(now)}},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: lbls,
Samples: []prompb.Sample{{Value: 1, Timestamp: int64(now)}},
},
},
opts: &WriterOptions{TooFarInFutureTimeWindow: 30 * int64(time.Second)},
},
"should error out when sample timestamp is too far in the future": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: lbls,
// A sample with a very large timestamp in year 5138 (milliseconds)
Samples: []prompb.Sample{{Value: 1, Timestamp: 99999999999999}},
},
},
},
},
expectedErr: errors.Wrapf(storage.ErrOutOfBounds, "add 1 samples"),
opts: &WriterOptions{TooFarInFutureTimeWindow: 10000},
},
"should succeed on valid series with exemplars": {
reqs: []*prompb.WriteRequest{{
Timeseries: []prompb.TimeSeries{
Expand Down Expand Up @@ -299,7 +337,7 @@ func TestWriter(t *testing.T) {
return err
}))

w := NewWriter(logger, m, false)
w := NewWriter(logger, m, testData.opts)

for idx, req := range testData.reqs {
err = w.Write(context.Background(), DefaultTenant, req)
Expand Down Expand Up @@ -398,7 +436,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
}

b.Run("without interning", func(b *testing.B) {
w := NewWriter(logger, m, false)
w := NewWriter(logger, m, &WriterOptions{Intern: false})

b.ReportAllocs()
b.ResetTimer()
Expand All @@ -409,7 +447,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
})

b.Run("with interning", func(b *testing.B) {
w := NewWriter(logger, m, true)
w := NewWriter(logger, m, &WriterOptions{Intern: true})

b.ReportAllocs()
b.ResetTimer()
Expand Down

0 comments on commit 6734594

Please sign in to comment.