Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

Merged: 17 commits, Jul 5, 2021
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@
* [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056
* [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
* [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
* [CHANGE] Change `WaitInstanceState` to take a `MaxRetries` parameter. #4262

## Blocksconvert

4 changes: 2 additions & 2 deletions pkg/alertmanager/multitenant.go
@@ -464,7 +464,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) {

// We wait until the instance is in the JOINING state, once it does we know that tokens are assigned to this instance and we'll be ready to perform an initial sync of configs.
level.Info(am.logger).Log("waiting until alertmanager is JOINING in the ring")
if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil {
return err
}
level.Info(am.logger).Log("msg", "alertmanager is JOINING in the ring")
@@ -493,7 +493,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) {

// Wait until the ring client detected this instance in the ACTIVE state.
level.Info(am.logger).Log("msg", "waiting until alertmanager is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil {
return err
}
level.Info(am.logger).Log("msg", "alertmanager is ACTIVE in the ring")
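The Alertmanager call sites above pass 0 for the new parameter. Assuming the usual semantics of the Cortex util.Backoff (MaxRetries == 0 means no retry cap), this preserves the previous behaviour: the wait is bounded only by the context. A minimal sketch of the condition those call sites rely on; stillWaiting is an illustrative name, not part of the PR:

// Sketch only: paraphrases the Ongoing() check that WaitInstanceState loops on.
// With maxRetries == 0 the retry cap is disabled, so only the context deadline
// (or cancellation) bounds the wait.
func stillWaiting(ctx context.Context, numRetries, maxRetries int) bool {
	return ctx.Err() == nil && (maxRetries == 0 || numRetries < maxRetries)
}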
8 changes: 4 additions & 4 deletions pkg/alertmanager/multitenant_test.go
@@ -1163,7 +1163,7 @@ func TestMultitenantAlertmanager_PerTenantSharding(t *testing.T) {
// The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles.
for _, am := range instances {
for _, id := range instanceIDs {
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0))
}
}
}
@@ -1449,7 +1449,7 @@ func TestAlertmanager_ReplicasPosition(t *testing.T) {

for _, am := range instances {
for _, id := range instanceIDs {
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0))
}
}

@@ -1568,7 +1568,7 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) {
// The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles.
for _, am := range instances {
for _, id := range instanceIDs {
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0))
}
}
}
@@ -1757,7 +1757,7 @@ func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testi
// The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles.
for _, am := range instances {
for _, id := range instanceIDs {
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0))
}
}
}
3 changes: 2 additions & 1 deletion pkg/compactor/compactor.go
@@ -38,6 +38,7 @@ import (
const (
blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total"
blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor."
MAX_RETRIES = 600
)

var (
@@ -393,7 +394,7 @@ func (c *Compactor) starting(ctx context.Context) error {
// users scanner depends on the ring (to check whether a user belongs
// to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE, MAX_RETRIES); err != nil {
return err
}
level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
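With the backoff used by WaitInstanceState (MinBackoff 100ms, MaxBackoff 1s, see pkg/ring/util.go below), MAX_RETRIES = 600 caps the compactor's wait at roughly ten minutes instead of blocking forever. A rough back-of-the-envelope check, assuming every retry sleeps the full MaxBackoff:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Upper bound on the wait before WaitInstanceState gives up: 600 retries at
	// no more than 1s each (the first few back off for less: 100ms, 200ms, 400ms, ...).
	const maxRetries = 600
	fmt.Println(maxRetries * time.Second) // 10m0s
}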
7 changes: 5 additions & 2 deletions pkg/ring/ring.go
@@ -71,6 +71,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
@@ -763,8 +767,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}
}

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
// GetInstanceState implements ReadRing.
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
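The method body is truncated above. A minimal sketch of what the lookup typically looks like, assuming the ring keeps its members in a map keyed by instance ID; the names ringDesc.Ingesters and ErrInstanceNotFound are assumptions, not confirmed by the diff:

// Illustrative sketch of (*Ring).GetInstanceState: a state lookup guarded by
// the ring's read lock. Field and error names are assumed.
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	instance, ok := r.ringDesc.Ingesters[instanceID]
	if !ok {
		return PENDING, ErrInstanceNotFound
	}
	return instance.State, nil
}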
4 changes: 2 additions & 2 deletions pkg/ring/util.go
@@ -69,11 +69,11 @@ func GetInstancePort(configPort, listenPort int) int {

// WaitInstanceState waits until the input instanceID is registered within the
// ring matching the provided state. A timeout should be provided within the context.
func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState, maxRetries int) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
MaxRetries: 0,
MaxRetries: maxRetries,
})

for backoff.Ongoing() {
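The polling loop that follows the backoff setup is truncated above. A minimal sketch of the whole function under the new signature, assuming the behaviour described in its doc comment (poll GetInstanceState until the instance reports the requested state, give up once the backoff is exhausted) and the imports already present in pkg/ring/util.go:

// Sketch of WaitInstanceState with the maxRetries parameter; the real
// implementation may differ in minor details.
func waitInstanceStateSketch(ctx context.Context, r ReadRing, instanceID string, state InstanceState, maxRetries int) error {
	backoff := util.NewBackoff(ctx, util.BackoffConfig{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: maxRetries, // 0 keeps the previous "retry until ctx expires" behaviour
	})

	for backoff.Ongoing() {
		if actualState, err := r.GetInstanceState(instanceID); err == nil && actualState == state {
			return nil
		}

		backoff.Wait()
	}

	return backoff.Err()
}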
112 changes: 112 additions & 0 deletions pkg/ring/util_test.go
@@ -6,10 +6,65 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type RingMock struct {
mock.Mock
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}

func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
args := r.Called(key, op, bufDescs, bufHosts, bufZones)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) ReplicationFactor() int {
return 0
}

func (r *RingMock) InstancesCount() int {
return 0
}

func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
args := r.Called(identifier, size)
return args.Get(0).(ReadRing)
}

func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
args := r.Called(instanceID)
return args.Get(0).(InstanceState), args.Error(1)
}

func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
args := r.Called(identifier, size, lookbackPeriod, now)
return args.Get(0).(ReadRing)
}

func (r *RingMock) HasInstance(instanceID string) bool {
return true
}

func (r *RingMock) CleanupShuffleShardCache(identifier string) {}

func TestGenerateTokens(t *testing.T) {
tokens := GenerateTokens(1000000, nil)

@@ -184,3 +239,60 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}

func TestWaitInstanceStateExitsAfterMaxRetries(t *testing.T) {
t.Parallel()

const (
instanceId = "test"
maxRetries = 5
)

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything).Return(ACTIVE, nil)

WaitInstanceState(context.Background(), ring, instanceId, PENDING, maxRetries)

ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries)
}

func TestWaitInstanceStateDoesMaxRetriesOnError(t *testing.T) {
t.Parallel()

const (
instanceId = "test"
maxRetries = 5
)

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))

WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries)

ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries)
}

func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
t.Parallel()

const (
instanceId = "test"
maxRetries = 5
)

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything).Return(ACTIVE, nil)

WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries)

ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
}
4 changes: 2 additions & 2 deletions pkg/storegateway/gateway.go
@@ -227,7 +227,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
// make sure that when we'll run the initial sync we already know the tokens
// assigned to this instance.
level.Info(g.logger).Log("msg", "waiting until store-gateway is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil {
return err
}
level.Info(g.logger).Log("msg", "store-gateway is JOINING in the ring")
@@ -252,7 +252,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(g.logger).Log("msg", "waiting until store-gateway is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil {
return err
}
level.Info(g.logger).Log("msg", "store-gateway is ACTIVE in the ring")
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_test.go
@@ -354,7 +354,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
// A gateway is ready for the test once it sees all instances ACTIVE in the ring.
for _, g := range gateways {
for _, instanceID := range gatewayIds {
require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE, 0))
}
}
}
4 changes: 2 additions & 2 deletions pkg/storegateway/sharding_strategy_test.go
@@ -263,7 +263,7 @@ func TestDefaultShardingStrategy(t *testing.T) {
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0))

for instanceAddr, expectedBlocks := range testData.expectedBlocks {
filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger())
@@ -620,7 +620,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0))

// Assert on filter users.
for _, expected := range testData.expectedUsers {