diff --git a/CHANGELOG.md b/CHANGELOG.md index f3066b6b76..887f72f2d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 +* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 37897b70ba..6a0f6cb678 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -576,7 +576,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - // If the ingester failed to clean it's ring entry up in can leave it's state in LEAVING. + // If the ingester failed to clean its ring entry up it can leave its state in LEAVING + // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens { instanceDesc.State = ACTIVE @@ -588,6 +589,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error { i.setTokens(tokens) level.Info(log.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) + + // Update the ring if the instance has been changed and the heartbeat is disabled. 
+ // We don't need to update KV here when heartbeat is enabled as this info will eventually be updated on KV + // on the next heartbeat + if i.cfg.HeartbeatPeriod == 0 && !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { + // Update timestamp to give gossiping client a chance to register ring change. + instanceDesc.Timestamp = time.Now().Unix() + ringDesc.Ingesters[i.ID] = instanceDesc + return ringDesc, true, nil + } + // we haven't modified the ring, don't try to store it. return nil, true, nil }) diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index a67dfd20b1..646c3fac6e 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -321,6 +321,106 @@ type noopFlushTransferer struct { func (f *noopFlushTransferer) Flush() {} func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil } +func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) { + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec()) + + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) + + // poll waits for a condition and returns the actual state of the ingesters after the condition succeeds. 
+ poll := func(condition func(*Desc) bool) map[string]InstanceDesc { + var ingesters map[string]InstanceDesc + test.Poll(t, 5*time.Second, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), IngesterRingKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + + if ok { + ingesters = desc.Ingesters + } + return ok && condition(desc) + }) + + return ingesters + } + + // Starts an ingester and waits for it to become active + startIngesterAndWaitActive := func(ingId string) *Lifecycler { + lifecyclerConfig := testLifecyclerConfig(ringConfig, ingId) + // Disable heartbeat and unregister_on_shutdown + lifecyclerConfig.UnregisterOnShutdown = false + lifecyclerConfig.HeartbeatPeriod = 0 + lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", IngesterRingKey, true, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler)) + poll(func(desc *Desc) bool { + return desc.Ingesters[ingId].State == ACTIVE + }) + return lifecycler + } + + // We are going to create 2 fake ingesters with disabled heartbeat and `unregister_on_shutdown=false`, then + // test if ingester 2 becomes active after: + // * Clean Shutdown (LEAVING after shutdown) + // * Crashes while in the PENDING or JOINING state + l1 := startIngesterAndWaitActive("ing1") + defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck + + l2 := startIngesterAndWaitActive("ing2") + + ingesters := poll(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE + }) + + // Both ingesters should be active and running + assert.Equal(t, ACTIVE, ingesters["ing1"].State) + assert.Equal(t, ACTIVE, ingesters["ing2"].State) + + // Stopping one ingester gracefully should leave it in the LEAVING state in the ring + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + ingesters = poll(func(desc *Desc) 
bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == LEAVING + }) + assert.Equal(t, LEAVING, ingesters["ing2"].State) + + // Start ingester2 again - it should flip back to ACTIVE in the ring + l2 = startIngesterAndWaitActive("ing2") + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + // Simulate ingester2 crashing on startup and leaving the ring in the JOINING state + err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + desc, ok := in.(*Desc) + require.Equal(t, true, ok) + ingester2Desc := desc.Ingesters["ing2"] + ingester2Desc.State = JOINING + desc.Ingesters["ing2"] = ingester2Desc + return desc, true, nil + }) + require.NoError(t, err) + + l2 = startIngesterAndWaitActive("ing2") + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + // Simulate ingester2 crashing on startup and leaving the ring in the PENDING state + err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + desc, ok := in.(*Desc) + require.Equal(t, true, ok) + ingester2Desc := desc.Ingesters["ing2"] + ingester2Desc.State = PENDING + desc.Ingesters["ing2"] = ingester2Desc + return desc, true, nil + }) + require.NoError(t, err) + + l2 = startIngesterAndWaitActive("ing2") + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) +} + func TestTokensOnDisk(t *testing.T) { var ringConfig Config flagext.DefaultValues(&ringConfig)