Fix ketama quorum #5910
Conversation
Force-pushed from a37d49f to c120cfb
Thanks for this @fpetkovski, I think the approach in general looks good and somewhat converges with #5791 as you mentioned. A couple of thoughts:
- I'm not sure error determination works correctly here. We might end up with a mix of different failure reasons for different replication batches (some may end up with 409, some with 500); in such a case I think we have no option other than to tell the client to retry (i.e. return a server error).
- It would be good to add more test cases with different numbers of nodes / replication factors, plus E2E tests, perhaps taken over from Receiver: Fix quorum handling for all hashing algorithms #5791
Force-pushed from 57f7fab to 4437ca1
Force-pushed from 7542349 to 2ec06ca
The quorum calculation is currently broken when using the Ketama hashring. The reasons are explained in detail in issue thanos-io#5784. This commit fixes quorum calculation by tracking successful writes for each individual time-series inside a remote-write request. The commit also removes the replicate() method inside the Handler and moves the entire logic of fanning out and calculating success into the fanoutForward() method. Signed-off-by: Filip Petkovski <[email protected]>
Force-pushed from 2ec06ca to 572bddf
Thanks everyone for the review. We had a sync with @matej-g, and it seems like the only correct way to verify quorum is to track successful writes for each individual time-series. I've updated this PR to reflect that.
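For readers following the thread, here is a minimal, self-contained sketch of the per-series quorum idea (all names and shapes are illustrative, not the handler's actual code): count how many replicas acknowledged each series, and fail the request if any single series falls short of quorum.

```go
package main

import "fmt"

// seriesID identifies a single time-series inside a remote-write request,
// e.g. a hash of its label set. Illustrative only.
type seriesID uint64

// countSuccessfulWrites tallies, for each series, how many replicas
// acknowledged the write. outcomes maps a replica endpoint to the series it
// successfully stored.
func countSuccessfulWrites(outcomes map[string][]seriesID) map[seriesID]int {
	successes := make(map[seriesID]int)
	for _, stored := range outcomes {
		for _, s := range stored {
			successes[s]++
		}
	}
	return successes
}

// quorumReached reports whether every series in the request was written to
// at least (replicationFactor/2 + 1) replicas. A single series falling short
// fails the whole request, so the client will retry it.
func quorumReached(successes map[seriesID]int, totalSeries, replicationFactor int) bool {
	quorum := replicationFactor/2 + 1
	if len(successes) < totalSeries {
		return false // some series were not written anywhere
	}
	for _, n := range successes {
		if n < quorum {
			return false
		}
	}
	return true
}

func main() {
	// Replication factor 3: series 1 landed on all replicas, series 2 on one only.
	outcomes := map[string][]seriesID{
		"receive-0": {1, 2},
		"receive-1": {1},
		"receive-2": {1},
	}
	fmt.Println(quorumReached(countSuccessfulWrites(outcomes), 2, 3)) // false: series 2 missed quorum
}
```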
As we discussed, overall this approach seems fine and more understandable than replicating batches. One more part to figure out is the error handling / determination.
On the other hand, I'm uncertain about the performance implications, as we're changing the characteristics of how replication in the receiver works. That's true on both the micro level (we'll now track the replication of each series instead of batches) and the macro level (we'll send fewer but bigger requests). It would be nice to run some of the benchmarks we have for the handler, as well as to see this in action on a cluster with some real traffic or a synthetic load test.
pkg/receive/handler.go
Outdated
if seriesReplicated {
	errs.Add(rerr.Err())
} else if uint64(len(rerr)) >= failureThreshold {
	cause := determineWriteErrorCause(rerr.Err(), quorum)
I think we'll also have to change how we determine the HTTP error we return to the client when this `cause` error bubbles up back to `handleRequest`. Right now, we return the error that occurs the most, or the original multi-error, since we use threshold 1. But this might be incorrect: if the `cause` error for any individual series replication is a server error, we have to retry the whole request. I think the solution would be (see the sketch after this list):
- Return a server error if any of the `cause` errors is an unknown error / unavailable / not ready (cases where we have to retry). A tricky but less important part here is exactly which error to return if we have a mixed bag of server errors; the client's behavior should be the same regardless of the error message we decide to return.
- Otherwise we should only have conflict errors and can return conflict.
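A rough sketch of that decision rule (the sentinel errors and function names here are illustrative, loosely mirroring errConflict / errNotReady / errUnavailable from handler.go, not the PR's exact code): any retryable cause forces a server error so the client re-sends the whole request; otherwise only conflicts remain and 409 is safe.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Illustrative sentinel errors; the real handler has similar ones.
var (
	errConflict    = errors.New("conflict")
	errNotReady    = errors.New("target not ready")
	errUnavailable = errors.New("target not available")
)

// httpStatusFor maps the per-series causes of a request to a single HTTP
// status. If any series failed with a retryable (server-side) cause, the
// whole request must be retried, so 5xx wins over 409.
func httpStatusFor(causes []error) int {
	sawConflict := false
	for _, c := range causes {
		switch {
		case c == nil:
			// series replicated successfully
		case errors.Is(c, errConflict):
			sawConflict = true
		case errors.Is(c, errNotReady), errors.Is(c, errUnavailable):
			return http.StatusServiceUnavailable // retryable: tell the client to resend
		default:
			return http.StatusInternalServerError // unknown error: also retryable
		}
	}
	if sawConflict {
		return http.StatusConflict
	}
	return http.StatusOK
}

func main() {
	fmt.Println(httpStatusFor([]error{nil, errConflict}))            // 409
	fmt.Println(httpStatusFor([]error{errConflict, errUnavailable})) // 503
}
```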
Another thing we should be mindful of here: when `cause` returns the original multi-error (and the same actually applies to the `if` branch above), we are putting a multi-error inside the `errs` multi-error, which can lead to erroneous 5xx responses as described in #5407 (comment).
Yes, you are correct. However, I wonder if we already have this issue in main, because we calculate the top-level cause the same way using threshold=1. So if we have 2 batches with a conflict and 1 batch with a server error, we will return conflict to the user and not retry the request.
In any case, I would also prefer to solve this problem now, since it can lead to data loss.
One thing I am not sure about is what the error code should be when we try to replicate a series and get one success, one server error and one client error. Right now I believe we return a client error, but if we change the rules, we would return a server error. It also means that in the case of 2 conflicts (samples already exist in TSDB) and 1 server error, we would still return a server error, even though that might not be necessary.
Maybe for replicating an individual series we can treat client errors as successes and only return a 5xx when two replicas fail. For the overall response, we can return a 5xx if any series has a 5xx.
Yes, I believe we basically have to treat a conflict as a 'success'. It's just important to return the correct status upstream, so if we have any conflicts in the replication, we'll want to return that to the client. Otherwise 5xx and OK should be clear (5xx if any series fails quorum; OK if there are no failed quorums or conflicts).
Makes sense. I think the `MultiError` and `determineWriteErrorCause` are not good abstractions for this. The `determineWriteErrorCause` function is overloaded and tries to determine the error for both cases.
Because of this, I added two error types, `writeErrors` and `replicationErrors`, with their own `Cause()` methods. The `writeErrors` cause prioritizes server errors, while the one from `replicationErrors` is mostly identical to `determineWriteErrorCause` and is used for determining the error of replicating a single series.
This way we always use the `Cause` method and, depending on the error type, we bubble up the appropriate error.
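A condensed sketch of that split (assumed names and simplified logic, not the exact implementation in this PR): each aggregate error type prioritizes a different cause when asked for `Cause()`.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative sentinel errors.
var (
	errConflict    = errors.New("conflict")
	errUnavailable = errors.New("target not available")
)

// writeErrors aggregates errors from writing a request. Its cause prioritizes
// server errors, because a single retryable failure means the client has to
// resend the request.
type writeErrors struct{ errs []error }

func (w *writeErrors) Add(err error) { w.errs = append(w.errs, err) }

func (w *writeErrors) Cause() error {
	var cause error
	for _, err := range w.errs {
		if !errors.Is(err, errConflict) {
			return err // any non-conflict (server) error wins
		}
		cause = err
	}
	return cause // only conflicts left, or nil
}

// replicationErrors aggregates per-replica failures for one series and
// decides whether quorum was still met, treating conflicts as successes.
type replicationErrors struct {
	errs              []error
	replicationFactor int
}

func (r *replicationErrors) Cause() error {
	conflicts, serverErrs := 0, 0
	for _, err := range r.errs {
		if errors.Is(err, errConflict) {
			conflicts++
		} else {
			serverErrs++
		}
	}
	// Quorum fails only if more than half of the replicas hit server errors.
	if serverErrs > r.replicationFactor/2 {
		return errUnavailable
	}
	if conflicts > 0 {
		return errConflict
	}
	return nil
}

func main() {
	re := &replicationErrors{errs: []error{errConflict, errUnavailable}, replicationFactor: 3}
	fmt.Println(re.Cause()) // conflict: quorum still met (success + conflict), but the conflict is surfaced
}
```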
Force-pushed from 5510b7c to 5383af3
Force-pushed from 12dd5d5 to b0a227b
Signed-off-by: fpetkovski <[email protected]>
Force-pushed from 4c7536b to 0c6087b
Signed-off-by: fpetkovski <[email protected]>
Force-pushed from 2c4ed70 to a19fa93
Here are the benchmark results with the per-series error tracking:
There is a notable difference when we have actual errors, but this is likely expected because we have more errors to work with and more objects to manage.
Signed-off-by: fpetkovski <[email protected]>
Force-pushed from a19fa93 to a08ba98
Signed-off-by: Filip Petkovski <[email protected]>
Latest changes are looking good; differentiating between write and replication errors makes the error handling more digestible 👍 I have a couple more nits, and it would be good if in general we could add a few more comments here and there in the `forward` method to make the 'funneling' from writer errors -> replication errors -> final error a bit more obvious. I'm also wondering, since we now have quite a lot of error handling code, whether it would make sense to extract these types and methods into a separate file (e.g. `receive/errors.go`).
We also load tested the changes with @philipgough on our test cluster but could not see any difference in performance. The microbenchmark runs also look acceptable to me. So performance-wise I'd expect this to be all good.
}

expErrs := expectedErrors{
	{err: errUnavailable, cause: isUnavailable},
Technically, we should not expect unavailable here, as that is expected on node level. I think we can only expect not ready (if TSDB appender is not ready) or conflict.
I think we can still have an unavailable error because write errors can come either from writing to a local TSDB, or from sending a request for replication to a different node. And that node can return unavailable in various different cases:
Lines 837 to 850 in afdb30e
switch determineWriteErrorCause(err, 1) {
case nil:
	return &storepb.WriteResponse{}, nil
case errNotReady:
	return nil, status.Error(codes.Unavailable, err.Error())
case errUnavailable:
	return nil, status.Error(codes.Unavailable, err.Error())
case errConflict:
	return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
	return nil, status.Error(codes.InvalidArgument, err.Error())
default:
	return nil, status.Error(codes.Internal, err.Error())
}
If the cause of a `replicationErr` is an unavailable error, then this error will bubble up to the write errors, and we need to be able to detect it.
Got it, you're right, I see the flow now. I got confused because I associated write errors only with the narrow sense (i.e. TSDB write errors), but we're also using them to capture remote-write errors on line 626, which can originate in a node's unavailability, etc.
Signed-off-by: Filip Petkovski <[email protected]>
This PR looks good to me now 👍, great job @fpetkovski.
One more theoretical concern I discussed with @fpetkovski is what effect the increased resource usage for error handling would have in an 'unhappy path' scenario (e.g. some nodes in our hashring are down, or clients keep sending us invalid data, resulting in an increased error rate in the system). Since the microbenchmarks show this could consume ~20% more memory, would that translate to an overall increase in memory usage in a receive replica? Could that lead to further destabilization of the hashring? We could run an additional load test to try out this hypothesis (cc @philipgough).
With this in mind, I'm still happy to go forward and iterate on this solution if any performance issues pop up.
Still I'd also like more eyes on this, nominating @bwplotka @philipgough @douglascamata 😜
@@ -51,196 +47,6 @@ import (
	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestDetermineWriteErrorCause(t *testing.T) {
I wonder if we could replace this with a couple of test cases for the `replicationErrors` and `writeErrors` cause?
Suggesting some changes to variable names to make understanding this code slightly easier.
pkg/receive/handler.go
Outdated
	return err
}
key := endpointReplica{endpoint: endpoint, replica: rn}
er, ok := wreqs[key]
Could this variable named `er` receive a better name? I have no clue what an `er` is, and it's easy to mistake it for `err` and even `endpointReplica` (variables of this type often have the name `er`, which is something else I think we have to slowly move away from).
Makes sense, I renamed this variable to `writeTarget` for clarity.
pkg/receive/handler.go
Outdated
if er.endpoint == h.options.Endpoint {
	go func(er endpointReplica) {
Similar comment here about the `er` variable name, which also applies to other occurrences: it gives no clue what it is in this context and can easily be confused with `err`. Could we rename it? Some suggestions: `replicationKey`, `replicaKey`, `replicationID`, `endpointReplica`.
pkg/receive/handler.go
Outdated
@@ -607,68 +644,41 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
	tLogger = log.With(h.logger, logTags)
}

-	ec := make(chan error)
+	ec := make(chan writeResponse)
Could `ec` receive a better name? It's used many times in the next hundreds of lines and the name isn't clear. Suggestion: `errorChannel`, if that's even what it actually is. 😅
Renamed to `responses`.
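For context, a minimal sketch of the pattern under discussion (the `writeResponse` shape and the `fanout` helper are assumptions for illustration, not the PR's exact types): fan-out goroutines send one small result per write target over a single channel, and the caller collects them.

```go
package main

import (
	"fmt"
	"sync"
)

// writeResponse is an assumed, simplified shape of what each fan-out
// goroutine reports back: which replica it wrote to and the outcome.
type writeResponse struct {
	endpoint string
	err      error
}

func fanout(endpoints []string, write func(string) error) []writeResponse {
	responses := make(chan writeResponse)
	var wg sync.WaitGroup
	for _, ep := range endpoints {
		wg.Add(1)
		go func(ep string) {
			defer wg.Done()
			responses <- writeResponse{endpoint: ep, err: write(ep)}
		}(ep)
	}
	// Close the channel once all writers are done so the collector loop ends.
	go func() { wg.Wait(); close(responses) }()

	var out []writeResponse
	for r := range responses {
		out = append(out, r)
	}
	return out
}

func main() {
	results := fanout([]string{"receive-0", "receive-1", "receive-2"}, func(ep string) error {
		return nil // pretend every write succeeds
	})
	fmt.Println(len(results), "responses collected")
}
```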
Signed-off-by: Filip Petkovski <[email protected]>
Force-pushed from 38cb138 to 4c870b4
Thanks a lot for the work, @fpetkovski. 🙇
This is a LATM (looks amazing to me)! 🚀
Nice job, especially on tests. LGTM 👍🏽
Although I would really want to batch those requests at some point.
// It will return cause of each contained error but will not traverse any deeper.
func determineWriteErrorCause(err error, threshold int) error {

// errorSet is a set of errors.
type errorSet struct {
Long term, perhaps it would be better to just use `merrors` and some Dedup function?
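As a purely hypothetical illustration of such a dedup step (this is not the `merrors` API; just grouping identical error messages before reporting):

```go
package main

import (
	"errors"
	"fmt"
)

// dedupErrors collapses errors with identical messages and annotates each
// surviving error with how many times it occurred. Purely illustrative.
func dedupErrors(errs []error) []error {
	counts := make(map[string]int)
	order := make([]string, 0, len(errs))
	for _, err := range errs {
		if err == nil {
			continue
		}
		msg := err.Error()
		if counts[msg] == 0 {
			order = append(order, msg)
		}
		counts[msg]++
	}
	out := make([]error, 0, len(order))
	for _, msg := range order {
		if n := counts[msg]; n > 1 {
			out = append(out, fmt.Errorf("%s (x%d)", msg, n))
		} else {
			out = append(out, errors.New(msg))
		}
	}
	return out
}

func main() {
	errs := []error{errors.New("conflict"), errors.New("conflict"), errors.New("not ready")}
	fmt.Println(dedupErrors(errs)) // [conflict (x2) not ready]
}
```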
Let's also consider some compatibility or usage of the official error (un)wrapping coming with Go 1.20: https://tip.golang.org/doc/go1.20#errors
That looks awesome 👍
* Fix quorum calculation for Ketama hashring

  The quorum calculation is currently broken when using the Ketama hashring. The reasons are explained in detail in issue thanos-io#5784. This commit fixes quorum calculation by tracking successful writes for each individual time-series inside a remote-write request. The commit also removes the replicate() method inside the Handler and moves the entire logic of fanning out and calculating success into the fanoutForward() method.

  Signed-off-by: Filip Petkovski <[email protected]>

* Fix error propagation

  Signed-off-by: fpetkovski <[email protected]>

* Fix writer errors

  Signed-off-by: fpetkovski <[email protected]>

* Separate write from replication errors

  Signed-off-by: fpetkovski <[email protected]>

* Add back replication metric

  Signed-off-by: Filip Petkovski <[email protected]>

* Address PR comments

  Signed-off-by: Filip Petkovski <[email protected]>

* Address code review comments

  Signed-off-by: Filip Petkovski <[email protected]>

Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: fpetkovski <[email protected]>
The quorum calculation is currently broken when using the Ketama
hashring. The reasons are explained in detail in issue #5784.
This commit fixes quorum calculation by tracking successful writes
for each individual time-series inside a remote-write request.
The commit also removes the replicate() method inside the Handler
and moves the entire logic of fanning out and calculating success
into the fanoutForward() method.
Signed-off-by: Filip Petkovski [email protected]
Fixes #5784