Add a healthcheck endpoint on the ingesters that distributors can use #741

Merged
Gopkg.lock: 3 changes (2 additions & 1 deletion)


pkg/distributor/distributor.go: 86 changes (28 additions & 58 deletions)
@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"hash/fnv"
"io"
"sync"
"sync/atomic"
"time"
@@ -44,12 +43,12 @@ var (
// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
cfg Config
ring ring.ReadRing
clientsMtx sync.RWMutex
clients map[string]client.IngesterClient
quit chan struct{}
done chan struct{}
cfg Config
ring ring.ReadRing
clientsMtx sync.RWMutex
ingesterPool *ingester_client.IngesterPool
quit chan struct{}
done chan struct{}

billingClient *billing.Client

@@ -73,11 +72,12 @@ type Config struct {
BillingConfig billing.Config
IngesterClientConfig ingester_client.Config

ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
HealthCheckIngesters bool

// for testing
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
@@ -93,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during periodic cleanup.")
}

// New constructs a new Distributor
@@ -116,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
d := &Distributor{
cfg: cfg,
ring: ring,
clients: map[string]client.IngesterClient{},
ingesterPool: ingester_client.NewIngesterPool(cfg.ingesterClientFactory, cfg.IngesterClientConfig, cfg.RemoteTimeout),
quit: make(chan struct{}),
done: make(chan struct{}),
billingClient: billingClient,
@@ -170,6 +171,9 @@ func (d *Distributor) Run() {
select {
case <-cleanupClients.C:
d.removeStaleIngesterClients()
if d.cfg.HealthCheckIngesters {
d.ingesterPool.CleanUnhealthy()
}
case <-d.quit:
close(d.done)
return
@@ -184,52 +188,18 @@ func (d *Distributor) Stop() {
}

func (d *Distributor) removeStaleIngesterClients() {
d.clientsMtx.Lock()
defer d.clientsMtx.Unlock()
Contributor

:-( I prefer the defer way, mainly because it makes the lock more robust to future modifications.

As the remote timeout is set to 2s, and the removeStaleIngesterClients function only runs every 15s, do we really need the extra goroutines and waitgroup?

Contributor Author (@csmarchbanks, Mar 12, 2018)

I also prefer the defer way. Since we have the mutex, even a single 2s timeout would block any other rules from being evaluated during that time, which concerned me.

I could get rid of the wait group but keep the goroutines. That should be OK, since all the health checks should be done after at most 2s, and the cleanup period is 15s by default. What do you think of that?

Contributor

Of course (d'oh).

How about making it all nice, inline, synchronous code that builds a new clients dict without holding the lock, and then replaces the old one under the lock?

Contributor Author

I split the logic for removing stale ingester clients and healthchecking the ingester clients. I think everything is properly deferred now. Let me know what you think!
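
For illustration, the copy-then-swap approach suggested above could look roughly like the sketch below. It reuses the pre-pool fields from this file (d.clients, d.clientsMtx, d.ring) and is only a sketch of the suggestion, not the code that ended up in this PR; note that a client added by getClientFor between the copy and the swap would be dropped and simply re-created on its next use.

// Sketch of the suggested approach: build the replacement map without
// holding the lock, then swap it in under the lock.
func (d *Distributor) removeStaleIngesterClients() {
	ingesters := map[string]struct{}{}
	for _, ing := range d.ring.GetAll() {
		ingesters[ing.Addr] = struct{}{}
	}

	d.clientsMtx.RLock()
	old := d.clients
	d.clientsMtx.RUnlock()

	// Keep clients whose ingester is still in the ring; close the rest.
	fresh := map[string]client.IngesterClient{}
	for addr, c := range old {
		if _, ok := ingesters[addr]; ok {
			fresh[addr] = c
			continue
		}
		level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr)
		if err := c.(io.Closer).Close(); err != nil {
			level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err)
		}
	}

	d.clientsMtx.Lock()
	d.clients = fresh
	d.clientsMtx.Unlock()
}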


ingesters := map[string]struct{}{}
for _, ing := range d.ring.GetAll() {
ingesters[ing.Addr] = struct{}{}
}

for addr, client := range d.clients {
for _, addr := range d.ingesterPool.RegisteredAddresses() {
if _, ok := ingesters[addr]; ok {
continue
}
level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr)
delete(d.clients, addr)

// Do the gRPC closing in the background since it might take a while and
// we're holding a mutex.
go func(addr string, closer io.Closer) {
if err := closer.Close(); err != nil {
level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err)
}
}(addr, client.(io.Closer))
}
}

func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.IngesterClient, error) {
d.clientsMtx.RLock()
client, ok := d.clients[ingester.Addr]
d.clientsMtx.RUnlock()
if ok {
return client, nil
}

d.clientsMtx.Lock()
defer d.clientsMtx.Unlock()
client, ok = d.clients[ingester.Addr]
if ok {
return client, nil
}

client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig)
if err != nil {
return nil, err
d.ingesterPool.RemoveClientFor(addr)
}
d.clients[ingester.Addr] = client
return client, nil
}

func tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) {
@@ -412,7 +382,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
}

func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, samples []*sampleTracker) error {
c, err := d.getClientFor(ingester)
c, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
@@ -449,7 +419,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .

metricNameMatcher, _, ok := util.ExtractMetricNameMatcherFromMatchers(matchers)

req, err := util.ToQueryRequest(from, to, matchers)
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}
@@ -529,7 +499,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge
}

func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *client.QueryRequest) (model.Matrix, error) {
client, err := d.getClientFor(ing)
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
@@ -541,7 +511,7 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,
return nil, err
}

return util.FromQueryResponse(resp), nil
return ingester_client.FromQueryResponse(resp), nil
}

// forAllIngesters runs f, in parallel, for all ingesters
@@ -550,7 +520,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
go func(ingester *ring.IngesterDesc) {
client, err := d.getClientFor(ingester)
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
errs <- err
return
Expand Down Expand Up @@ -609,7 +579,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
req, err := util.ToMetricsForLabelMatchersRequest(from, through, matchers)
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
if err != nil {
return nil, err
}
@@ -623,7 +593,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

metrics := map[model.Fingerprint]model.Metric{}
for _, resp := range resps {
ms := util.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse))
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse))
for _, m := range ms {
metrics[m.Fingerprint()] = m
}
@@ -677,7 +647,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
// Not using d.forAllIngesters(), so we can fail after first error.
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
client, err := d.getClientFor(ingester)
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
}
@@ -736,6 +706,6 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
numClientsDesc,
prometheus.GaugeValue,
float64(len(d.clients)),
float64(d.ingesterPool.Count()),
)
}
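
For context on what the CleanUnhealthy call added to Run() might involve, below is a rough sketch of a pool health-check pass. It uses the standard gRPC health-checking protocol (grpc_health_v1) as a stand-in for the ingester health-check endpoint this PR introduces, and the IngesterPool internals shown here (the conns map, mutex, and timeout field) are assumed for illustration rather than taken from this change.

package ingester_client

import (
	"context"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// IngesterPool sketch: only the pieces needed to illustrate CleanUnhealthy.
type IngesterPool struct {
	mtx     sync.RWMutex
	conns   map[string]*grpc.ClientConn
	timeout time.Duration
}

// CleanUnhealthy health-checks every pooled connection and drops the ones
// that fail, so the next GetClientFor re-dials a fresh connection.
func (p *IngesterPool) CleanUnhealthy() {
	p.mtx.RLock()
	addrs := make([]string, 0, len(p.conns))
	for addr := range p.conns {
		addrs = append(addrs, addr)
	}
	p.mtx.RUnlock()

	for _, addr := range addrs {
		p.mtx.RLock()
		conn, ok := p.conns[addr]
		p.mtx.RUnlock()
		if !ok {
			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
		_, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{})
		cancel()

		if err != nil {
			p.mtx.Lock()
			delete(p.conns, addr)
			p.mtx.Unlock()
			_ = conn.Close() // closing may block briefly; do it outside the lock
		}
	}
}

The checks in this sketch run sequentially; the review thread above discusses whether they need goroutines at all, given the 2s remote timeout and the 15s cleanup period.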