Use BasicLifecycler for Compactor and autoforget unhealthy instances #3771

Merged: 4 commits, merged on Dec 22, 2022. This view shows changes from 2 commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
* [ENHANCEMENT] Store-gateway: Add experimental alternate implementation of index-header reader that does not use memory mapped files. The index-header reader is expected to improve stability of the store-gateway. You can enable this implementation with the flag `-blocks-storage.bucket-store.index-header.stream-reader-enabled`. #3639 #3691 #3703
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_cancelled_requests_total` metric to track the number of requests that are already cancelled when dequeued. #3696
* [ENHANCEMENT] Store-gateway: add `cortex_bucket_store_partitioner_extended_ranges_total` metric to keep track of the ranges that the partitioner decided to overextend and merge in order to save API call to the object storage. #3769
* [ENHANCEMENT] Compactor: Auto-forget unhealthy compactors after ten failed ring heartbeats. #3771
* [BUGFIX] Log the names of services that are not yet running rather than `unsupported value type` when calling `/ready` and some services are not running. #3625
* [BUGFIX] Alertmanager: Fix template spurious deletion with relative data dir. #3604
* [BUGFIX] Security: update prometheus/exporter-toolkit for CVE-2022-46146. #3675
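A minimal illustrative sketch (not part of this diff) of what the auto-forget CHANGELOG entry above means in practice: the forget threshold is ten consecutive heartbeat-timeout periods, so assuming a default `-compactor.ring.heartbeat-timeout` of 1 minute, an unhealthy compactor is removed from the ring after roughly 10 minutes.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the ringAutoForgetUnhealthyPeriods constant introduced by this PR.
	const ringAutoForgetUnhealthyPeriods = 10

	// Assumption for illustration: the default -compactor.ring.heartbeat-timeout is 1m.
	heartbeatTimeout := time.Minute

	forgetPeriod := ringAutoForgetUnhealthyPeriods * heartbeatTimeout
	fmt.Println("unhealthy compactors are auto-forgotten after", forgetPeriod) // prints 10m0s
}
```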
95 changes: 65 additions & 30 deletions pkg/compactor/compactor.go
@@ -21,6 +21,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
@@ -37,8 +38,12 @@ import (
)

const (
// CompactorRingKey is the key under which we store the compactors ring in the KVStore.
CompactorRingKey = "compactor"
// ringKey is the key under which we store the compactors ring in the KVStore.
ringKey = "compactor"

// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed after.
ringAutoForgetUnhealthyPeriods = 10
)

const (
@@ -225,7 +230,7 @@ type MultitenantCompactor struct {
bucketClient objstore.Bucket

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
ringLifecycler *ring.BasicLifecycler
ring *ring.Ring
ringSubservices *services.Manager
ringSubservicesWatcher *services.FailureWatcher
@@ -365,7 +370,11 @@ func newMultitenantCompactor(

// Start the compactor.
func (c *MultitenantCompactor) starting(ctx context.Context) error {
var err error
var (
err error
ctxTimeout context.Context
cancel context.CancelFunc
)

// Create bucket client.
c.bucketClient, err = c.bucketClientFactory(ctx)
@@ -383,42 +392,39 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

// Initialize the compactors ring if sharding is enabled.
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", CompactorRingKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
c.ring, c.ringLifecycler, err = newRingAndLifecycler(c.compactorCfg.ShardingRing, c.logger, c.registerer)
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
return err
}

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", CompactorRingKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
return errors.Wrap(err, "unable to start compactor ring dependencies")
}

c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
if err == nil {
c.ringSubservicesWatcher = services.NewFailureWatcher()
c.ringSubservicesWatcher.WatchManager(c.ringSubservices)

err = services.StartManagerAndAwaitHealthy(ctx, c.ringSubservices)
c.ringSubservicesWatcher = services.NewFailureWatcher()
c.ringSubservicesWatcher.WatchManager(c.ringSubservices)
if err = c.ringSubservices.StartAsync(ctx); err != nil {
return errors.Wrap(err, "unable to start compactor ring dependencies")
}

if err != nil {
ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
defer cancel()
if err = c.ringSubservices.AwaitHealthy(ctxTimeout); err != nil {
return errors.Wrap(err, "unable to start compactor ring dependencies")
}

// If sharding is enabled we should wait until this instance is
// ACTIVE within the ring. This MUST be done before starting the
// any other component depending on the users scanner, because the
// users scanner depends on the ring (to check whether an user belongs
// to this shard or not).
// If sharding is enabled we should wait until this instance is ACTIVE within the ring. This
// MUST be done before starting any other component depending on the users scanner, because
// the users scanner depends on the ring (to check whether a user belongs to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")

ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
defer cancel()
if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
return err
if err = ring.WaitInstanceState(ctxTimeout, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return errors.Wrap(err, "compactor failed to become ACTIVE in the ring")
}

level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")

// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
@@ -458,6 +464,35 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
return nil
}

func newRingAndLifecycler(cfg RingConfig, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "compactor-lifecycler"), logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize compactors' KV store")
}

lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to build compactors' lifecycler config")
}

var delegate ring.BasicLifecyclerDelegate
delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, lifecyclerCfg.NumTokens)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*lifecyclerCfg.HeartbeatTimeout, delegate, logger)

compactorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "compactor", ringKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize compactors' lifecycler")
}

compactorsRing, err := ring.NewWithStoreClientAndStrategy(cfg.ToRingConfig(), "compactor", ringKey, kvStore, ring.NewDefaultReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", reg), logger)
Collaborator: We're reusing kvStore for the ring client too. kvStore is created with the name compactor-lifecycler, which doesn't apply if we reuse it for the ring client as well. In the old implementation, the KV store used for the ring client was named compactor-ring. I see two options:

  1. Use two different KV stores, to keep the previous behaviour.
  2. Rename kvStore from compactor-lifecycler to just compactor, then double check whether there's any dashboard or alert to update.

Contributor (author): I will take a look at this.

Contributor (author): I'm going to go back to using two different KV stores and maintain the previous behavior, rather than have compactors use a different setup than other components.

Contributor (author): This also exposes a problem with the way the KV store is created for compactors (via this PR) and distributors: the prometheus.Registerer used for the KV store isn't prefixed with cortex_, so it emits metrics like kv_request_duration_seconds_count, whereas ingesters use cortex_kv_request_duration_seconds_count.
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize compactors' ring client")
}

return compactorsRing, compactorsLifecycler, nil
}
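Following up on the review thread above: a minimal sketch, assuming the two-KV-client approach the author says he will go back to, that keeps the previous compactor-lifecycler and compactor-ring KV store names while also wrapping the registerer with the cortex_ prefix raised in the last comment. The helper name `newRingKVClients` is hypothetical and not part of this PR.

```go
package compactor

import (
	"github.com/go-kit/log"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

// newRingKVClients is a sketch only: two separate KV clients for the compactor
// ring, built against a cortex_-prefixed registerer so KV metrics match other
// components (e.g. cortex_kv_request_duration_seconds_count).
func newRingKVClients(cfg RingConfig, logger log.Logger, reg prometheus.Registerer) (kv.Client, kv.Client, error) {
	// Prefix once so both clients emit cortex_-prefixed KV metrics.
	wrappedReg := prometheus.WrapRegistererWithPrefix("cortex_", reg)

	// KV client used by the BasicLifecycler.
	lifecyclerKV, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(wrappedReg, "compactor-lifecycler"), logger)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed to initialize compactor lifecycler KV store")
	}

	// Separate KV client for the ring client, keeping the previous name.
	ringKV, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(wrappedReg, "compactor-ring"), logger)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed to initialize compactor ring KV store")
	}

	return lifecyclerKV, ringKV, nil
}
```

Under this approach, newRingAndLifecycler above would accept or build these two clients instead of reusing a single one.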

func (c *MultitenantCompactor) stopping(_ error) error {
ctx := context.Background()

@@ -745,11 +780,11 @@ type shardingStrategy interface {
type splitAndMergeShardingStrategy struct {
allowedTenants *util.AllowedTenants
ring *ring.Ring
ringLifecycler *ring.Lifecycler
ringLifecycler *ring.BasicLifecycler
configProvider ConfigProvider
}

func newSplitAndMergeShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.Lifecycler, configProvider ConfigProvider) *splitAndMergeShardingStrategy {
func newSplitAndMergeShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.BasicLifecycler, configProvider ConfigProvider) *splitAndMergeShardingStrategy {
return &splitAndMergeShardingStrategy{
allowedTenants: allowedTenants,
ring: ring,
@@ -766,7 +801,7 @@ func (s *splitAndMergeShardingStrategy) blocksCleanerOwnUser(userID string) (bool, error) {

r := s.ring.ShuffleShard(userID, s.configProvider.CompactorTenantShardSize(userID))

return instanceOwnsTokenInRing(r, s.ringLifecycler.Addr, userID)
return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), userID)
}

// ALL compactors should plan jobs for all users.
@@ -777,7 +812,7 @@ func (s *splitAndMergeShardingStrategy) compactorOwnUser(userID string) (bool, error) {

r := s.ring.ShuffleShard(userID, s.configProvider.CompactorTenantShardSize(userID))

return r.HasInstance(s.ringLifecycler.ID), nil
return r.HasInstance(s.ringLifecycler.GetInstanceID()), nil
}

// Only single compactor should execute the job.
@@ -789,7 +824,7 @@ func (s *splitAndMergeShardingStrategy) ownJob(job *Job) (bool, error) {

r := s.ring.ShuffleShard(job.UserID(), s.configProvider.CompactorTenantShardSize(job.UserID()))

return instanceOwnsTokenInRing(r, s.ringLifecycler.Addr, job.ShardingKey())
return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), job.ShardingKey())
}

func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) {
59 changes: 29 additions & 30 deletions pkg/compactor/compactor_ring.go
@@ -7,6 +7,7 @@ package compactor

import (
"flag"
"fmt"
"os"
"time"

@@ -20,6 +21,13 @@ import (
util_log "github.com/grafana/mimir/pkg/util/log"
)

const (
// ringNumTokens is how many tokens each compactor should have in the ring. We use a
// safe default instead of exposing to config option to the user in order to simplify
// the config.
ringNumTokens = 512
)

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the compactors ring. This config
// is used to strip down the config to the minimum, and avoid confusion
@@ -76,41 +84,32 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
// ring config.
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
// We have to make sure that the ring.LifecyclerConfig and ring.Config
// defaults are preserved
lc := ring.LifecyclerConfig{}
rc := ring.Config{}
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}

instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)

return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
HeartbeatPeriod: cfg.HeartbeatPeriod,
HeartbeatTimeout: cfg.HeartbeatTimeout,
TokensObservePeriod: cfg.ObservePeriod,
NumTokens: ringNumTokens,
KeepInstanceInTheRingOnShutdown: false,
}, nil
}

flagext.DefaultValues(&lc)
func (cfg *RingConfig) ToRingConfig() ring.Config {
rc := ring.Config{}
flagext.DefaultValues(&rc)

// Configure ring
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1

// Configure lifecycler
lc.RingConfig = rc
lc.RingConfig.SubringCacheDisabled = true
lc.ListenPort = cfg.ListenPort
lc.Addr = cfg.InstanceAddr
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.HeartbeatTimeout = cfg.HeartbeatTimeout
lc.ObservePeriod = cfg.ObservePeriod
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0

// We use a safe default instead of exposing to config option to the user
// in order to simplify the config.
lc.NumTokens = 512

return lc
return rc
}
72 changes: 0 additions & 72 deletions pkg/compactor/compactor_ring_test.go

This file was deleted.
