Skip to content

Commit

Permalink
Fixes out of order samples handling (#1563)
Browse files Browse the repository at this point in the history
* Fixes out of order samples handling

Signed-off-by: Devin Trejo <[email protected]>

* Ignore errcheck for rollback

Signed-off-by: Devin Trejo <[email protected]>

* Process all valid samples in remote-write call

Signed-off-by: Devin Trejo <[email protected]>

* pkg/receive: surface conflict error

This commit adds support to the receive component to identify if a
failed request is due to an ErrOutOfOrderSample error. If a request is
determined to have failed due to this reason, the handler responds with
a 409 Conflict status.

Fixes: #1509
Signed-off-by: Lucas Servén Marín <[email protected]>

* Remove rollback from errcheck exclude

Signed-off-by: Devin Trejo <[email protected]>

* Return conflict for duplicate samples

Signed-off-by: Devin Trejo <[email protected]>

* Summarize any fast-add errors rather than logging every occurrence of the error.

Signed-off-by: Devin Trejo <[email protected]>
  • Loading branch information
dtrejod authored and brancz committed Oct 9, 2019
1 parent 4c1761a commit 19e59ef
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 6 deletions.
33 changes: 31 additions & 2 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
terrors "github.com/prometheus/prometheus/tsdb/errors"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -231,6 +232,10 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
// destined for the local node will be written to the receiver.
// Time series will be replicated as necessary.
if err := h.forward(r.Context(), tenant, rep, &wreq); err != nil {
if hasCause(err, isConflict) {
http.Error(w, err.Error(), http.StatusConflict)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -368,7 +373,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
return
}
if res.StatusCode != http.StatusOK {
err = errors.New(res.Status)
err = errors.New(strconv.Itoa(res.StatusCode))
level.Error(h.logger).Log("msg", "forwarding returned non-200 status", "err", err, "endpoint", endpoint)
ec <- err
return
Expand Down Expand Up @@ -423,9 +428,33 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 {
return errors.New("did not meet replication threshold")
return errors.Wrap(err, "did not meet replication threshold")
}
return nil
}
return errors.Wrap(err, "could not replicate write request")
}

// hasCause performs a depth-first search of the causes of a nested MultiError
// to determine if it contains an error that satisfies the given cause.
func hasCause(err error, f func(error) bool) bool {
if err == nil {
return false
}
err = errors.Cause(err)
errs, ok := err.(terrors.MultiError)
if !ok {
return f(err)
}
for i := range errs {
if hasCause(errs[i], f) {
return true
}
}
return false
}

// isConflict returns whether or not the given error represents a conflict.
func isConflict(err error) bool {
return err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict)
}
99 changes: 99 additions & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package receive

import (
"net/http"
"strconv"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
terrors "github.com/prometheus/prometheus/tsdb/errors"
)

func TestHasCause(t *testing.T) {
for _, tc := range []struct {
name string
err error
f func(error) bool
out bool
}{
{
name: "nil",
f: isConflict,
out: false,
},
{
name: "nil multierror",
err: terrors.MultiError([]error{}),
f: isConflict,
out: false,
},
{
name: "non-matching multierror",
err: terrors.MultiError([]error{
errors.New("foo"),
errors.New("bar"),
}),
f: isConflict,
out: false,
},
{
name: "nested non-matching multierror",
err: errors.Wrap(terrors.MultiError([]error{
errors.New("foo"),
errors.New("bar"),
}), "baz"),
f: isConflict,
out: false,
},
{
name: "deep nested non-matching multierror",
err: errors.Wrap(terrors.MultiError([]error{
errors.New("foo"),
terrors.MultiError([]error{
errors.New("bar"),
errors.New("qux"),
}),
}), "baz"),
f: isConflict,
out: false,
},
{
name: "matching multierror",
err: terrors.MultiError([]error{
storage.ErrOutOfOrderSample,
errors.New("foo"),
errors.New("bar"),
}),
f: isConflict,
out: true,
},
{
name: "nested matching multierror",
err: errors.Wrap(terrors.MultiError([]error{
storage.ErrOutOfOrderSample,
errors.New("foo"),
errors.New("bar"),
}), "baz"),
f: isConflict,
out: true,
},
{
name: "deep nested matching multierror",
err: errors.Wrap(terrors.MultiError([]error{
terrors.MultiError([]error{
errors.New("qux"),
errors.New(strconv.Itoa(http.StatusConflict)),
}),
errors.New("foo"),
errors.New("bar"),
}), "baz"),
f: isConflict,
out: true,
},
} {
if hasCause(tc.err, tc.f) != tc.out {
t.Errorf("test case %s: expected %t, got %t", tc.name, tc.out, !tc.out)
}
}
}
42 changes: 38 additions & 4 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package receive

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
terrors "github.com/prometheus/prometheus/tsdb/errors"
)

// Appendable returns an Appender.
Expand All @@ -27,11 +29,19 @@ func NewWriter(logger log.Logger, app Appendable) *Writer {
}

func (r *Writer) Write(wreq *prompb.WriteRequest) error {
var (
numOutOfOrder = 0
numDuplicates = 0
numOutOfBounds = 0
)

app, err := r.append.Appender()

if err != nil {
return errors.Wrap(err, "failed to get appender")
}

var errs terrors.MultiError
for _, t := range wreq.Timeseries {
lset := make(labels.Labels, len(t.Labels))
for j := range t.Labels {
Expand All @@ -41,17 +51,41 @@ func (r *Writer) Write(wreq *prompb.WriteRequest) error {
}
}

// Append as many valid samples as possible, but keep track of the errors
for _, s := range t.Samples {
_, err = app.Add(lset, s.Timestamp, s.Value)
if err != nil {
return errors.Wrap(err, "failed to non-fast add")
switch err {
case nil:
continue
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset.String(), "sample", s.String())
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset.String(), "sample", s.String())
case storage.ErrOutOfBounds:
numOutOfBounds++
level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset.String(), "sample", s.String())
}
}
}

if numOutOfOrder > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "failed to non-fast add %d samples", numOutOfOrder))
}
if numDuplicates > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "failed to non-fast add %d samples", numDuplicates))
}
if numOutOfBounds > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "failed to non-fast add %d samples", numOutOfBounds))
}

if err := app.Commit(); err != nil {
return errors.Wrap(err, "failed to commit")
errs.Add(errors.Wrap(err, "failed to commit"))
}

return nil
return errs.Err()
}

0 comments on commit 19e59ef

Please sign in to comment.