Fix Ingesters unable to re-join the cluster when unregister_on_shutdown=false + -ingester.heartbeat-period=0 #4366

Merged 6 commits on Aug 3, 2021
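For context, the bug only surfaces with the flag combination named in the changelog entry below; with a non-zero heartbeat period the repaired ring entry would eventually be written back to the KV store by the next heartbeat anyway:

```
-ingester.unregister-on-shutdown=false
-ingester.heartbeat-period=0
```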
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366

## 1.10.0-rc.0 / 2021-06-28

14 changes: 13 additions & 1 deletion pkg/ring/lifecycler.go
@@ -576,7 +576,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}

-// If the ingester failed to clean it's ring entry up in can leave it's state in LEAVING.
+// If the ingester failed to clean its ring entry up, or unregister_on_shutdown=false
+// was set, its state can be left in LEAVING.
// Move it into ACTIVE to ensure the ingester joins the ring.
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
instanceDesc.State = ACTIVE
@@ -588,6 +589,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
i.setTokens(tokens)

level.Info(log.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName)

// Update the ring if the instance has changed and the heartbeat is disabled.
// We don't need to update the KV store here when the heartbeat is enabled, as this
// info will eventually be updated in the KV store on the next heartbeat.
if i.cfg.HeartbeatPeriod == 0 && !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) {
// Update the timestamp to give the gossiping client a chance to register the ring change.
instanceDesc.Timestamp = time.Now().Unix()
ringDesc.Ingesters[i.ID] = instanceDesc
return ringDesc, true, nil
}

// we haven't modified the ring, don't try to store it.
return nil, true, nil
})
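The two `return` statements above follow the lifecycler's KV compare-and-swap contract: a non-nil descriptor returned from the callback is written back atomically, while nil leaves the store untouched, which is why the heartbeat-disabled path must return the modified `ringDesc` itself. Below is a minimal sketch of that contract, not the actual patch, assuming the code lives in package `ring` (so `Desc`, `LEAVING`, `ACTIVE`, and `IngesterRingKey` resolve) and uses Cortex's `kv.Client` interface; `reactivateStaleEntry` is a hypothetical helper name:

```go
import (
	"context"
	"time"

	"github.com/cortexproject/cortex/pkg/ring/kv"
)

// reactivateStaleEntry is a hypothetical helper illustrating the CAS contract
// initRing relies on: return a non-nil value to persist a change, nil to skip.
func reactivateStaleEntry(ctx context.Context, kvClient kv.Client, instanceID string) error {
	return kvClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
		desc, ok := in.(*Desc)
		if !ok || desc == nil {
			return nil, false, nil // no ring stored yet, nothing to repair
		}

		instanceDesc, ok := desc.Ingesters[instanceID]
		if !ok || instanceDesc.State != LEAVING {
			return nil, true, nil // nil output means "don't write anything back"
		}

		// Same idea as the fix: flip the stale entry back to ACTIVE and bump
		// the timestamp so gossip-based KV clients register the change even
		// though no heartbeat will ever rewrite this entry.
		instanceDesc.State = ACTIVE
		instanceDesc.Timestamp = time.Now().Unix()
		desc.Ingesters[instanceID] = instanceDesc
		return desc, true, nil // non-nil output is CAS-written atomically
	})
}
```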
100 changes: 100 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -321,6 +321,106 @@ type noopFlushTransferer struct {
func (f *noopFlushTransferer) Flush() {}
func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil }

func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())

r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))

// poll waits for a condition and returns the actual state of the ingesters once the condition succeeds.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)

desc, ok := d.(*Desc)

if ok {
ingesters = desc.Ingesters
}
return ok && condition(desc)
})

return ingesters
}

// Starts an ingester and waits for it to become ACTIVE
startIngesterAndWaitActive := func(ingId string) *Lifecycler {
lifecyclerConfig := testLifecyclerConfig(ringConfig, ingId)
// Disable heartbeat and unregister_on_shutdown
lifecyclerConfig.UnregisterOnShutdown = false
lifecyclerConfig.HeartbeatPeriod = 0
lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", IngesterRingKey, true, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
poll(func(desc *Desc) bool {
return desc.Ingesters[ingId].State == ACTIVE
})
return lifecycler
}

// We are going to create 2 fake ingesters with disabled heartbeat and `unregister_on_shutdown=false`,
// then test whether ingester 2 becomes ACTIVE after:
// * a clean shutdown (LEAVING after shutdown)
// * crashing while in the PENDING or JOINING state
l1 := startIngesterAndWaitActive("ing1")
defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck

l2 := startIngesterAndWaitActive("ing2")

ingesters := poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE
})

// Both ingesters should be ACTIVE and running
assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, ACTIVE, ingesters["ing2"].State)

// Stopping one ingester gracefully should leave it in the LEAVING state in the ring
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == LEAVING
})
assert.Equal(t, LEAVING, ingesters["ing2"].State)

// Start ingester2 again - it should flip back to ACTIVE in the ring
l2 = startIngesterAndWaitActive("ing2")
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate an ingester2 crash on startup that left the ring entry in the JOINING state
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
ingester2Desc.State = JOINING
desc.Ingesters["ing2"] = ingester2Desc
return desc, true, nil
})
require.NoError(t, err)

l2 = startIngesterAndWaitActive("ing2")
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate an ingester2 crash on startup that left the ring entry in the PENDING state
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
ingester2Desc.State = PENDING
desc.Ingesters["ing2"] = ingester2Desc
return desc, true, nil
})
require.NoError(t, err)

l2 = startIngesterAndWaitActive("ing2")
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}
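Assuming a standard repository checkout, the new test can be run in isolation with something like:

```
go test ./pkg/ring -run TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false
```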

func TestTokensOnDisk(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)