Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

Merged
merged 17 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
* [BUGFIX] Memberlist: fix the default value of `-memberlist.retransmit-factor` not being set when the option is not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
* [CHANGE] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

## Blocksconvert

Expand Down
8 changes: 6 additions & 2 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ const (
)

var (
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
WaitInstanceStateTimeoutDuration = time.Duration(600)
ac1214 marked this conversation as resolved.
Show resolved Hide resolved

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
return compact.NewDefaultGrouper(
Expand Down Expand Up @@ -393,6 +394,9 @@ func (c *Compactor) starting(ctx context.Context) error {
// users scanner depends on the ring (to check whether a user belongs
// to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")

ctx, cancel := context.WithTimeout(ctx, WaitInstanceStateTimeoutDuration*time.Second)
defer cancel()
if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
Expand Down Expand Up @@ -763,8 +767,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}
}

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
// GetInstanceState implements ReadRing.
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func GetInstancePort(configPort, listenPort int) int {

// WaitInstanceState waits until the input instanceID is registered within the
// ring matching the provided state. A timeout should be provided within the context.
func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
Expand Down
115 changes: 115 additions & 0 deletions pkg/ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,65 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// RingMock is a testify-backed stand-in that tests pass wherever a ReadRing
// is expected. Methods that go through mock.Mock return whatever the test
// registered with On(...).Return(...); the remaining methods return fixed
// values and record nothing.
type RingMock struct {
	mock.Mock
}

// Collect does nothing; no metrics are sent on ch.
func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

// Describe does nothing; no descriptors are sent on ch.
func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}

// Get records the call and returns the configured ReplicationSet and error.
// It panics if the first configured return value is not a ReplicationSet.
func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	recorded := r.Called(key, op, bufDescs, bufHosts, bufZones)
	set := recorded.Get(0).(ReplicationSet)
	return set, recorded.Error(1)
}

// GetAllHealthy records the call and returns the configured ReplicationSet
// and error.
func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
	recorded := r.Called(op)
	set := recorded.Get(0).(ReplicationSet)
	return set, recorded.Error(1)
}

// GetReplicationSetForOperation records the call and returns the configured
// ReplicationSet and error.
func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	recorded := r.Called(op)
	set := recorded.Get(0).(ReplicationSet)
	return set, recorded.Error(1)
}

// ReplicationFactor always reports 0 and is not recorded by the mock.
func (r *RingMock) ReplicationFactor() int {
	return 0
}

// InstancesCount always reports 0 and is not recorded by the mock.
func (r *RingMock) InstancesCount() int {
	return 0
}

// ShuffleShard records the call and returns the configured ReadRing.
func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
	return r.Called(identifier, size).Get(0).(ReadRing)
}

// GetInstanceState records the call (with instanceID as its only argument)
// and returns the configured InstanceState and error.
func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
	recorded := r.Called(instanceID)
	state := recorded.Get(0).(InstanceState)
	return state, recorded.Error(1)
}

// ShuffleShardWithLookback records the call and returns the configured
// ReadRing.
func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
	return r.Called(identifier, size, lookbackPeriod, now).Get(0).(ReadRing)
}

// HasInstance always reports true and is not recorded by the mock.
func (r *RingMock) HasInstance(instanceID string) bool {
	return true
}

// CleanupShuffleShardCache does nothing.
func (r *RingMock) CleanupShuffleShardCache(identifier string) {}

func TestGenerateTokens(t *testing.T) {
tokens := GenerateTokens(1000000, nil)

Expand Down Expand Up @@ -184,3 +239,63 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}

func TestWaitInstanceStateTimeout(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Duration(3)
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, PENDING)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

// TestWaitInstanceStateTimeoutOnError checks that WaitInstanceState returns
// context.DeadlineExceeded when every ring lookup fails until the context
// deadline passes.
func TestWaitInstanceStateTimeoutOnError(t *testing.T) {
	t.Parallel()

	// The timeout carries its own unit, so it can be handed to
	// context.WithTimeout directly instead of being a bare count that is
	// multiplied by time.Second at the call site.
	const (
		instanceID      = "test"
		timeoutDuration = 3 * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	// GetInstanceState takes a single argument, so a single matcher is
	// registered. Every lookup reports an error alongside the PENDING state.
	ring := &RingMock{}
	ring.On("GetInstanceState", mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))

	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

	assert.Equal(t, context.DeadlineExceeded, err)
	ring.AssertCalled(t, "GetInstanceState", instanceID)
}

// TestWaitInstanceStateExitsAfterActualStateEqualsState checks that
// WaitInstanceState returns without error after a single ring lookup when the
// instance is already in the awaited state.
func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
	t.Parallel()

	// The timeout carries its own unit, so it can be handed to
	// context.WithTimeout directly instead of being a bare count that is
	// multiplied by time.Second at the call site.
	const (
		instanceID      = "test"
		timeoutDuration = 3 * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	// GetInstanceState takes a single argument, so a single matcher is
	// registered. The mock reports ACTIVE, which is the awaited state.
	ring := &RingMock{}
	ring.On("GetInstanceState", mock.Anything).Return(ACTIVE, nil)

	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

	assert.Nil(t, err)
	ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
}