TSDB: Reintroduce 'quiet zeros' for otel start time handling #791

Merged · 6 commits · Jan 2, 2025
3 changes: 3 additions & 0 deletions model/value/value.go
@@ -26,6 +26,9 @@ const (
// complicated values in the future. It is 2 rather than 1 to make
// it easier to distinguish from the NormalNaN by a human when debugging.
StaleNaN uint64 = 0x7ff0000000000002

// QuietZeroNaN signals TSDB to add a zero, but do nothing if there is already a value at that timestamp.
QuietZeroNaN uint64 = 0x7ff0000000000003
)

// IsStaleNaN returns true when the provided NaN value is a stale marker.
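For illustration only: a minimal, self-contained sketch (the isQuietZero helper is hypothetical and not part of this change) of why the marker must be matched on its raw bit pattern. Like StaleNaN, QuietZeroNaN lives in the NaN payload space, and a NaN never compares equal to itself, so plain float comparison cannot detect it.

package main

import (
	"fmt"
	"math"
)

// quietZeroNaN mirrors the constant added in model/value/value.go above.
const quietZeroNaN uint64 = 0x7ff0000000000003

// isQuietZero is a hypothetical helper: detect the marker by comparing raw bits.
func isQuietZero(v float64) bool {
	return math.Float64bits(v) == quietZeroNaN
}

func main() {
	v := math.Float64frombits(quietZeroNaN)
	fmt.Println(math.IsNaN(v))  // true
	fmt.Println(v == v)         // false: NaN != NaN, so == cannot find the marker
	fmt.Println(isQuietZero(v)) // true: bit-level comparison works
}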
12 changes: 8 additions & 4 deletions storage/remote/otlptranslator/prometheusremotewrite/helper.go
@@ -595,9 +595,10 @@ const defaultIntervalForStartTimestamps = int64(300_000)
// handleStartTime adds a zero sample at startTs only if startTs is within validIntervalForStartTimestamps of the sample timestamp.
// The reason for doing this is that PRW v1 doesn't support Created Timestamps. Once we switch to PRW v2,
// make use of its direct support for Created Timestamps instead.
// See https://github.com/prometheus/prometheus/issues/14600 for context.
// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#resets-and-gaps to know more about how OTel handles
// resets for cumulative metrics.
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, value float64, logger *slog.Logger) {
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, val float64, logger *slog.Logger) {
if !settings.EnableCreatedTimestampZeroIngestion {
return
}
@@ -619,10 +620,13 @@ func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb
return
}

logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", value)
logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", val)

// See https://github.com/prometheus/prometheus/issues/14600 for context.
c.addSample(&prompb.Sample{Timestamp: startTs}, labels)
var createdTimeValue float64
if settings.EnableStartTimeQuietZero {
createdTimeValue = math.Float64frombits(value.QuietZeroNaN)
}
c.addSample(&prompb.Sample{Timestamp: startTs, Value: createdTimeValue}, labels)
}

// handleHistogramStartTime is similar to the method above, but for native histograms.
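To make the change above concrete, here is a hedged, standalone sketch, not taken from the diff, of the sample handleStartTime now emits at the start timestamp: a plain 0 by default, or the quiet-zero marker when EnableStartTimeQuietZero is set, so that TSDB can later ignore it if a real sample already exists at that timestamp. It assumes a build that already contains the value.QuietZeroNaN constant introduced here; the startTimeSample helper is invented for illustration.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/prompb"
)

// startTimeSample mirrors the value selection added in handleStartTime above.
func startTimeSample(startTs int64, quietZero bool) prompb.Sample {
	var v float64 // default: an explicit zero at the start timestamp
	if quietZero {
		// QuietZeroNaN exists only in builds that include this change.
		v = math.Float64frombits(value.QuietZeroNaN)
	}
	return prompb.Sample{Timestamp: startTs, Value: v}
}

func main() {
	s := startTimeSample(1695209650, true)
	fmt.Printf("ts=%d value_bits=%#x\n", s.Timestamp, math.Float64bits(s.Value))
}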
20 changes: 15 additions & 5 deletions storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
@@ -18,9 +18,11 @@ package prometheusremotewrite

import (
"context"
"math"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -29,7 +31,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"

"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/util/testutil"
)

func TestCreateAttributes(t *testing.T) {
@@ -313,14 +317,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
@@ -361,14 +365,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
@@ -474,19 +478,25 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
Settings{
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
EnableStartTimeQuietZero: true,
ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval,
},
metric.Name(),
promslog.NewNopLogger(),
)
require.NoError(t, err)

assert.Equal(t, tt.want(), converter.unique)
testutil.RequireEqualWithOptions(t, tt.want(), converter.unique, []cmp.Option{cmp.Comparer(equalSamples)})
assert.Empty(t, converter.conflicts)
})
}
}

func equalSamples(a, b prompb.Sample) bool {
// Compare Float64bits so that NaN values with identical bit patterns compare equal.
return a.Timestamp == b.Timestamp && math.Float64bits(a.Value) == math.Float64bits(b.Value)
}

func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
ts := pcommon.Timestamp(time.Now().UnixNano())
tests := []struct {
@@ -47,6 +47,7 @@ type Settings struct {

// Mimir specifics.
EnableCreatedTimestampZeroIngestion bool
EnableStartTimeQuietZero bool
ValidIntervalCreatedTimestampZeroIngestion time.Duration
}

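For reference, a hedged configuration sketch showing the flag combination the updated test exercises. The field names come from the Settings struct above; the import path assumes the fork that carries these Mimir-specific settings, and the five-minute interval is an arbitrary example value.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)

func main() {
	// Enable start-time zero ingestion and request quiet zeros instead of plain
	// zeros; the valid interval bounds how far back a start timestamp may be.
	settings := prometheusremotewrite.Settings{
		EnableCreatedTimestampZeroIngestion:        true,
		EnableStartTimeQuietZero:                   true,
		ValidIntervalCreatedTimestampZeroIngestion: 5 * time.Minute,
	}
	fmt.Printf("%+v\n", settings)
}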
9 changes: 9 additions & 0 deletions tsdb/db_test.go
@@ -105,6 +105,15 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
return db
}

// queryHead is a helper to query the head for a given time range and labelset.
func queryHead(t testing.TB, head *Head, mint, maxt int64, label labels.Label) (map[string][]chunks.Sample, error) {
q, err := NewBlockQuerier(head, mint, maxt)
if err != nil {
return nil, err
}
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
}

// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
ss := q.Select(context.Background(), false, nil, matchers...)
11 changes: 10 additions & 1 deletion tsdb/head_append.go
@@ -497,14 +497,18 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
}
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
if math.Float64bits(s.lastValue) != math.Float64bits(v) && math.Float64bits(v) != value.QuietZeroNaN {
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}

if math.Float64bits(v) == value.QuietZeroNaN { // Say it's allowed; it will be dropped later in commitSamples.
return true, 0, nil
}

// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
return true, headMaxt - t, nil
@@ -1144,6 +1148,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
switch {
case err != nil:
// Do nothing here.
case oooSample && math.Float64bits(s.V) == value.QuietZeroNaN:
// No-op: we don't store quiet zeros out-of-order.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
@@ -1190,6 +1196,9 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
if math.Float64bits(s.V) == value.QuietZeroNaN {
s.V = 0 // Note: this modifies the local copy that will be appended; the WAL already received the NaN.
}
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
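Summarising the appender changes above, a hedged standalone sketch (the commitValue helper is invented for illustration) of the commit-time decision: an in-order quiet zero is materialised as a real 0, an out-of-order quiet zero is dropped, and a quiet zero at a timestamp that already holds a value is accepted by appendable() as a harmless duplicate.

package main

import (
	"fmt"
	"math"
)

const quietZeroNaN uint64 = 0x7ff0000000000003 // mirrors model/value/value.go

// commitValue reports the value to store and whether the sample should be
// stored at all.
func commitValue(v float64, outOfOrder bool) (float64, bool) {
	if math.Float64bits(v) == quietZeroNaN {
		if outOfOrder {
			return 0, false // quiet zeros are never stored out-of-order
		}
		return 0, true // in-order: stored as a plain zero (the WAL keeps the NaN)
	}
	return v, true // ordinary samples are stored unchanged
}

func main() {
	qz := math.Float64frombits(quietZeroNaN)
	fmt.Println(commitValue(qz, false)) // 0 true
	fmt.Println(commitValue(qz, true))  // 0 false
	fmt.Println(commitValue(42, false)) // 42 true
}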
123 changes: 113 additions & 10 deletions tsdb/head_test.go
@@ -547,15 +547,6 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
})
}

// queryHead is a helper to query the head for a given time range and labelset.
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
if err != nil {
return nil, err
}
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
}

// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
readerTsCh := make(chan uint64)

@@ -583,7 +574,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
lbls.Range(func(l labels.Label) {
lbl = l
})
samples, err := queryHead(ts-qryRange, ts, lbl)
samples, err := queryHead(t, head, int64(ts-qryRange), int64(ts), lbl)
if err != nil {
return false, err
}
@@ -6098,6 +6089,118 @@ func TestCuttingNewHeadChunks(t *testing.T) {
}
}

func TestAppendQuietZeroDuplicates(t *testing.T) {
ts := int64(1695209650)
lbls := labels.FromStrings("foo", "bar")
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()

a := h.Appender(context.Background())
_, err := a.Append(0, lbls, ts, 42.0)
require.NoError(t, err)
_, err = a.Append(0, lbls, ts, 42.0) // Exactly the same value.
require.NoError(t, err)
_, err = a.Append(0, lbls, ts, math.Float64frombits(value.QuietZeroNaN)) // Should be a no-op.
require.NoError(t, err)
require.NoError(t, a.Commit())

result, err := queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
require.NoError(t, err)
expectedSamples := []chunks.Sample{sample{t: ts, f: 42.0}}
require.Equal(t, expectedSamples, result[`{foo="bar"}`])

a = h.Appender(context.Background())
_, err = a.Append(0, lbls, ts+10, math.Float64frombits(value.QuietZeroNaN)) // This is at a different timestamp so should append a real zero.
require.NoError(t, err)
_, err = a.Append(0, lbls, ts+15, 5.0) // We append a normal value to reflect what would happen in reality.
require.NoError(t, err)
require.NoError(t, a.Commit())

result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
require.NoError(t, err)
expectedSamples = []chunks.Sample{
sample{t: ts, f: 42.0},
sample{t: ts + 10, f: 0},
sample{t: ts + 15, f: 5},
}
require.Equal(t, expectedSamples, result[`{foo="bar"}`])

a = h.Appender(context.Background())
_, err = a.Append(0, lbls, ts+5, math.Float64frombits(value.QuietZeroNaN)) // This is out-of-order, so should be dropped.
require.NoError(t, err)
require.NoError(t, a.Commit())

result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
require.NoError(t, err)
require.Equal(t, expectedSamples, result[`{foo="bar"}`]) // Same expectedSamples as before.
}

func TestQuietZeroWALReplay(t *testing.T) {
ts := int64(1695209650)
lbls := labels.FromStrings("foo", "bar")
h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, true)

a := h.Appender(context.Background())
_, err := a.Append(0, lbls, ts, 42.0)
require.NoError(t, err)
_, err = a.Append(0, lbls, ts, 42.0) // Exactly the same value.
require.NoError(t, err)
_, err = a.Append(0, lbls, ts, math.Float64frombits(value.QuietZeroNaN)) // Should be a no-op.
require.NoError(t, err)
_, err = a.Append(0, lbls, ts+10, math.Float64frombits(value.QuietZeroNaN)) // This is at a different timestamp so should append a real zero.
require.NoError(t, err)
_, err = a.Append(0, lbls, ts+5, math.Float64frombits(value.QuietZeroNaN)) // This is out-of-order, so should be dropped.
require.NoError(t, err)
require.NoError(t, a.Commit())

result, err := queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
require.NoError(t, err)
expectedSamples := []chunks.Sample{
sample{t: ts, f: 42.0},
sample{t: ts + 10, f: 0},
}
require.Equal(t, expectedSamples, result[`{foo="bar"}`])

require.NoError(t, h.Close())

// Next, replay the WAL by creating a new head, then verify that the previous samples are there as expected.
w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
h, err = NewHead(nil, nil, w, nil, opts, nil)
require.NoError(t, err)
require.NoError(t, h.Init(0))
defer func() {
require.NoError(t, h.Close())
}()

result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
require.NoError(t, err)
require.Equal(t, expectedSamples, result[`{foo="bar"}`])

// For correctness, we also verify that the WAL contains the expected records.
recs := readTestWAL(t, h.wal.Dir())
require.NotEmpty(t, recs, "WAL should contain records")

if samples, ok := recs[1].([]record.RefSample); ok {
require.Len(t, samples, 5)
require.Equal(t, record.RefSample{Ref: 1, T: ts, V: 42.0}, samples[0])
require.Equal(t, record.RefSample{Ref: 1, T: ts, V: 42.0}, samples[1])
require.True(t, math.IsNaN(samples[2].V))
require.Equal(t, ts, samples[2].T)
require.True(t, math.IsNaN(samples[3].V))
require.Equal(t, ts+10, samples[3].T)
require.True(t, math.IsNaN(samples[4].V))
require.Equal(t, ts+5, samples[4].T)
} else {
t.Fatalf("unexpected record type: %T", recs[1])
}
}

// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample
// is appended to the head, right when the head chunk is at the size limit.
// The test adds all samples as duplicate, thus expecting that the result has
7 changes: 7 additions & 0 deletions tsdb/head_wal.go
@@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@@ -589,6 +590,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if math.Float64bits(s.V) == value.QuietZeroNaN {
s.V = 0
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@@ -989,6 +993,9 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
unknownRefs++
continue
}
if math.Float64bits(s.V) == value.QuietZeroNaN {
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()