
Fix Ingesters unable to re-join the cluster when unregister_on_shutdown=false + -ingester.heartbeat-period=0 #4366

Merged · 6 commits · Aug 3, 2021
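As a quick orientation (this is an editor's note, not part of the PR description): the bug needs two settings to coincide, heartbeats disabled and the ring entry kept on shutdown. The new test added below reproduces it with roughly this lifecycler configuration; the fragment is lifted from that test, with comments mapping the fields to the CLI flags in the title.

```go
// Fragment mirroring the new test in pkg/ring/lifecycler_test.go: with both settings
// together, a LEAVING ring entry is left behind and no heartbeat ever refreshes it.
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.UnregisterOnShutdown = false // -ingester.unregister-on-shutdown=false
lifecyclerConfig.HeartbeatPeriod = 0          // -ingester.heartbeat-period=0
```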
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366

## 1.10.0-rc.0 / 2021-06-28

14 changes: 13 additions & 1 deletion pkg/ring/lifecycler.go
@@ -576,7 +576,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}

// The ring entry can be left in the LEAVING state if the ingester failed to clean it up,
// or if unregister_on_shutdown=false.
// Move it into ACTIVE to ensure the ingester joins the ring.
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
instanceDesc.State = ACTIVE
Expand All @@ -588,6 +589,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
i.setTokens(tokens)

level.Info(log.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName)

// Update the ring if the instance has changed and the heartbeat is disabled.
// We don't need to update the KV store here when the heartbeat is enabled, as this info will
// eventually be updated in the KV store on the next heartbeat.
if i.cfg.HeartbeatPeriod == 0 && !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) {
// Update the timestamp to give gossiping clients a chance to register the ring change.
instanceDesc.Timestamp = time.Now().Unix()
ringDesc.Ingesters[i.ID] = instanceDesc
return ringDesc, true, nil
}

// we haven't modified the ring, don't try to store it.
return nil, true, nil
})
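To make the change above easier to follow outside the diff context: before this PR the LEAVING-to-ACTIVE promotion happened only in memory, and with the heartbeat disabled nothing ever wrote the ACTIVE state back to the KV store, so the rest of the ring kept seeing the ingester as LEAVING. Below is a minimal, self-contained Go sketch of the decision the new lines implement. It is not Cortex code: the types are simplified stand-ins, and the real logic lives in the CAS callback of `initRing` shown above.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the ring types; the real ones live in pkg/ring.
type State int

const (
	ACTIVE State = iota
	LEAVING
)

type InstanceDesc struct {
	State     State
	Tokens    []uint32
	Timestamp int64
}

// reviveEntry mirrors the post-fix behaviour of initRing: a LEAVING entry with a
// full token set is promoted to ACTIVE, and when heartbeats are disabled
// (heartbeatPeriod == 0) the change must be written back to the KV store now,
// because no heartbeat will ever do it.
func reviveEntry(desc InstanceDesc, numTokens int, heartbeatPeriod time.Duration) (InstanceDesc, bool) {
	changed := false
	if desc.State == LEAVING && len(desc.Tokens) == numTokens {
		desc.State = ACTIVE
		changed = true
	}
	mustStore := heartbeatPeriod == 0 && changed
	if mustStore {
		// Refresh the timestamp so gossiping clients notice the change.
		desc.Timestamp = time.Now().Unix()
	}
	return desc, mustStore
}

func main() {
	desc := InstanceDesc{State: LEAVING, Tokens: make([]uint32, 128)}
	desc, store := reviveEntry(desc, 128, 0)
	fmt.Println(desc.State == ACTIVE, store) // true true: promoted, and the caller must persist it
}
```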
106 changes: 106 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -321,6 +321,112 @@ type noopFlushTransferer struct {
func (f *noopFlushTransferer) Flush() {}
func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil }

func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())

r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))

// We are going to create 2 fake ingesters with heartbeat disabled and `unregister_on_shutdown=false`, then
// test whether ingester 2 becomes ACTIVE again after:
// * a clean shutdown (LEAVING after shutdown)
// * a crash while in the PENDING or JOINING state
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.UnregisterOnShutdown = false
lifecyclerConfig.HeartbeatPeriod = 0

l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
defer services.StopAndAwaitTerminated(context.Background(), l1) //nolint:errcheck

lifecyclerConfig.ID = "ing2"

l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))

// poll waits for a condition and returns the actual state of the ingesters once the condition succeeds.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)

desc, ok := d.(*Desc)

if ok {
ingesters = desc.Ingesters
}
return ok && condition(desc)
})

return ingesters
}

// startIngester2AndWaitActive starts a new lifecycler with the given config and waits for ing2 to become ACTIVE in the ring.
startIngester2AndWaitActive := func(lcConfig LifecyclerConfig) *Lifecycler {
Reviewer (Contributor): There's nothing specific to Ingester 2 in this function, is there? That is a consequence of passing LifecyclerConfig with ID set to "ing2", I think.
Author: Done.

ingester, err := NewLifecycler(lcConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)

Reviewer (Contributor): Why do we create a new Lifecycler here? What happens to the old one in l2?
Reviewer (Contributor): And why is it named ingester? Nearby we have a variable ingesters, which is a map[string]InstanceDesc.
Author: I think I tried to reuse it and it did not work. Besides that, in order to simulate a brand new ingester I would assume that creating a new instance is the safest thing to do. Makes sense?

require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))
poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == ACTIVE
})
return ingester
}

ingesters := poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == ACTIVE
})

// Both ingesters should be ACTIVE and running
assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, ACTIVE, ingesters["ing2"].State)

// Stopping one ingester gracefully should leave it in the LEAVING state in the ring
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing2"].State == LEAVING
})
assert.Equal(t, LEAVING, ingesters["ing2"].State)

// Start ingester2 again - it should flip back to ACTIVE in the ring
l2 = startIngester2AndWaitActive(lifecyclerConfig)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate ingester2 crashing on startup and leaving its ring entry in the JOINING state
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
ingester2Desc.State = JOINING
desc.Ingesters["ing2"] = ingester2Desc
return desc, true, nil
})
require.NoError(t, err)

l2 = startIngester2AndWaitActive(lifecyclerConfig)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate ingester2 crashing on startup and leaving its ring entry in the PENDING state
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
ingester2Desc.State = PENDING
desc.Ingesters["ing2"] = ingester2Desc
return desc, true, nil
})
require.NoError(t, err)

l2 = startIngester2AndWaitActive(lifecyclerConfig)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}

func TestTokensOnDisk(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
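For reference, the new test should be runnable on its own with something along the lines of `go test ./pkg/ring -run TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false`; the exact invocation may vary with the repository's Makefile and build setup.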