Skip to content

Commit

Permalink
pkg/receive: avoid false positive conflict err (#1616)
Browse files Browse the repository at this point in the history
This commit ensures that the `replicate` function does not falsely identify
a replication error as a conflict. If the number of conflict errors
during replication is less than the minimum replication threshold, then
the replication cannot be said to have failed due to a conflict, and the
request should be retried.

Fixes: #1615

xref: #1563
Signed-off-by: Lucas Servén Marín <[email protected]>
  • Loading branch information
squat authored and brancz committed Oct 9, 2019
1 parent 19e59ef commit aaaa95f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 26 deletions.
34 changes: 21 additions & 13 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// conflictErr is returned whenever an operation fails due to any conflict-type error.
var conflictErr = errors.New("conflict")

// Options for the web Handler.
type Options struct {
Writer *Writer
Expand Down Expand Up @@ -232,7 +235,7 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
// destined for the local node will be written to the receiver.
// Time series will be replicated as necessary.
if err := h.forward(r.Context(), tenant, rep, &wreq); err != nil {
if hasCause(err, isConflict) {
if countCause(err, isConflict) > 0 {
http.Error(w, err.Error(), http.StatusConflict)
return
}
Expand Down Expand Up @@ -427,6 +430,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri

err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(err, "did not meet replication threshold")
}
Expand All @@ -435,26 +441,28 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
return errors.Wrap(err, "could not replicate write request")
}

// hasCause performs a depth-first search of the causes of a nested MultiError
// to determine if it contains an error that satisfies the given cause.
func hasCause(err error, f func(error) bool) bool {
if err == nil {
return false
}
err = errors.Cause(err)
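// countCause counts the number of errors within the given error
// whose causes satisfy the given function.
// If the error is a MultiError, countCause inspects the cause of each
// contained error; otherwise it inspects the cause of the error itself.
// It does not traverse any deeper: a MultiError that is wrapped by another
// error, or nested inside another MultiError, is not expanded.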
func countCause(err error, f func(error) bool) int {
errs, ok := err.(terrors.MultiError)
if !ok {
return f(err)
errs = []error{err}
}
var n int
for i := range errs {
if hasCause(errs[i], f) {
return true
if f(errors.Cause(errs[i])) {
n++
}
}
return false
return n
}

// isConflict returns whether or not the given error represents a conflict.
func isConflict(err error) bool {
return err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict)
if err == nil {
return false
}
return err == conflictErr || err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict)
}
47 changes: 35 additions & 12 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,34 @@ import (
terrors "github.com/prometheus/prometheus/tsdb/errors"
)

func TestHasCause(t *testing.T) {
func TestCountCause(t *testing.T) {
for _, tc := range []struct {
name string
err error
f func(error) bool
out bool
out int
}{
{
name: "nil",
f: isConflict,
out: false,
out: 0,
},
{
name: "nil multierror",
err: terrors.MultiError([]error{}),
f: isConflict,
out: false,
out: 0,
},
{
name: "matching nil",
f: func(err error) bool { return err == nil },
out: 1,
},
{
name: "matching simple",
err: conflictErr,
f: isConflict,
out: 1,
},
{
name: "non-matching multierror",
Expand All @@ -35,7 +46,7 @@ func TestHasCause(t *testing.T) {
errors.New("bar"),
}),
f: isConflict,
out: false,
out: 0,
},
{
name: "nested non-matching multierror",
Expand All @@ -44,7 +55,7 @@ func TestHasCause(t *testing.T) {
errors.New("bar"),
}), "baz"),
f: isConflict,
out: false,
out: 0,
},
{
name: "deep nested non-matching multierror",
Expand All @@ -56,7 +67,7 @@ func TestHasCause(t *testing.T) {
}),
}), "baz"),
f: isConflict,
out: false,
out: 0,
},
{
name: "matching multierror",
Expand All @@ -66,7 +77,19 @@ func TestHasCause(t *testing.T) {
errors.New("bar"),
}),
f: isConflict,
out: true,
out: 1,
},
{
name: "matching multierror many",
err: terrors.MultiError([]error{
storage.ErrOutOfOrderSample,
conflictErr,
errors.New(strconv.Itoa(http.StatusConflict)),
errors.New("foo"),
errors.New("bar"),
}),
f: isConflict,
out: 3,
},
{
name: "nested matching multierror",
Expand All @@ -76,7 +99,7 @@ func TestHasCause(t *testing.T) {
errors.New("bar"),
}), "baz"),
f: isConflict,
out: true,
out: 0,
},
{
name: "deep nested matching multierror",
Expand All @@ -89,11 +112,11 @@ func TestHasCause(t *testing.T) {
errors.New("bar"),
}), "baz"),
f: isConflict,
out: true,
out: 0,
},
} {
if hasCause(tc.err, tc.f) != tc.out {
t.Errorf("test case %s: expected %t, got %t", tc.name, tc.out, !tc.out)
if n := countCause(tc.err, tc.f); n != tc.out {
t.Errorf("test case %s: expected %d, got %d", tc.name, tc.out, n)
}
}
}
2 changes: 1 addition & 1 deletion pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *Writer) Write(wreq *prompb.WriteRequest) error {
}
}

// Append as many valid samples as possible, but keep track of the errors
// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
_, err = app.Add(lset, s.Timestamp, s.Value)
switch err {
Expand Down

0 comments on commit aaaa95f

Please sign in to comment.