From 82b7b5750eee3db8a00277bd9411c7a335754056 Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Fri, 16 Dec 2022 16:03:27 -0500
Subject: [PATCH 1/4] Use BasicLifecycler for Compactor and autoforget
 unhealthy instances

Change the compactor to use BasicLifecycler instead of Lifecycler so
that we can make use of the auto-forget functionality. This works around
an issue where ownership of tenants is retained by unhealthy instances.

The startup behavior of compactors changes slightly because of the use
of BasicLifecycler instead of Lifecycler:

* Lifecycler didn't join the ring while starting, only once running, so
  the `wait_active_instance_timeout` was needed only while waiting for
  the instance to become active.
* BasicLifecycler joins the ring while starting, so we need to apply the
  `wait_active_instance_timeout` when starting the lifecycler, _not_
  just when waiting for the instance to become active.

See #3708
Fixes #1588

Signed-off-by: Nick Pillitteri
---
 CHANGELOG.md                    |  1 +
 pkg/compactor/compactor.go      | 95 ++++++++++++++++++++++-----------
 pkg/compactor/compactor_ring.go | 38 +++++++++++++
 pkg/compactor/compactor_test.go | 19 ++++---
 pkg/distributor/distributor.go  |  4 +-
 5 files changed, 115 insertions(+), 42 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5844c904c98..fc1d8876770 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 * [ENHANCEMENT] Store-gateway: Add experimental alternate implementation of index-header reader that does not use memory mapped files. The index-header reader is expected to improve stability of the store-gateway. You can enable this implementation with the flag `-blocks-storage.bucket-store.index-header.stream-reader-enabled`. #3639 #3691 #3703
 * [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_cancelled_requests_total` metric to track the number of requests that are already cancelled when dequeued. #3696
 * [ENHANCEMENT] Store-gateway: add `cortex_bucket_store_partitioner_extended_ranges_total` metric to keep track of the ranges that the partitioner decided to overextend and merge in order to save API call to the object storage. #3769
+* [ENHANCEMENT] Compactor: Auto-forget unhealthy compactors after ten failed ring heartbeats. #3771
 * [BUGFIX] Log the names of services that are not yet running rather than `unsupported value type` when calling `/ready` and some services are not running. #3625
 * [BUGFIX] Alertmanager: Fix template spurious deletion with relative data dir. #3604
 * [BUGFIX] Security: update prometheus/exporter-toolkit for CVE-2022-46146. #3675
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 4e599858a88..44cc4680d5c 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -21,6 +21,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/dskit/kv"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/pkg/errors"
@@ -37,8 +38,12 @@ import (
 )

 const (
-	// CompactorRingKey is the key under which we store the compactors ring in the KVStore.
-	CompactorRingKey = "compactor"
+	// ringKey is the key under which we store the compactors ring in the KVStore.
+	ringKey = "compactor"
+
+	// ringAutoForgetUnhealthyPeriods is the number of consecutive heartbeat timeout periods
+	// after which an unhealthy instance is automatically removed from the ring.
+	ringAutoForgetUnhealthyPeriods = 10
 )

 const (
@@ -225,7 +230,7 @@ type MultitenantCompactor struct {
 	bucketClient objstore.Bucket

 	// Ring used for sharding compactions.
-	ringLifecycler         *ring.Lifecycler
+	ringLifecycler         *ring.BasicLifecycler
 	ring                   *ring.Ring
 	ringSubservices        *services.Manager
 	ringSubservicesWatcher *services.FailureWatcher
@@ -365,7 +370,11 @@ func newMultitenantCompactor(

 // Start the compactor.
 func (c *MultitenantCompactor) starting(ctx context.Context) error {
-	var err error
+	var (
+		err        error
+		ctxTimeout context.Context
+		cancel     context.CancelFunc
+	)

 	// Create bucket client.
 	c.bucketClient, err = c.bucketClientFactory(ctx)
@@ -383,42 +392,39 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 	c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

 	// Initialize the compactors ring if sharding is enabled.
-	lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
-	c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", CompactorRingKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
+	c.ring, c.ringLifecycler, err = newRingAndLifecycler(c.compactorCfg.ShardingRing, c.logger, c.registerer)
 	if err != nil {
-		return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
+		return err
 	}

-	c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", CompactorRingKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
+	c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
 	if err != nil {
-		return errors.Wrap(err, "unable to initialize compactor ring")
+		return errors.Wrap(err, "unable to start compactor ring dependencies")
 	}

-	c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
-	if err == nil {
-		c.ringSubservicesWatcher = services.NewFailureWatcher()
-		c.ringSubservicesWatcher.WatchManager(c.ringSubservices)
-
-		err = services.StartManagerAndAwaitHealthy(ctx, c.ringSubservices)
+	c.ringSubservicesWatcher = services.NewFailureWatcher()
+	c.ringSubservicesWatcher.WatchManager(c.ringSubservices)
+	if err = c.ringSubservices.StartAsync(ctx); err != nil {
+		return errors.Wrap(err, "unable to start compactor ring dependencies")
 	}
-	if err != nil {
+
+	ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
+	defer cancel()
+	if err = c.ringSubservices.AwaitHealthy(ctxTimeout); err != nil {
 		return errors.Wrap(err, "unable to start compactor ring dependencies")
 	}

-	// If sharding is enabled we should wait until this instance is
-	// ACTIVE within the ring. This MUST be done before starting the
-	// any other component depending on the users scanner, because the
-	// users scanner depends on the ring (to check whether an user belongs
-	// to this shard or not).
+	// If sharding is enabled we should wait until this instance is ACTIVE within the ring. This
+	// MUST be done before starting any other component depending on the users scanner, because
+	// the users scanner depends on the ring (to check whether a user belongs to this shard or not).
 	level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
-	ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
+
+	ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
 	defer cancel()
-	if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
-		level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
-		return err
+	if err = ring.WaitInstanceState(ctxTimeout, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
+		return errors.Wrap(err, "compactor failed to become ACTIVE in the ring")
 	}
+
 	level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")

 	// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
@@ -458,6 +464,35 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 	return nil
 }

+func newRingAndLifecycler(cfg RingConfig, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
+	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "compactor-lifecycler"), logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize compactors' KV store")
+	}
+
+	lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to build compactors' lifecycler config")
+	}
+
+	var delegate ring.BasicLifecyclerDelegate
+	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, lifecyclerCfg.NumTokens)
+	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
+	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*lifecyclerCfg.HeartbeatTimeout, delegate, logger)
+
+	compactorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "compactor", ringKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize compactors' lifecycler")
+	}
+
+	compactorsRing, err := ring.NewWithStoreClientAndStrategy(cfg.ToRingConfig(), "compactor", ringKey, kvStore, ring.NewDefaultReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", reg), logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize compactors' ring client")
+	}
+
+	return compactorsRing, compactorsLifecycler, nil
+}
+
 func (c *MultitenantCompactor) stopping(_ error) error {
 	ctx := context.Background()

@@ -745,11 +780,11 @@ type shardingStrategy interface {
 type splitAndMergeShardingStrategy struct {
 	allowedTenants *util.AllowedTenants
 	ring           *ring.Ring
-	ringLifecycler *ring.Lifecycler
+	ringLifecycler *ring.BasicLifecycler
 	configProvider ConfigProvider
 }

-func newSplitAndMergeShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.Lifecycler, configProvider ConfigProvider) *splitAndMergeShardingStrategy {
+func newSplitAndMergeShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.BasicLifecycler, configProvider ConfigProvider) *splitAndMergeShardingStrategy {
 	return &splitAndMergeShardingStrategy{
 		allowedTenants: allowedTenants,
 		ring:           ring,
@@ -766,7 +801,7 @@ func (s *splitAndMergeShardingStrategy) blocksCleanerOwnUser(userID string) (boo

 	r := s.ring.ShuffleShard(userID, s.configProvider.CompactorTenantShardSize(userID))

-	return instanceOwnsTokenInRing(r, s.ringLifecycler.Addr, userID)
+	return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), userID)
 }

 // ALL compactors should plan jobs for all users.
@@ -777,7 +812,7 @@ func (s *splitAndMergeShardingStrategy) compactorOwnUser(userID string) (bool, e

 	r := s.ring.ShuffleShard(userID, s.configProvider.CompactorTenantShardSize(userID))

-	return r.HasInstance(s.ringLifecycler.ID), nil
+	return r.HasInstance(s.ringLifecycler.GetInstanceID()), nil
 }

 // Only single compactor should execute the job.
@@ -789,7 +824,7 @@ func (s *splitAndMergeShardingStrategy) ownJob(job *Job) (bool, error) {

 	r := s.ring.ShuffleShard(job.UserID(), s.configProvider.CompactorTenantShardSize(job.UserID()))

-	return instanceOwnsTokenInRing(r, s.ringLifecycler.Addr, job.ShardingKey())
+	return instanceOwnsTokenInRing(r, s.ringLifecycler.GetInstanceAddr(), job.ShardingKey())
 }

 func instanceOwnsTokenInRing(r ring.ReadRing, instanceAddr string, key string) (bool, error) {
diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index 1d03087517a..b593164704e 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -7,6 +7,7 @@ package compactor

 import (
 	"flag"
+	"fmt"
 	"os"
 	"time"

@@ -20,6 +21,13 @@ import (
 	util_log "github.com/grafana/mimir/pkg/util/log"
 )

+const (
+	// ringNumTokens is how many tokens each compactor should have in the ring. We use a
+	// safe default instead of exposing a config option to the user in order to simplify
+	// the config.
+	ringNumTokens = 512
+)
+
 // RingConfig masks the ring lifecycler config which contains
 // many options not really required by the compactors ring. This config
 // is used to strip down the config to the minimum, and avoid confusion
@@ -76,6 +84,36 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
 }

+func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error) {
+	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
+	if err != nil {
+		return ring.BasicLifecyclerConfig{}, err
+	}
+
+	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
+
+	return ring.BasicLifecyclerConfig{
+		ID:                              cfg.InstanceID,
+		Addr:                            fmt.Sprintf("%s:%d", instanceAddr, instancePort),
+		HeartbeatPeriod:                 cfg.HeartbeatPeriod,
+		HeartbeatTimeout:                cfg.HeartbeatTimeout,
+		TokensObservePeriod:             cfg.ObservePeriod,
+		NumTokens:                       ringNumTokens,
+		KeepInstanceInTheRingOnShutdown: false,
+	}, nil
+}
+
+func (cfg *RingConfig) ToRingConfig() ring.Config {
+	rc := ring.Config{}
+	flagext.DefaultValues(&rc)
+
+	rc.KVStore = cfg.KVStore
+	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
+	rc.ReplicationFactor = 1
+
+	return rc
+}
+
 // ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
 // ring config.
 func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index c36a3377574..b5bdef81435 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -1426,9 +1426,12 @@ func findCompactorByUserID(compactors []*MultitenantCompactor, logs []*concurren
 func removeIgnoredLogs(input []string) []string {
 	ignoredLogStringsMap := map[string]struct{}{
+		// Since we moved to the component logger from the global logger for the ring in dskit, these lines are now expected, but they are just ring setup information.
 		`level=info component=compactor msg="ring doesn't exist in KV store yet"`:                                {},
 		`level=info component=compactor msg="not loading tokens from file, tokens file path is empty"`:           {},
+		`level=info component=compactor msg="tokens verification succeeded" ring=compactor`:                      {},
+		`level=info component=compactor msg="waiting stable tokens" ring=compactor`:                              {},
 		`level=info component=compactor msg="instance not found in ring, adding with no tokens" ring=compactor`:  {},
 		`level=debug component=compactor msg="JoinAfter expired" ring=compactor`:                                 {},
 		`level=info component=compactor msg="auto-joining cluster after timeout" ring=compactor`:                 {},
@@ -1436,17 +1439,18 @@ func removeIgnoredLogs(input []string) []string {
 		`level=info component=compactor msg="changing instance state from" old_state=ACTIVE new_state=LEAVING ring=compactor`:                                                     {},
 		`level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="Changing instance state from LEAVING -> LEAVING is disallowed"`:                 {},
 		`level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="Changing instance state from JOINING -> LEAVING is disallowed"`:                 {},
-		`level=debug component=compactor msg="unregistering instance from ring" ring=compactor`:                  {},
-		`level=info component=compactor msg="instance removed from the KV store" ring=compactor`:                 {},
+		`level=info component=compactor msg="unregistering instance from ring" ring=compactor`:                   {},
+		`level=info component=compactor msg="instance removed from the ring" ring=compactor`:                     {},
 		`level=info component=compactor msg="observing tokens before going ACTIVE" ring=compactor`:               {},
 		`level=info component=compactor msg="lifecycler entering final sleep before shutdown" final_sleep=0s`:    {},
+		`level=info component=compactor msg="ring lifecycler is shutting down" ring=compactor`:                   {},
 	}

 	out := make([]string, 0, len(input))
 	for i := 0; i < len(input); i++ {
 		log := input[i]

-		if strings.Contains(log, "block.MetaFetcher") || strings.Contains(log, "block.BaseFetcher") {
+		if strings.Contains(log, "block.MetaFetcher") || strings.Contains(log, "block.BaseFetcher") || strings.Contains(log, "instance not found in the ring") {
 			continue
 		}

@@ -1758,18 +1762,13 @@ func TestMultitenantCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
 	// Set ObservePeriod to longer than the timeout period to mock a timeout while waiting on ring to become ACTIVE
 	cfg.ShardingRing.ObservePeriod = time.Second * 10

-	c, _, _, logs, _ := prepare(t, cfg, bucketClient)
+	c, _, _, _, _ := prepare(t, cfg, bucketClient)

 	// Try to start the compactor with a bad consul kv-store.
 	err := services.StartAndAwaitRunning(context.Background(), c)

 	// Assert that the compactor timed out
-	assert.Equal(t, context.DeadlineExceeded, err)
-
-	assert.ElementsMatch(t, []string{
-		`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
-		`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
-	}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
+	require.ErrorIs(t, err, context.DeadlineExceeded)
 }

 type ownUserReason int
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 7d3caa8e1b5..d70b3b4b7e3 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -456,7 +456,7 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
 	}

 	var delegate ring.BasicLifecyclerDelegate
-	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, ringNumTokens)
+	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, lifecyclerCfg.NumTokens)
 	delegate = newHealthyInstanceDelegate(instanceCount, cfg.HeartbeatTimeout, delegate)
 	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)
@@ -466,7 +466,7 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
 	}

-	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+	distributorsRing, err := ring.NewWithStoreClientAndStrategy(cfg.ToRingConfig(), "distributor", distributorRingKey, kvStore, ring.NewDefaultReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", reg), logger)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
 	}

From 7051b65ff25b384af2e36eeb12c949a6d71f578b Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Mon, 19 Dec 2022 14:47:05 -0500
Subject: [PATCH 2/4] Remove unused method

Signed-off-by: Nick Pillitteri
---
 pkg/compactor/compactor_ring.go      | 39 ---------
 pkg/compactor/compactor_ring_test.go | 72 ----------------------
 2 files changed, 111 deletions(-)
 delete mode 100644 pkg/compactor/compactor_ring_test.go

diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index b593164704e..2190d4de4bd 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -113,42 +113,3 @@ func (cfg *RingConfig) ToRingConfig() ring.Config {

 	return rc
 }
-
-// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
-// ring config.
-func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
-	// We have to make sure that the ring.LifecyclerConfig and ring.Config
-	// defaults are preserved
-	lc := ring.LifecyclerConfig{}
-	rc := ring.Config{}
-
-	flagext.DefaultValues(&lc)
-	flagext.DefaultValues(&rc)
-
-	// Configure ring
-	rc.KVStore = cfg.KVStore
-	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
-	rc.ReplicationFactor = 1
-
-	// Configure lifecycler
-	lc.RingConfig = rc
-	lc.RingConfig.SubringCacheDisabled = true
-	lc.ListenPort = cfg.ListenPort
-	lc.Addr = cfg.InstanceAddr
-	lc.Port = cfg.InstancePort
-	lc.ID = cfg.InstanceID
-	lc.InfNames = cfg.InstanceInterfaceNames
-	lc.UnregisterOnShutdown = true
-	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
-	lc.HeartbeatTimeout = cfg.HeartbeatTimeout
-	lc.ObservePeriod = cfg.ObservePeriod
-	lc.JoinAfter = 0
-	lc.MinReadyDuration = 0
-	lc.FinalSleep = 0
-
-	// We use a safe default instead of exposing to config option to the user
-	// in order to simplify the config.
-	lc.NumTokens = 512
-
-	return lc
-}
diff --git a/pkg/compactor/compactor_ring_test.go b/pkg/compactor/compactor_ring_test.go
deleted file mode 100644
index 617dce7dca8..00000000000
--- a/pkg/compactor/compactor_ring_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-// SPDX-License-Identifier: AGPL-3.0-only
-// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/compactor/compactor_ring_test.go
-// Provenance-includes-license: Apache-2.0
-// Provenance-includes-copyright: The Cortex Authors.
-
-package compactor
-
-import (
-	"testing"
-	"time"
-
-	"github.com/grafana/dskit/flagext"
-	"github.com/grafana/dskit/ring"
-	"github.com/stretchr/testify/assert"
-)
-
-func TestRingConfig_DefaultConfigToLifecyclerConfig(t *testing.T) {
-	cfg := RingConfig{}
-	expected := ring.LifecyclerConfig{}
-	flagext.DefaultValues(&cfg, &expected)
-
-	// The default config of the compactor ring must be the exact same
-	// of the default lifecycler config, except few options which are
-	// intentionally overridden
-	expected.ListenPort = cfg.ListenPort
-	expected.RingConfig.ReplicationFactor = 1
-	expected.RingConfig.SubringCacheDisabled = true
-	expected.RingConfig.KVStore.Store = "memberlist"
-	expected.NumTokens = 512
-	expected.MinReadyDuration = 0
-	expected.FinalSleep = 0
-	expected.InfNames = cfg.InstanceInterfaceNames
-	expected.HeartbeatPeriod = 15 * time.Second
-
-	assert.Equal(t, expected, cfg.ToLifecyclerConfig())
-}
-
-func TestRingConfig_CustomConfigToLifecyclerConfig(t *testing.T) {
-	cfg := RingConfig{}
-	expected := ring.LifecyclerConfig{}
-	flagext.DefaultValues(&cfg, &expected)
-
-	// Customize the compactor ring config
-	cfg.HeartbeatPeriod = 1 * time.Second
-	cfg.HeartbeatTimeout = 10 * time.Second
-	cfg.InstanceID = "test"
-	cfg.InstanceInterfaceNames = []string{"abc1"}
-	cfg.InstancePort = 10
-	cfg.InstanceAddr = "1.2.3.4"
-	cfg.ListenPort = 10
-
-	// The lifecycler config should be generated based upon the compactor
-	// ring config
-	expected.HeartbeatPeriod = cfg.HeartbeatPeriod
-	expected.HeartbeatTimeout = cfg.HeartbeatTimeout
-	expected.RingConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
-	expected.RingConfig.SubringCacheDisabled = true
-	expected.RingConfig.KVStore.Store = "memberlist"
-	expected.ID = cfg.InstanceID
-	expected.InfNames = cfg.InstanceInterfaceNames
-	expected.Port = cfg.InstancePort
-	expected.Addr = cfg.InstanceAddr
-	expected.ListenPort = cfg.ListenPort
-
-	// Hardcoded config
-	expected.RingConfig.ReplicationFactor = 1
-	expected.NumTokens = 512
-	expected.MinReadyDuration = 0
-	expected.FinalSleep = 0
-
-	assert.Equal(t, expected, cfg.ToLifecyclerConfig())
-}

From 5f60d63e914c70c64f6350e6099921ab48dae407 Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Tue, 20 Dec 2022 13:10:57 -0500
Subject: [PATCH 3/4] Code review changes

Signed-off-by: Nick Pillitteri
---
 pkg/compactor/compactor.go     | 18 ++++++------------
 pkg/distributor/distributor.go |  5 +++--
 2 files changed, 9 insertions(+), 14 deletions(-)

diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 44cc4680d5c..30702884a2c 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -370,11 +370,7 @@ func newMultitenantCompactor(

 // Start the compactor.
 func (c *MultitenantCompactor) starting(ctx context.Context) error {
-	var (
-		err        error
-		ctxTimeout context.Context
-		cancel     context.CancelFunc
-	)
+	var err error

 	// Create bucket client.
 	c.bucketClient, err = c.bucketClientFactory(ctx)
@@ -399,7 +395,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {

 	c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
 	if err != nil {
-		return errors.Wrap(err, "unable to start compactor ring dependencies")
+		return errors.Wrap(err, "unable to create compactor ring dependencies")
 	}

 	c.ringSubservicesWatcher = services.NewFailureWatcher()
@@ -408,7 +404,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 		return errors.Wrap(err, "unable to start compactor ring dependencies")
 	}

-	ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
+	ctxTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
 	defer cancel()
 	if err = c.ringSubservices.AwaitHealthy(ctxTimeout); err != nil {
 		return errors.Wrap(err, "unable to start compactor ring dependencies")
@@ -418,9 +414,6 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 	// MUST be done before starting any other component depending on the users scanner, because
 	// the users scanner depends on the ring (to check whether a user belongs to this shard or not).
 	level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
-
-	ctxTimeout, cancel = context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
-	defer cancel()
 	if err = ring.WaitInstanceState(ctxTimeout, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
 		return errors.Wrap(err, "compactor failed to become ACTIVE in the ring")
 	}
@@ -465,6 +458,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 }

 func newRingAndLifecycler(cfg RingConfig, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
+	reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
 	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "compactor-lifecycler"), logger)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize compactors' KV store")
@@ -480,12 +474,12 @@ func newRingAndLifecycler(cfg RingConfig, logger log.Logger, reg prometheus.Regi
 	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*lifecyclerCfg.HeartbeatTimeout, delegate, logger)

-	compactorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "compactor", ringKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+	compactorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "compactor", ringKey, kvStore, delegate, logger, reg)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize compactors' lifecycler")
 	}

-	compactorsRing, err := ring.NewWithStoreClientAndStrategy(cfg.ToRingConfig(), "compactor", ringKey, kvStore, ring.NewDefaultReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", reg), logger)
+	compactorsRing, err := ring.New(cfg.ToRingConfig(), "compactor", ringKey, logger, reg)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize compactors' ring client")
 	}
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index d70b3b4b7e3..01f0838a7f2 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -445,6 +445,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove

 // newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
 func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
+	reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
 	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
@@ -461,12 +462,12 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
 	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)

-	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, reg)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
 	}

-	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, reg)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
 	}

From ddb4f2b3203b42a58640724b32913b03e82ae950 Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Thu, 22 Dec 2022 09:29:09 -0500
Subject: [PATCH 4/4] Code review changes: revert distributor registerer fix

Signed-off-by: Nick Pillitteri
---
 pkg/distributor/distributor.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 01f0838a7f2..c596ca15b34 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -445,7 +445,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 // newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
 func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
-	reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
 	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
@@ -461,12 +461,12 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
 	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)

-	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, reg)
+	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
 	}

-	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, reg)
+	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
 	}
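
A note on the auto-forget wiring above (illustrative, not part of the patches): dskit's BasicLifecycler delegates compose by wrapping, and the outermost NewAutoForgetDelegate hooks the ring heartbeat to remove any instance whose own heartbeat is older than the forget period. The sketch below restates that chain outside the compactor so the forget-period arithmetic is visible; the delegate constructors are the same ones the patches call, while the 1-minute heartbeat timeout is an assumed value for `-compactor.ring.heartbeat-timeout`, not something these patches set.

	package main

	import (
		"fmt"
		"time"

		"github.com/go-kit/log"
		"github.com/grafana/dskit/ring"
	)

	func main() {
		logger := log.NewNopLogger()

		const ringAutoForgetUnhealthyPeriods = 10 // same constant the first patch adds
		heartbeatTimeout := time.Minute           // assumed -compactor.ring.heartbeat-timeout

		// Innermost delegate first: register as ACTIVE with 512 tokens, move to
		// LEAVING on shutdown, and auto-forget peers that stop heartbeating.
		var delegate ring.BasicLifecyclerDelegate
		delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, 512)
		delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
		delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*heartbeatTimeout, delegate, logger)
		_ = delegate

		// With these values an unhealthy compactor is forgotten roughly ten
		// minutes after its last successful heartbeat.
		fmt.Println("forget period:", ringAutoForgetUnhealthyPeriods*heartbeatTimeout) // 10m0s
	}

Under the assumed 1m timeout, this matches the changelog entry's wording: ten consecutive failed heartbeat periods and the unhealthy compactor is dropped from the ring, releasing its tenant ownership.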