From ccdde4ee776c83af8ebb779f2728c90631d03e1e Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 15 Jul 2021 13:40:10 -0700 Subject: [PATCH 1/6] Fix problem when ingester heartbeat is disabled and unregister_on_shutdown=false Signed-off-by: Alan Protasio --- CHANGELOG.md | 2 +- pkg/ring/lifecycler.go | 10 +++++- pkg/ring/lifecycler_test.go | 65 +++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8797f2764..f695c159f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 - +* [BUGFIX] Ingester: Ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `unregister_on_shutdown=false`. #4366 ## 1.10.0-rc.0 / 2021-06-28 * [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298 diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 37897b70ba..4ddf459a26 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -576,7 +576,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - // If the ingester failed to clean it's ring entry up in can leave it's state in LEAVING. + // If the ingester failed to clean it's ring entry up in can leave it's state in LEAVING + // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. 
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens { instanceDesc.State = ACTIVE @@ -588,6 +589,13 @@ func (i *Lifecycler) initRing(ctx context.Context) error { i.setTokens(tokens) level.Info(log.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) + + // If we flip the instance from LEAVING to ACTIVE and the heartbeat is disabled, we need to update KV here + if i.cfg.HeartbeatPeriod == 0 && ringDesc.Ingesters[i.ID].State == LEAVING && instanceDesc.State == ACTIVE { + ringDesc.Ingesters[i.ID] = instanceDesc + return ringDesc, true, nil + } + // we haven't modified the ring, don't try to store it. return nil, true, nil }) diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index a67dfd20b1..c0fc878584 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -321,6 +321,71 @@ type noopFlushTransferer struct { func (f *noopFlushTransferer) Flush() {} func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil } +func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) { + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec()) + + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) + + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig.UnregisterOnShutdown = false + lifecyclerConfig.HeartbeatPeriod = 0 + + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) + defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck + + lifecyclerConfig.ID = "ing2" + + l2, err := NewLifecycler(lifecyclerConfig, 
&noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) + + var ingesters map[string]InstanceDesc + pool := func(condition func(*Desc) bool) { + test.Poll(t, 5000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), IngesterRingKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + + if ok { + ingesters = desc.Ingesters + } + return ok && condition(desc) + }) + } + + pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && ingesters["ing1"].State == ACTIVE && ingesters["ing2"].State == ACTIVE + }) + + assert.Equal(t, ACTIVE, ingesters["ing1"].State) + assert.Equal(t, ACTIVE, ingesters["ing2"].State) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && ingesters["ing2"].State == LEAVING + }) + + assert.Equal(t, LEAVING, ingesters["ing2"].State) + l2Restart, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2Restart)) + defer services.StopAndAwaitTerminated(context.Background(), l2Restart) //nolint:errcheck + + pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && ingesters["ing2"].State == ACTIVE + }) + + assert.Equal(t, ACTIVE, ingesters["ing1"].State) + assert.Equal(t, ACTIVE, ingesters["ing2"].State) +} + func TestTokensOnDisk(t *testing.T) { var ringConfig Config flagext.DefaultValues(&ringConfig) From 8eeaae2c64eb90425a893cc96e3ffdafab52189f Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 16 Jul 2021 07:53:49 -0700 Subject: [PATCH 2/6] Update CHANGELOG.md Co-authored-by: Marco Pracucci Signed-off-by: Alan Protasio --- CHANGELOG.md | 2 ++ pkg/ring/lifecycler.go | 8 +++-- pkg/ring/lifecycler_test.go | 68 
+++++++++++++++++++++++++++++-------- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f695c159f8..6289393ce7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] Ingester: Ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `unregister_on_shutdown=false`. #4366 +* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 + ## 1.10.0-rc.0 / 2021-06-28 * [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298 diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 4ddf459a26..d5a03e8bdc 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -590,8 +590,12 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Info(log.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) - // If we flip the instance from LEAVING to ACTIVE and the heartbeat is disabled, we need to update KV here - if i.cfg.HeartbeatPeriod == 0 && ringDesc.Ingesters[i.ID].State == LEAVING && instanceDesc.State == ACTIVE { + // Update the ring if the instance has been changed and the heartbeat is disabled. 
+ // We don't need to update KV here when heartbeat is enabled as this info will eventually be updated in KV + // on the next heartbeat + if i.cfg.HeartbeatPeriod == 0 && !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { + // Update timestamp to give gossiping client a chance to register ring change. + instanceDesc.Timestamp = time.Now().Unix() ringDesc.Ingesters[i.ID] = instanceDesc return ringDesc, true, nil } diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index c0fc878584..16c2ce44c7 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -345,8 +345,8 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) - var ingesters map[string]InstanceDesc - pool := func(condition func(*Desc) bool) { + pool := func(condition func(*Desc) bool) map[string]InstanceDesc { + var ingesters map[string]InstanceDesc test.Poll(t, 5000*time.Millisecond, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), IngesterRingKey) require.NoError(t, err) @@ -358,32 +358,72 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi } return ok && condition(desc) }) + + return ingesters + } + + startIngesterAndWaitActive := func(lcConfig LifecyclerConfig) *Lifecycler { + ingester, err := NewLifecycler(lcConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) + + ingesters := pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE + }) + + assert.Equal(t, ACTIVE, ingesters["ing1"].State) + assert.Equal(t, ACTIVE, ingesters["ing2"].State) + + return ingester } - pool(func(desc *Desc) bool { - return len(desc.Ingesters) == 2 && ingesters["ing1"].State == ACTIVE && 
ingesters["ing2"].State == ACTIVE + ingesters := pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE }) + // Both Ingester should be active and running assert.Equal(t, ACTIVE, ingesters["ing1"].State) assert.Equal(t, ACTIVE, ingesters["ing2"].State) + + // Stop One ingester gracefully should leave it on LEAVING STATE on the ring require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) - pool(func(desc *Desc) bool { - return len(desc.Ingesters) == 2 && ingesters["ing2"].State == LEAVING + ingesters = pool(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == LEAVING }) - assert.Equal(t, LEAVING, ingesters["ing2"].State) - l2Restart, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + + // Start Ingester2 again - Should flip back to ACTIVE in the ring + l2 = startIngesterAndWaitActive(lifecyclerConfig) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + //Simulate ingester2 crash on startup and left the ring with JOINING state + err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + desc, ok := in.(*Desc) + require.Equal(t, true, ok) + ingester2Desc := desc.Ingesters["ing2"] + ingester2Desc.State = JOINING + desc.Ingesters["ing2"] = ingester2Desc + return desc, true, nil + }) require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2Restart)) - defer services.StopAndAwaitTerminated(context.Background(), l2Restart) //nolint:errcheck - pool(func(desc *Desc) bool { - return len(desc.Ingesters) == 2 && ingesters["ing2"].State == ACTIVE + l2 = startIngesterAndWaitActive(lifecyclerConfig) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) + + //Simulate ingester2 crash on startup and left the ring 
with PENDING state + err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + desc, ok := in.(*Desc) + require.Equal(t, true, ok) + ingester2Desc := desc.Ingesters["ing2"] + ingester2Desc.State = PENDING + desc.Ingesters["ing2"] = ingester2Desc + return desc, true, nil }) + require.NoError(t, err) - assert.Equal(t, ACTIVE, ingesters["ing1"].State) - assert.Equal(t, ACTIVE, ingesters["ing2"].State) + l2 = startIngesterAndWaitActive(lifecyclerConfig) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) } func TestTokensOnDisk(t *testing.T) { From 64cf46a4a821d588bdc1a830d9dcb69eb15573e4 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 30 Jul 2021 08:22:44 -0700 Subject: [PATCH 3/6] Update pkg/ring/lifecycler.go Co-authored-by: Bryan Boreham Signed-off-by: Alan Protasio --- pkg/ring/lifecycler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index d5a03e8bdc..6a0f6cb678 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -576,7 +576,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - // If the ingester failed to clean it's ring entry up in can leave it's state in LEAVING + // If the ingester failed to clean its ring entry up in can leave its state in LEAVING // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. 
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens { From 50402e07b0db67d6efe02d12e137698e2f7fb728 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 30 Jul 2021 08:25:39 -0700 Subject: [PATCH 4/6] Update pkg/ring/lifecycler_test.go Co-authored-by: Bryan Boreham Signed-off-by: Alan Protasio --- pkg/ring/lifecycler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 16c2ce44c7..77145feda7 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -347,7 +347,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi pool := func(condition func(*Desc) bool) map[string]InstanceDesc { var ingesters map[string]InstanceDesc - test.Poll(t, 5000*time.Millisecond, true, func() interface{} { + test.Poll(t, 5*time.Second, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), IngesterRingKey) require.NoError(t, err) From 6092356f8c527b4f7db1e4b4b0ee17a4edc60016 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 30 Jul 2021 08:57:24 -0700 Subject: [PATCH 5/6] Address comments Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 - pkg/ring/lifecycler_test.go | 33 +++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6289393ce7..59c41e64d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,6 @@ * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. 
#4304 -* [BUGFIX] Ingester: Ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `unregister_on_shutdown=false`. #4366 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 77145feda7..b125c23daa 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -330,6 +330,10 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) + // We are going to create 2 fake ingester with disabled heart beat and `unregister_on_shutdown=false` then + // test if the ingester 2 became active after: + // * Clean Shutdown (LEAVING after shutdown) + // * Crashes while in the PENDING or JOINING state lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") lifecyclerConfig.UnregisterOnShutdown = false lifecyclerConfig.HeartbeatPeriod = 0 @@ -345,7 +349,8 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) - pool := func(condition func(*Desc) bool) map[string]InstanceDesc { + // poll function waits for a condition and returning actual state of the ingesters after the condition succeed. 
+ poll := func(condition func(*Desc) bool) map[string]InstanceDesc { var ingesters map[string]InstanceDesc test.Poll(t, 5*time.Second, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), IngesterRingKey) @@ -362,22 +367,18 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi return ingesters } - startIngesterAndWaitActive := func(lcConfig LifecyclerConfig) *Lifecycler { + // Starts Ingester2 and wait it to became active + startIngester2AndWaitActive := func(lcConfig LifecyclerConfig) *Lifecycler { ingester, err := NewLifecycler(lcConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) - - ingesters := pool(func(desc *Desc) bool { - return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE + poll(func(desc *Desc) bool { + return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == ACTIVE }) - - assert.Equal(t, ACTIVE, ingesters["ing1"].State) - assert.Equal(t, ACTIVE, ingesters["ing2"].State) - return ingester } - ingesters := pool(func(desc *Desc) bool { + ingesters := poll(func(desc *Desc) bool { return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE }) @@ -388,16 +389,16 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi // Stop One ingester gracefully should leave it on LEAVING STATE on the ring require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) - ingesters = pool(func(desc *Desc) bool { + ingesters = poll(func(desc *Desc) bool { return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == LEAVING }) assert.Equal(t, LEAVING, ingesters["ing2"].State) // Start Ingester2 again - Should flip back to ACTIVE in the ring - l2 = startIngesterAndWaitActive(lifecyclerConfig) + l2 = 
startIngester2AndWaitActive(lifecyclerConfig) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) - //Simulate ingester2 crash on startup and left the ring with JOINING state + // Simulate ingester2 crash on startup and left the ring with JOINING state err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { desc, ok := in.(*Desc) require.Equal(t, true, ok) @@ -408,10 +409,10 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi }) require.NoError(t, err) - l2 = startIngesterAndWaitActive(lifecyclerConfig) + l2 = startIngester2AndWaitActive(lifecyclerConfig) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) - //Simulate ingester2 crash on startup and left the ring with PENDING state + // Simulate ingester2 crash on startup and left the ring with PENDING state err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { desc, ok := in.(*Desc) require.Equal(t, true, ok) @@ -422,7 +423,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi }) require.NoError(t, err) - l2 = startIngesterAndWaitActive(lifecyclerConfig) + l2 = startIngester2AndWaitActive(lifecyclerConfig) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) } From f8b484d428f80563d7cd7de5057bc8805cab9b58 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 2 Aug 2021 11:01:46 -0700 Subject: [PATCH 6/6] Addressing comments on test Signed-off-by: Alan Protasio --- pkg/ring/lifecycler_test.go | 50 ++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index b125c23daa..646c3fac6e 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -330,25 +330,6 @@ func 
TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) - // We are going to create 2 fake ingester with disabled heart beat and `unregister_on_shutdown=false` then - // test if the ingester 2 became active after: - // * Clean Shutdown (LEAVING after shutdown) - // * Crashes while in the PENDING or JOINING state - lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") - lifecyclerConfig.UnregisterOnShutdown = false - lifecyclerConfig.HeartbeatPeriod = 0 - - l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) - defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck - - lifecyclerConfig.ID = "ing2" - - l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) - // poll function waits for a condition and returning actual state of the ingesters after the condition succeed. 
poll := func(condition func(*Desc) bool) map[string]InstanceDesc { var ingesters map[string]InstanceDesc @@ -367,17 +348,30 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi return ingesters } - // Starts Ingester2 and wait it to became active - startIngester2AndWaitActive := func(lcConfig LifecyclerConfig) *Lifecycler { - ingester, err := NewLifecycler(lcConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) + // Starts an ingester and waits for it to become active + startIngesterAndWaitActive := func(ingId string) *Lifecycler { + lifecyclerConfig := testLifecyclerConfig(ringConfig, ingId) + // Disabling heartbeat and unregister_on_shutdown + lifecyclerConfig.UnregisterOnShutdown = false + lifecyclerConfig.HeartbeatPeriod = 0 + lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", IngesterRingKey, true, nil) require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler)) poll(func(desc *Desc) bool { - return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == ACTIVE + return desc.Ingesters[ingId].State == ACTIVE }) - return ingester + return lifecycler } + // We are going to create 2 fake ingesters with disabled heartbeat and `unregister_on_shutdown=false`, then + // test that ingester 2 becomes active after: + // * Clean Shutdown (LEAVING after shutdown) + // * Crashes while in the PENDING or JOINING state + l1 := startIngesterAndWaitActive("ing1") + defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck + + l2 := startIngesterAndWaitActive("ing2") + ingesters := poll(func(desc *Desc) bool { return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE }) @@ -395,7 +389,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi assert.Equal(t, 
LEAVING, ingesters["ing2"].State) // Start Ingester2 again - Should flip back to ACTIVE in the ring - l2 = startIngester2AndWaitActive(lifecyclerConfig) + l2 = startIngesterAndWaitActive("ing2") require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) // Simulate ingester2 crash on startup and left the ring with JOINING state @@ -409,7 +403,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi }) require.NoError(t, err) - l2 = startIngester2AndWaitActive(lifecyclerConfig) + l2 = startIngesterAndWaitActive("ing2") require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) // Simulate ingester2 crash on startup and left the ring with PENDING state @@ -423,7 +417,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi }) require.NoError(t, err) - l2 = startIngester2AndWaitActive(lifecyclerConfig) + l2 = startIngesterAndWaitActive("ing2") require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) }