Add a healthcheck endpoint on the ingesters that distributors can use #741
Conversation
pkg/ingester/client/cortex.proto
}

message HealthCheckResponse {
  enum ServingStatus {
Do UNKNOWN or NOT_SERVING ever get sent?
They do not right now; that is the format suggested for health checking by the gRPC docs. We could make a more Cortex-specific format if we want.
It will also be compatible with: https://godoc.org/google.golang.org/grpc/health
I could also try to spend some time importing the proto definitions from grpc/health rather than duplicating them, if we decide to stick with the general format instead of a Cortex-specific one.
Yeah, I'd say either use those protos or make our response an empty response.
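For reference, staying compatible with the standard format would let the ingester serve grpc-go's stock health implementation directly. A minimal sketch, not necessarily how this PR wires it up:

```go
package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	server := grpc.NewServer()

	// Stock health server from grpc-go; it answers Check RPCs itself.
	hs := health.NewServer()
	hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(server, hs)

	// ... register the ingester service and call server.Serve as usual.
}
```

Any client built against grpc_health_v1 (including the godoc package linked above) could then health check the server without Cortex-specific protos.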
General approach to this is to hide the behaviour behind a flag that defaults to off; new code gets rolled out with the behaviour off, then a flag change can be rolled out to enable it.
pkg/distributor/distributor.go
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
ctx = user.InjectOrgID(ctx, "0")
resp, err := client.Check(ctx, &ingester_client.HealthCheckRequest{})
cancel()
Nit: can we move that up to the ctx, cancel := ... line and make it a defer please? Just causes minor confusion as it doesn't match the common pattern.
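For illustration, the suggested shape would look roughly like this, reusing the names from the snippet above (Distributor, d.cfg.RemoteTimeout, and ingester_client are from the PR; the checkIngester helper name is made up):

```go
// checkIngester is a hypothetical helper: cancel is deferred right
// where the context is created, matching the common pattern.
func (d *Distributor) checkIngester(client ingester_client.IngesterClient) error {
	ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
	defer cancel()
	ctx = user.InjectOrgID(ctx, "0")
	_, err := client.Check(ctx, &ingester_client.HealthCheckRequest{})
	return err
}
```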
@@ -184,29 +184,55 @@ func (d *Distributor) Stop() {
}

func (d *Distributor) removeStaleIngesterClients() {
	d.clientsMtx.Lock()
	defer d.clientsMtx.Unlock()
:-( I prefer the defer way, mainly because it makes the lock more robust to future modifications.
As the remote timeout is set to 2s, and the removeStaleIngesterClients function only runs every 15s, do we really need the extra goroutines and waitgroup?
I also prefer the defer way. Since we have the mutex, even one 2s timeout would block any other rules getting evaluated during that time, which concerned me.
I could get rid of the wait group but keep the goroutines. That should be ok, since all the healthchecks should be done after at most 2s and the cleanup period is 15s by default. What do you think of that?
Of course (d'oh).
How about making it all nice and inline, synchronous code that builds a new clients dict without holding the lock, and then replaces the old one under the lock?
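A sketch of that idea, assuming hypothetical field names (clients, clientsMtx): the live-address set is computed without the lock, and the lock is held only for the cheap filter-and-swap, never across a network call:

```go
func (d *Distributor) removeStaleIngesterClients() {
	// Collect live addresses from the ring; no lock needed for this.
	live := map[string]struct{}{}
	for _, ing := range d.ring.GetAll() {
		live[ing.Addr] = struct{}{}
	}

	// Build a filtered copy and swap it in under the lock.
	d.clientsMtx.Lock()
	defer d.clientsMtx.Unlock()
	fresh := map[string]ingester_client.IngesterClient{}
	for addr, cli := range d.clients {
		if _, ok := live[addr]; ok {
			fresh[addr] = cli
		}
	}
	d.clients = fresh
}
```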
I split the logic for removing stale ingester clients and healthchecking the ingester clients. I think everything is properly deferred now. Let me know what you think!
Oh dear - I'm fine with fixes like this going in to work around other issues, but do you have any idea why gRPC is getting so unwell?
Does this help with #157 - use this endpoint as the heartbeat instead of the consul timestamp?
Not as it stands (ingester clients will be created on demand), but it could be made to do so...
@tomwilkie Thanks for the comments! I wish I knew why gRPC is getting so unwell; I have been banging my head against a wall for a while about this. If you or someone else has any ideas, I would be happy to test them out. Also, do you or @bboreham think there is anything we could do in this PR to make it easier to heartbeat with respect to #157? I don't want to do all the work for that here, but would be happy to make it easier in the future.
No ideas, but we had something similar with Bigtable; I just added a timeout and it unstuck it. Are the ingester timeouts working okay?
The two systems are quite decoupled right now; let's get #279 merged and see what shakes out.
Sounds good on waiting for #279. The ingester timeouts seem to be working ok, but every request will start timing out after 2s (push) or 10s (query), and that ingester connection never manages to correct itself.
Is it worth destroying clients that experience a timeout then?
Are you asking: instead of doing a healthcheck, just delete the client any time it experiences a timeout in regular use?
Yeah, just as a thought...
It would work, and was something I was considering, but I liked the healthcheck a bit more. I am happy to rework it based on what everyone thinks, though. A couple of pros/cons off the top of my head:
Cons:
You had me at "Any future endpoints on the ingester..."
pkg/distributor/distributor.go
@@ -28,6 +28,7 @@ import (
	ingester_client "github.com/weaveworks/cortex/pkg/ingester/client"
	"github.com/weaveworks/cortex/pkg/ring"
	"github.com/weaveworks/cortex/pkg/util"
	"google.golang.org/grpc/health/grpc_health_v1"
Can you move this import up to the block above please? The pattern should be [std lib imports, 3rd party imports, repo-local imports]. I thought this was standard (as described here: https://github.com/golang/go/wiki/CodeReviewComments#imports) but it turns out it might just be me being picky.
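For illustration, the grouping described there would look like this for the imports in the diff above:

```go
import (
	// Standard library.
	"flag"
	"time"

	// Third-party.
	"google.golang.org/grpc/health/grpc_health_v1"

	// Repo-local.
	ingester_client "github.com/weaveworks/cortex/pkg/ingester/client"
	"github.com/weaveworks/cortex/pkg/ring"
	"github.com/weaveworks/cortex/pkg/util"
)
```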
pkg/distributor/distributor.go
client, err := d.getClientFor(ingester)
if err != nil {
	d.removeClientFor(ingester, err)
}
if err != nil, client == nil and the rest of this function will panic.
Yeah, I think if err != nil you don't need to remove the client either; you should just log and return (or return an error).
pkg/distributor/distributor.go
ctx = user.InjectOrgID(ctx, "0")

resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
Please either return the error (and log it in the calling function) or log it here. Slight preference for returning it and logging it in the caller.
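A sketch of the preferred option, with the check returning its error and the caller logging and removing. Method names follow identifiers quoted elsewhere in this thread; the healthCheck helper and the use of the stdlib log package are assumptions:

```go
func (d *Distributor) healthCheckAndRemoveIngesters() {
	for _, addr := range d.clientCache.RegisteredAddresses() {
		if err := d.healthCheck(addr); err != nil {
			// Log in the caller and drop the failing client so it is
			// re-dialed on next use.
			log.Printf("removing ingester client for %s: %v", addr, err)
			d.clientCache.RemoveClientFor(addr)
		}
	}
}
```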
I think it's looking much easier to follow wrt locking and concurrency. A couple of comments, then we should be good to go. It might also be worth extracting all this to a separate file (and even package), as there is starting to be enough logic to justify it.
Yeah, the more I think about it the more I like the idea of putting it in
Also, would you mind enabling CircleCI on your fork so we get test runs? It should be free.
I've done a pull/push to weaveworks/cortex so we get one test run now.
I did the refactor, and definitely think it cleans up the distributor quite a bit. I also moved utils/compat to the ingester/client package, since otherwise ingester/client could not import utils due to a circular dependency. Right now the loops for cleaning up stale clients and health checking each client remain in distributor.go, in order to not complicate what should just be a cache. If you want me to move those into the client package, I can do that.
Not sure what's wrong with my circle ci yet... hopefully I will get some time to look at it tonight/tomorrow morning. EDIT - got it working, needed to specify an
@bboreham or @tomwilkie Another review would be great when you get a chance!
Some thoughts attached to specific lines.
Can I ask that the PR description be modified to match the result of all changes to date?
pkg/distributor/distributor.go
@@ -93,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
	flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
	flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
	flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during the cleanup period.")
Can we say "during periodic cleanup"? At first I thought "the cleanup period" was maybe a period during shutdown when we clean up.
pkg/ingester/client/cache.go
type Factory func(addr string, cfg Config) (IngesterClient, error)

// IngesterClientCache holds a cache of ingester clients
type IngesterClientCache struct {
Is "cache" the right word? Elsewhere I've seen this called a "connection pool", except we have max one connection per endpoint. Maybe just some more explanation of the intended uses would help.
I like IngesterPool; I will change it to that and add some more explanation.
pkg/distributor/distributor.go
ingesters := map[string]struct{}{}
for _, ing := range d.ring.GetAll() {
	ingesters[ing.Addr] = struct{}{}
}

-	for addr, client := range d.clients {
+	for _, addr := range d.clientCache.RegisteredAddresses() {
Seems we are doing two kinds of cleanup now: removing cache entries which are "stale" because the ring no longer references an ingester at that address, and removing entries which fail healthcheck. Do we still need the first one?
We could get away with only the healthcheck loop (after everyone has upgraded their ingesters and turned the flag on). However, I like the difference in expected behavior/logging from removing stale ingesters (expected) vs. failing ingesters (unexpected).
pkg/distributor/distributor.go
func (d *Distributor) healthCheckAndRemoveIngesters() {
	for _, addr := range d.clientCache.RegisteredAddresses() {
		client, err := d.clientCache.GetClientFor(addr)
		if err != nil {
This can only happen due to some race between RegisteredAddresses() and GetClientFor() - maybe change the cache API to remove the possibility? Or move this whole loop into the cache?
I refactored this to the pool, but it still has an if around it in case someone deletes the entry from the pool while a previous healthcheck is happening. Right now that shouldn't happen, but that would be an annoying bug to run into if things change.
PR description updated
pool.GetClientFor("2")
if pool.Count() != 2 {
	t.Errorf("Expected Count() = 2, got %d", pool.Count())
This is a bit wordy - we use testify/assert elsewhere to reduce this to assert.Equal(t, 2, pool.Count()).
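The whole test could then read along these lines (NewIngesterPool is a hypothetical constructor for the pool under test):

```go
func TestIngesterPoolCount(t *testing.T) {
	pool := NewIngesterPool() // hypothetical constructor
	pool.GetClientFor("1")
	pool.GetClientFor("2")
	assert.Equal(t, 2, pool.Count()) // prints a readable diff on failure
}
```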
This puts a Check endpoint into the ingesters that allows our ingester clients to do a healthcheck loop to make sure their connection is still ok. As part of the maintenance loop to remove stale ingesters, we will now also (in the background) healthcheck each remaining ingester, and if it is not ok, delete the entry in distributor.clients. The client will then be remade the next time it is needed by getClientFor.

A couple questions:

Another solution would be to just periodically close the ingester clients, perhaps put an age field on them and only use them until then.

This PR fixes: #702

Current state of this PR:
- Ingesters have a Check grpc endpoint, used to health check them
- IngesterPool has a function, CleanUnhealthy(), that will clean up unhealthy ingester clients
- CleanUnhealthy() is run as part of the distributor periodic cleanup when the distributor.health-check-ingesters flag is turned on
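As a rough illustration of the shape described above (RegisteredAddresses, GetClientFor, and RemoveClientFor follow names quoted in the review; the healthCheck helper and the timeout field are assumptions):

```go
// CleanUnhealthy checks every pooled client and drops the ones that
// fail, so getClientFor re-dials them on next use.
func (p *IngesterPool) CleanUnhealthy() {
	for _, addr := range p.RegisteredAddresses() {
		client, err := p.GetClientFor(addr)
		if err != nil {
			continue // entry removed concurrently; nothing to check
		}
		if err := healthCheck(client, p.healthCheckTimeout); err != nil {
			p.RemoveClientFor(addr)
		}
	}
}
```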