Skip to content

Commit

Permalink
Receiver: Validate labels in write requests (#5508)
Browse files Browse the repository at this point in the history
* Add label set validation method

Signed-off-by: Matej Gera <[email protected]>

* Add tests for label validation method

Signed-off-by: Matej Gera <[email protected]>

* Validate label set during receiver write

Signed-off-by: Matej Gera <[email protected]>

* Handle labels conflict errors

Signed-off-by: Matej Gera <[email protected]>

* Rename out-of-order error methods

Signed-off-by: Matej Gera <[email protected]>

* Add empty label value validation

Signed-off-by: Matej Gera <[email protected]>

* Add receive writer tests

Signed-off-by: Matej Gera <[email protected]>

* Optimize label validation

Signed-off-by: Matej Gera <[email protected]>

* Adjust writer tests

Signed-off-by: Matej Gera <[email protected]>

* Fix comment in validation method

Signed-off-by: Matej Gera <[email protected]>

* Improve error handling

Signed-off-by: Matej Gera <[email protected]>
  • Loading branch information
matej-g authored Aug 6, 2022
1 parent 547da58 commit 9812cea
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 42 deletions.
6 changes: 3 additions & 3 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type HealthStats struct {
MetricLabelValuesCount int64
}

// PrometheusIssue5372Err returns an error if the HealthStats object indicates
// OutOfOrderLabelsErr returns an error if the HealthStats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
func (i HealthStats) PrometheusIssue5372Err() error {
func (i HealthStats) OutOfOrderLabelsErr() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
Expand Down Expand Up @@ -157,7 +157,7 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
if err := i.OutOfOrderLabelsErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
Expand Down
31 changes: 26 additions & 5 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,13 +1031,34 @@ func isConflict(err error) bool {
return false
}
return err == errConflict ||
err == storage.ErrDuplicateSampleForTimestamp ||
isSampleConflictErr(err) ||
isExemplarConflictErr(err) ||
isLabelsConflictErr(err) ||
status.Code(err) == codes.AlreadyExists
}

// isSampleConflictErr returns whether or not the given error represents
// a sample-related conflict.
func isSampleConflictErr(err error) bool {
return err == storage.ErrDuplicateSampleForTimestamp ||
err == storage.ErrOutOfOrderSample ||
err == storage.ErrOutOfBounds ||
err == storage.ErrDuplicateExemplar ||
err == storage.ErrOutOfBounds
}

// isExemplarConflictErr returns whether or not the given error represents
// an exemplar-related conflict.
func isExemplarConflictErr(err error) bool {
return err == storage.ErrDuplicateExemplar ||
err == storage.ErrOutOfOrderExemplar ||
err == storage.ErrExemplarLabelLength ||
status.Code(err) == codes.AlreadyExists
err == storage.ErrExemplarLabelLength
}

// isLabelsConflictErr reports whether err is exactly one of the label
// validation sentinel errors from labelpb: duplicate labels, empty labels,
// or out-of-order labels. The comparison is by identity, so a wrapped
// sentinel is not recognized here.
func isLabelsConflictErr(err error) bool {
	switch err {
	case labelpb.ErrDuplicateLabels,
		labelpb.ErrEmptyLabels,
		labelpb.ErrOutOfOrderLabels:
		return true
	default:
		return false
	}
}

// isNotReady returns whether or not the given error represents a not ready error.
Expand Down
26 changes: 25 additions & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 1,
exp: errConflict,
},
{
name: "matching multierror (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrEmptyLabels,
errors.New("foo"),
errors.New("bar"),
}),
threshold: 1,
exp: errConflict,
},
{
name: "matching but below threshold multierror",
err: errutil.NonNilMultiError([]error{
Expand Down Expand Up @@ -164,7 +174,7 @@ func TestDetermineWriteErrorCause(t *testing.T) {
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict have precedence",
name: "matching multierror many, both above threshold, conflict has precedence",
err: errutil.NonNilMultiError([]error{
storage.ErrOutOfOrderSample,
errConflict,
Expand All @@ -177,6 +187,20 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 2,
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict has precedence (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrDuplicateLabels,
labelpb.ErrDuplicateLabels,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
labelpb.ErrDuplicateLabels,
errors.New("foo"),
}),
threshold: 2,
exp: errConflict,
},
{
name: "nested matching multierror",
err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{
Expand Down
106 changes: 75 additions & 31 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
tLogger := log.With(r.logger, "tenant", tenantID)

var (
numOutOfOrder = 0
numDuplicates = 0
numOutOfBounds = 0
numLabelsOutOfOrder = 0
numLabelsDuplicates = 0
numLabelsEmpty = 0

numSamplesOutOfOrder = 0
numSamplesDuplicates = 0
numSamplesOutOfBounds = 0

numExemplarsOutOfOrder = 0
numExemplarsDuplicate = 0
numExemplarsLabelLength = 0
Expand All @@ -70,6 +75,26 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
errs errutil.MultiError
)
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
if err := labelpb.ValidateLabels(t.Labels); err != nil {
switch err {
case labelpb.ErrOutOfOrderLabels:
numLabelsOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order labels in the label set", "lset", t.Labels)
case labelpb.ErrDuplicateLabels:
numLabelsDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate labels in the label set", "lset", t.Labels)
case labelpb.ErrEmptyLabels:
numLabelsEmpty++
level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels)
default:
level.Debug(tLogger).Log("msg", "Error validating labels", "err", err)
}

continue
}

lset := labelpb.ZLabelsToPromLabels(t.Labels)

// Check if the TSDB has cached reference for those labels.
Expand All @@ -86,14 +111,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
numSamplesOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
numSamplesDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrOutOfBounds:
numOutOfBounds++
numSamplesOutOfBounds++
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
default:
if err != nil {
level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
}
}
}

Expand All @@ -102,45 +131,60 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref != 0 && len(t.Exemplars) > 0 {
for _, ex := range t.Exemplars {
exLset := labelpb.ZLabelsToPromLabels(ex.Labels)
logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String())
exLogger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String())

_, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
Labels: exLset,
Value: ex.Value,
Ts: ex.Timestamp,
HasTs: true,
})
switch err {
case storage.ErrOutOfOrderExemplar:
numExemplarsOutOfOrder++
level.Debug(logger).Log("msg", "Out of order exemplar")
case storage.ErrDuplicateExemplar:
numExemplarsDuplicate++
level.Debug(logger).Log("msg", "Duplicate exemplar")
case storage.ErrExemplarLabelLength:
numExemplarsLabelLength++
level.Debug(logger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(logger).Log("msg", "Error ingesting exemplar", "err", err)
}); err != nil {
switch err {
case storage.ErrOutOfOrderExemplar:
numExemplarsOutOfOrder++
level.Debug(exLogger).Log("msg", "Out of order exemplar")
case storage.ErrDuplicateExemplar:
numExemplarsDuplicate++
level.Debug(exLogger).Log("msg", "Duplicate exemplar")
case storage.ErrExemplarLabelLength:
numExemplarsLabelLength++
level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
}
}
}
}
}

if numOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder))
if numLabelsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on series with out-of-order labels", "numDropped", numLabelsOutOfOrder)
errs.Add(errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add %d series", numLabelsOutOfOrder))
}
if numLabelsDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on series with duplicate labels", "numDropped", numLabelsDuplicates)
errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates))
}
if numLabelsEmpty > 0 {
level.Warn(tLogger).Log("msg", "Error on series with empty label name or value", "numDropped", numLabelsEmpty)
errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty))
}

if numSamplesOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numSamplesOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numSamplesOutOfOrder))
}
if numDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates))
if numSamplesDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numSamplesDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numSamplesDuplicates))
}
if numOutOfBounds > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds))
if numSamplesOutOfBounds > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds))
}

if numExemplarsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder))
Expand Down
98 changes: 98 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,94 @@ func TestWriter(t *testing.T) {
expectedIngested []prompb.TimeSeries
maxExemplars int64
}{
"should error out on series with no labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: []labelpb.ZLabel{{Name: "__name__", Value: ""}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 2 series"),
},
"should succeed on series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should error out and skip series with out-of-order labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"),
},
"should error out and skip series with duplicate labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}, labelpb.ZLabel{Name: "z", Value: "1"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"),
},
"should error out and skip series with out-of-order labels; accept series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "E", Value: "1"}, labelpb.ZLabel{Name: "f", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 2 series"),
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should succeed on valid series with exemplars": {
reqs: []*prompb.WriteRequest{{
Timeseries: []prompb.TimeSeries{
Expand Down Expand Up @@ -171,6 +259,16 @@ func TestWriter(t *testing.T) {
testutil.Equals(t, testData.expectedErr.Error(), err.Error())
}
}

// On each expected series, assert we have a ref available.
a, err := app.Appender(context.Background())
testutil.Ok(t, err)
gr := a.(storage.GetRef)

for _, ts := range testData.expectedIngested {
ref, _ := gr.GetRef(labelpb.ZLabelsToPromLabels(ts.Labels))
testutil.Assert(t, ref != 0, fmt.Sprintf("appender should have reference to series %v", ts))
}
})
}
}
Expand Down
Loading

0 comments on commit 9812cea

Please sign in to comment.