diff --git a/pkg/block/index.go b/pkg/block/index.go index 830ac89718..4bc13195d4 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -92,10 +92,10 @@ type HealthStats struct { MetricLabelValuesCount int64 } -// PrometheusIssue5372Err returns an error if the HealthStats object indicates +// OutOfOrderLabelsErr returns an error if the HealthStats object indicates // postings with out of order labels. This is corrected by Prometheus Issue // #5372 and affects Prometheus versions 2.8.0 and below. -func (i HealthStats) PrometheusIssue5372Err() error { +func (i HealthStats) OutOfOrderLabelsErr() error { if i.OutOfOrderLabels > 0 { return errors.Errorf("index contains %d postings with out of order labels", i.OutOfOrderLabels) @@ -157,7 +157,7 @@ func (i HealthStats) AnyErr() error { errMsg = append(errMsg, err.Error()) } - if err := i.PrometheusIssue5372Err(); err != nil { + if err := i.OutOfOrderLabelsErr(); err != nil { errMsg = append(errMsg, err.Error()) } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 8b7c442c41..9b17f3f221 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1052,7 +1052,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } - if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil { return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID) } diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 0ca0228445..6fd854dce2 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1031,13 +1031,34 @@ func isConflict(err error) bool { return false } return err == errConflict || - err == storage.ErrDuplicateSampleForTimestamp || + isSampleConflictErr(err) || + isExemplarConflictErr(err) || + isLabelsConflictErr(err) || + status.Code(err) == codes.AlreadyExists +} + +// isSampleConflictErr returns whether or not the given error represents +// a sample-related conflict. +func isSampleConflictErr(err error) bool { + return err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || - err == storage.ErrOutOfBounds || - err == storage.ErrDuplicateExemplar || + err == storage.ErrOutOfBounds +} + +// isExemplarConflictErr returns whether or not the given error represents +// a exemplar-related conflict. +func isExemplarConflictErr(err error) bool { + return err == storage.ErrDuplicateExemplar || err == storage.ErrOutOfOrderExemplar || - err == storage.ErrExemplarLabelLength || - status.Code(err) == codes.AlreadyExists + err == storage.ErrExemplarLabelLength +} + +// isLabelsConflictErr returns whether or not the given error represents +// a labels-related conflict. +func isLabelsConflictErr(err error) bool { + return err == labelpb.ErrDuplicateLabels || + err == labelpb.ErrEmptyLabels || + err == labelpb.ErrOutOfOrderLabels } // isNotReady returns whether or not the given error represents a not ready error. diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 325e344342..20cafba891 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -115,6 +115,16 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 1, exp: errConflict, }, + { + name: "matching multierror (labels error)", + err: errutil.NonNilMultiError([]error{ + labelpb.ErrEmptyLabels, + errors.New("foo"), + errors.New("bar"), + }), + threshold: 1, + exp: errConflict, + }, { name: "matching but below threshold multierror", err: errutil.NonNilMultiError([]error{ @@ -164,7 +174,7 @@ func TestDetermineWriteErrorCause(t *testing.T) { exp: errConflict, }, { - name: "matching multierror many, both above threshold, conflict have precedence", + name: "matching multierror many, both above threshold, conflict has precedence", err: errutil.NonNilMultiError([]error{ storage.ErrOutOfOrderSample, errConflict, @@ -177,6 +187,20 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 2, exp: errConflict, }, + { + name: "matching multierror many, both above threshold, conflict has precedence (labels error)", + err: errutil.NonNilMultiError([]error{ + labelpb.ErrDuplicateLabels, + labelpb.ErrDuplicateLabels, + tsdb.ErrNotReady, + tsdb.ErrNotReady, + tsdb.ErrNotReady, + labelpb.ErrDuplicateLabels, + errors.New("foo"), + }), + threshold: 2, + exp: errConflict, + }, { name: "nested matching multierror", err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{ diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 90bb2897c4..aa02f7e08b 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -43,9 +43,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR tLogger := log.With(r.logger, "tenant", tenantID) var ( - numOutOfOrder = 0 - numDuplicates = 0 - numOutOfBounds = 0 + numLabelsOutOfOrder = 0 + numLabelsDuplicates = 0 + numLabelsEmpty = 0 + + numSamplesOutOfOrder = 0 + numSamplesDuplicates = 0 + numSamplesOutOfBounds = 0 + numExemplarsOutOfOrder = 0 numExemplarsDuplicate = 0 numExemplarsLabelLength = 0 @@ -70,6 +75,26 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR errs errutil.MultiError ) for _, t := range wreq.Timeseries { + // Check if time series labels are valid. If not, skip the time series + // and report the error. + if err := labelpb.ValidateLabels(t.Labels); err != nil { + switch err { + case labelpb.ErrOutOfOrderLabels: + numLabelsOutOfOrder++ + level.Debug(tLogger).Log("msg", "Out of order labels in the label set", "lset", t.Labels) + case labelpb.ErrDuplicateLabels: + numLabelsDuplicates++ + level.Debug(tLogger).Log("msg", "Duplicate labels in the label set", "lset", t.Labels) + case labelpb.ErrEmptyLabels: + numLabelsEmpty++ + level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels) + default: + level.Debug(tLogger).Log("msg", "Error validating labels", "err", err) + } + + continue + } + lset := labelpb.ZLabelsToPromLabels(t.Labels) // Check if the TSDB has cached reference for those labels. @@ -86,14 +111,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR ref, err = app.Append(ref, lset, s.Timestamp, s.Value) switch err { case storage.ErrOutOfOrderSample: - numOutOfOrder++ + numSamplesOutOfOrder++ level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrDuplicateSampleForTimestamp: - numDuplicates++ + numSamplesDuplicates++ level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrOutOfBounds: - numOutOfBounds++ + numSamplesOutOfBounds++ level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) + default: + if err != nil { + level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err) + } } } @@ -102,45 +131,60 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR if ref != 0 && len(t.Exemplars) > 0 { for _, ex := range t.Exemplars { exLset := labelpb.ZLabelsToPromLabels(ex.Labels) - logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) + exLogger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) - _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ + if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ Labels: exLset, Value: ex.Value, Ts: ex.Timestamp, HasTs: true, - }) - switch err { - case storage.ErrOutOfOrderExemplar: - numExemplarsOutOfOrder++ - level.Debug(logger).Log("msg", "Out of order exemplar") - case storage.ErrDuplicateExemplar: - numExemplarsDuplicate++ - level.Debug(logger).Log("msg", "Duplicate exemplar") - case storage.ErrExemplarLabelLength: - numExemplarsLabelLength++ - level.Debug(logger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) - default: - if err != nil { - level.Debug(logger).Log("msg", "Error ingesting exemplar", "err", err) + }); err != nil { + switch err { + case storage.ErrOutOfOrderExemplar: + numExemplarsOutOfOrder++ + level.Debug(exLogger).Log("msg", "Out of order exemplar") + case storage.ErrDuplicateExemplar: + numExemplarsDuplicate++ + level.Debug(exLogger).Log("msg", "Duplicate exemplar") + case storage.ErrExemplarLabelLength: + numExemplarsLabelLength++ + level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) + default: + if err != nil { + level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) + } } } } } } - if numOutOfOrder > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder) - errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder)) + if numLabelsOutOfOrder > 0 { + level.Warn(tLogger).Log("msg", "Error on series with out-of-order labels", "numDropped", numLabelsOutOfOrder) + errs.Add(errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add %d series", numLabelsOutOfOrder)) + } + if numLabelsDuplicates > 0 { + level.Warn(tLogger).Log("msg", "Error on series with duplicate labels", "numDropped", numLabelsDuplicates) + errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates)) + } + if numLabelsEmpty > 0 { + level.Warn(tLogger).Log("msg", "Error on series with empty label name or value", "numDropped", numLabelsEmpty) + errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty)) + } + + if numSamplesOutOfOrder > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numSamplesOutOfOrder) + errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numSamplesOutOfOrder)) } - if numDuplicates > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates) - errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates)) + if numSamplesDuplicates > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numSamplesDuplicates) + errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numSamplesDuplicates)) } - if numOutOfBounds > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds) - errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds)) + if numSamplesOutOfBounds > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds) + errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds)) } + if numExemplarsOutOfOrder > 0 { level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder) errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder)) diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index de00734960..ec030b6d53 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -34,6 +34,94 @@ func TestWriter(t *testing.T) { expectedIngested []prompb.TimeSeries maxExemplars int64 }{ + "should error out on series with no labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: []labelpb.ZLabel{{Name: "__name__", Value: ""}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 2 series"), + }, + "should succeed on series with valid labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: nil, + expectedIngested: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + "should error out and skip series with out-of-order labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"), + }, + "should error out and skip series with duplicate labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}, labelpb.ZLabel{Name: "z", Value: "1"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"), + }, + "should error out and skip series with out-of-order labels; accept series with valid labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: append(lbls, labelpb.ZLabel{Name: "E", Value: "1"}, labelpb.ZLabel{Name: "f", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 2 series"), + expectedIngested: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, "should succeed on valid series with exemplars": { reqs: []*prompb.WriteRequest{{ Timeseries: []prompb.TimeSeries{ @@ -171,6 +259,16 @@ func TestWriter(t *testing.T) { testutil.Equals(t, testData.expectedErr.Error(), err.Error()) } } + + // On each expected series, assert we have a ref available. + a, err := app.Appender(context.Background()) + testutil.Ok(t, err) + gr := a.(storage.GetRef) + + for _, ts := range testData.expectedIngested { + ref, _ := gr.GetRef(labelpb.ZLabelsToPromLabels(ts.Labels)) + testutil.Assert(t, ref != 0, fmt.Sprintf("appender should have reference to series %v", ts)) + } }) } } diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 4130305792..93b0543d24 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -19,7 +19,13 @@ import ( "github.com/prometheus/prometheus/model/labels" ) -var sep = []byte{'\xff'} +var ( + ErrOutOfOrderLabels = errors.New("out of order labels") + ErrEmptyLabels = errors.New("label set contains a label with empty name or value") + ErrDuplicateLabels = errors.New("label set contains duplicate label names") + + sep = []byte{'\xff'} +) func noAllocString(buf []byte) string { return *(*string)(unsafe.Pointer(&buf)) @@ -365,6 +371,40 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { return xxhash.Sum64(b) } +// ValidateLabels validates label names and values (checks for empty +// names and values, out of order labels and duplicate label names) +// Returns appropriate error if validation fails on a label. +func ValidateLabels(lbls []ZLabel) error { + if len(lbls) == 0 { + return ErrEmptyLabels + } + + // Check first label. + l0 := lbls[0] + if l0.Name == "" || l0.Value == "" { + return ErrEmptyLabels + } + + // Iterate over the rest, check each for empty / duplicates and + // check lexicographical (alphabetically) ordering. + for _, l := range lbls[1:] { + if l.Name == "" || l.Value == "" { + return ErrEmptyLabels + } + + if l.Name == l0.Name { + return ErrDuplicateLabels + } + + if l.Name < l0.Name { + return ErrOutOfOrderLabels + } + l0 = l + } + + return nil +} + // ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. type ZLabelSets []ZLabelSet diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 81fbdbc984..ff41a4918a 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -66,6 +66,181 @@ func TestExtendLabels(t *testing.T) { testInjectExtLabels(testutil.NewTB(t)) } +func TestValidateLabels(t *testing.T) { + testCases := []struct { + labelSet []ZLabel + expectedErr error + }{ + { + // No labels at all. + labelSet: []ZLabel{}, + expectedErr: ErrEmptyLabels, + }, + { + // Empty label. + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "", + Value: "baz", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Empty label (first label). + labelSet: []ZLabel{ + { + Name: "", + Value: "bar", + }, + { + Name: "foo", + Value: "baz", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Empty label (empty value). + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Out-of-order and duplicate label (out-of-order comes first). + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "test", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // Out-of-order and duplicate label (out-of-order comes first). + labelSet: []ZLabel{ + { + Name: "__test__", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "test", + Value: "baz", + }, + }, + expectedErr: ErrDuplicateLabels, + }, + { + // Empty and duplicate label (empty comes first). + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Wrong order. + labelSet: []ZLabel{ + { + Name: "a", + Value: "bar", + }, + { + Name: "b", + Value: "baz", + }, + { + Name: "__name__", + Value: "test", + }, + }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // Wrong order and duplicate (wrong order comes first). + labelSet: []ZLabel{ + { + Name: "a", + Value: "bar", + }, + { + Name: "__name__", + Value: "test", + }, + { + Name: "a", + Value: "bar", + }, + }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // All good. + labelSet: []ZLabel{ + { + Name: "__name__", + Value: "test", + }, + { + Name: "a1", + Value: "bar", + }, + { + Name: "a2", + Value: "baz", + }, + }, + expectedErr: nil, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("case %d", i+1), func(t *testing.T) { + err := ValidateLabels(tc.labelSet) + testutil.Equals(t, tc.expectedErr, err) + }) + } + +} + func BenchmarkExtendLabels(b *testing.B) { testInjectExtLabels(testutil.NewTB(b)) }