From b0fc0b3d5b9d3599901a8823bb0ea332523c0848 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Wed, 9 Oct 2019 13:08:41 +0200 Subject: [PATCH] pkg/receive: avoid false positive conflict err MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit ensures that the `replicate` func does not falsely identify a replication error as a conflict. If the number of conflict errors during replication is less than the minimum replication threshold, then replication cannot be said to have failed due to a conflict and the request should be retried. Fixes: #1615 xref: #1563 Signed-off-by: Lucas Servén Marín --- pkg/receive/handler.go | 34 +++++++++++++++++---------- pkg/receive/handler_test.go | 47 +++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 6e4faf22dea..5a5ec8ab665 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -29,6 +29,9 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +// conflictErr is returned whenever an operation fails due to any conflict-type error. +var conflictErr = errors.New("conflict") + // Options for the web Handler. type Options struct { Writer *Writer @@ -232,7 +235,7 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { // destined for the local node will be written to the receiver. // Time series will be replicated as necessary. 
if err := h.forward(r.Context(), tenant, rep, &wreq); err != nil { - if hasCause(err, isConflict) { + if countCause(err, isConflict) > 0 { http.Error(w, err.Error(), http.StatusConflict) return } @@ -427,6 +430,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) if errs, ok := err.(terrors.MultiError); ok { + if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 { + return errors.Wrap(conflictErr, "did not meet replication threshold") + } if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 { return errors.Wrap(err, "did not meet replication threshold") } @@ -435,26 +441,28 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri return errors.Wrap(err, "could not replicate write request") } -// hasCause performs a depth-first search of the causes of a nested MultiError -// to determine if it contains an error that satisfies the given cause. -func hasCause(err error, f func(error) bool) bool { - if err == nil { - return false - } - err = errors.Cause(err) +// countCause counts the number of errors within the given error +// whose causes satisfy the given function. +// countCause will inspect the error's cause or, if the error is a MultiError, +// the cause of each contained error but will not traverse any deeper. +func countCause(err error, f func(error) bool) int { errs, ok := err.(terrors.MultiError) if !ok { - return f(err) + errs = []error{err} } + var n int for i := range errs { - if hasCause(errs[i], f) { - return true + if f(errors.Cause(errs[i])) { + n++ } } - return false + return n } // isConflict returns whether or not the given error represents a conflict. 
func isConflict(err error) bool { - return err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict) + if err == nil { + return false + } + return err == conflictErr || err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict) } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 87fc5c24f62..5527b17d9e6 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -10,23 +10,34 @@ import ( terrors "github.com/prometheus/prometheus/tsdb/errors" ) -func TestHasCause(t *testing.T) { +func TestCountCause(t *testing.T) { for _, tc := range []struct { name string err error f func(error) bool - out bool + out int }{ { name: "nil", f: isConflict, - out: false, + out: 0, }, { name: "nil multierror", err: terrors.MultiError([]error{}), f: isConflict, - out: false, + out: 0, + }, + { + name: "matching nil", + f: func(err error) bool { return err == nil }, + out: 1, + }, + { + name: "matching simple", + err: conflictErr, + f: isConflict, + out: 1, }, { name: "non-matching multierror", @@ -35,7 +46,7 @@ func TestHasCause(t *testing.T) { errors.New("bar"), }), f: isConflict, - out: false, + out: 0, }, { name: "nested non-matching multierror", @@ -44,7 +55,7 @@ func TestHasCause(t *testing.T) { errors.New("bar"), }), "baz"), f: isConflict, - out: false, + out: 0, }, { name: "deep nested non-matching multierror", @@ -56,7 +67,7 @@ func TestHasCause(t *testing.T) { }), }), "baz"), f: isConflict, - out: false, + out: 0, }, { name: "matching multierror", @@ -66,7 +77,19 @@ func TestHasCause(t *testing.T) { errors.New("bar"), }), f: isConflict, - out: true, + out: 1, + }, + { + name: "matching multierror many", + err: terrors.MultiError([]error{ + storage.ErrOutOfOrderSample, + conflictErr, + 
+ errors.New(strconv.Itoa(http.StatusConflict)), + errors.New("foo"), + errors.New("bar"), + }), + f: isConflict, + out: 3, }, { name: "nested matching multierror", @@ -76,7 +99,7 @@ func TestHasCause(t *testing.T) { errors.New("bar"), }), "baz"), f: isConflict, - out: true, + out: 0, }, { name: "deep nested matching multierror", @@ -89,11 +112,11 @@ func TestHasCause(t *testing.T) { errors.New("bar"), }), "baz"), f: isConflict, - out: true, + out: 0, }, } { - if hasCause(tc.err, tc.f) != tc.out { - t.Errorf("test case %s: expected %t, got %t", tc.name, tc.out, !tc.out) + if n := countCause(tc.err, tc.f); n != tc.out { + t.Errorf("test case %s: expected %d, got %d", tc.name, tc.out, n) } } }